summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/ExtensionStructure.h1
-rw-r--r--include/framework/InternalLevel.h8
-rw-r--r--include/framework/Scheduler.h4
3 files changed, 9 insertions, 4 deletions
diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h
index 2fb9cf0..892e63b 100644
--- a/include/framework/ExtensionStructure.h
+++ b/include/framework/ExtensionStructure.h
@@ -312,6 +312,7 @@ public:
m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get());
} else {
m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get());
+ m_levels[base_level]->finalize();
}
m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index e67ae45..6cdac4e 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -106,6 +106,7 @@ public:
for (size_t i=0; i<m_shards.size(); i++) {
if (m_owns[i]) {
delete m_shards[i];
+ m_shards[i] = nullptr;
m_owns[i] = false;
}
}
@@ -113,6 +114,7 @@ public:
m_shards[0] = m_pending_shard;
m_owns[0] = true;
m_pending_shard = nullptr;
+ m_shard_cnt = 1;
}
}
@@ -241,13 +243,15 @@ private:
std::vector<bool> m_owns;
- InternalLevel *clone() {
- auto new_level = new InternalLevel(m_level_no, m_shards.size());
+ std::shared_ptr<InternalLevel> clone() {
+ auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size());
for (size_t i=0; i<m_shard_cnt; i++) {
new_level->m_shards[i] = m_shards[i];
new_level->m_owns[i] = true;
m_owns[i] = false;
}
+
+ return new_level;
}
};
diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h
index 534ce25..6055bef 100644
--- a/include/framework/Scheduler.h
+++ b/include/framework/Scheduler.h
@@ -68,7 +68,7 @@ public:
* Schedule the merge tasks (FIXME: currently this just
* executes them sequentially in a blocking fashion)
*/
- for (ssize_t i=merges.size()-1; i>=0; i--) {
+ for (ssize_t i=0; i<merges.size(); i++) {
merges[i].m_timestamp = m_timestamp.fetch_add(1);
m_merge_queue_lock.lock();
m_merge_queue.push(merges[i]);
@@ -120,6 +120,7 @@ private:
void run_merge(MergeTask task, Structure *version) {
version->merge_levels(task.m_target_level, task.m_source_level);
+
if (!version->validate_tombstone_proportion(task.m_target_level)) {
auto tasks = version->get_merge_tasks(task.m_target_level);
/*
@@ -188,7 +189,6 @@ private:
std::thread m_sched_thrd;
bool m_shutdown;
-
};
}