diff options
Diffstat (limited to 'include')
| -rw-r--r-- | include/framework/ExtensionStructure.h | 1 | ||||
| -rw-r--r-- | include/framework/InternalLevel.h | 8 | ||||
| -rw-r--r-- | include/framework/Scheduler.h | 4 |
3 files changed, 9 insertions, 4 deletions
diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h index 2fb9cf0..892e63b 100644 --- a/include/framework/ExtensionStructure.h +++ b/include/framework/ExtensionStructure.h @@ -312,6 +312,7 @@ public: m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get()); } else { m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get()); + m_levels[base_level]->finalize(); } m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index e67ae45..6cdac4e 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -106,6 +106,7 @@ public: for (size_t i=0; i<m_shards.size(); i++) { if (m_owns[i]) { delete m_shards[i]; + m_shards[i] = nullptr; m_owns[i] = false; } } @@ -113,6 +114,7 @@ public: m_shards[0] = m_pending_shard; m_owns[0] = true; m_pending_shard = nullptr; + m_shard_cnt = 1; } } @@ -241,13 +243,15 @@ private: std::vector<bool> m_owns; - InternalLevel *clone() { - auto new_level = new InternalLevel(m_level_no, m_shards.size()); + std::shared_ptr<InternalLevel> clone() { + auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size()); for (size_t i=0; i<m_shard_cnt; i++) { new_level->m_shards[i] = m_shards[i]; new_level->m_owns[i] = true; m_owns[i] = false; } + + return new_level; } }; diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h index 534ce25..6055bef 100644 --- a/include/framework/Scheduler.h +++ b/include/framework/Scheduler.h @@ -68,7 +68,7 @@ public: * Schedule the merge tasks (FIXME: currently this just * executes them sequentially in a blocking fashion) */ - for (ssize_t i=merges.size()-1; i>=0; i--) { + for (ssize_t i=0; i<merges.size(); i++) { merges[i].m_timestamp = m_timestamp.fetch_add(1); m_merge_queue_lock.lock(); m_merge_queue.push(merges[i]); @@ -120,6 +120,7 @@ private: void run_merge(MergeTask task, Structure *version) { version->merge_levels(task.m_target_level, task.m_source_level); + if (!version->validate_tombstone_proportion(task.m_target_level)) { auto tasks = version->get_merge_tasks(task.m_target_level); /* @@ -188,7 +189,6 @@ private: std::thread m_sched_thrd; bool m_shutdown; - }; } |