diff options
Diffstat (limited to 'include/framework/ExtensionStructure.h')
| -rw-r--r-- | include/framework/ExtensionStructure.h | 101 |
1 files changed, 80 insertions, 21 deletions
diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h index 1e756db..9a5f6b3 100644 --- a/include/framework/ExtensionStructure.h +++ b/include/framework/ExtensionStructure.h @@ -27,6 +27,17 @@ namespace de { +struct MergeTask { + level_index m_source_level; + level_index m_target_level; + size_t m_size; + size_t m_timestamp; + + bool operator<(MergeTask &other) { + return m_timestamp < other.m_timestamp; + } +}; + template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING> class ExtensionStructure { typedef S Shard; @@ -90,9 +101,7 @@ public: * merges to make room for it. */ inline bool merge_buffer(Buffer *buffer) { - if (!can_merge_with(0, buffer->get_record_count())) { - merge_down(0); - } + assert(can_merge_with(0, buffer->get_record_count())); merge_buffer_into_l0(buffer); enforce_delete_maximum(0); @@ -192,6 +201,74 @@ public: return m_levels; } + /* + * + */ + std::vector<MergeTask> get_merge_tasks(size_t buffer_reccnt) { + std::vector<MergeTask> merges; + + /* + * The buffer -> L0 merge task is not included so if that + * can be done without any other change, just return an + * empty list. + */ + if (can_merge_with(0, buffer_reccnt)) { + return std::move(merges); + } + + level_index merge_base_level = find_mergable_level(0); + if (merge_base_level == -1) { + merge_base_level = grow(); + } + + for (level_index i=merge_base_level; i>0; i--) { + MergeTask task; + task.m_source_level = i - 1; + task.m_target_level = i; + + /* + * The amount of storage required for the merge accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i-1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_merge_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } + task.m_size = 2* reccnt * sizeof(R); + + merges.push_back(task); + } + + return std::move(merges); + } + + /* + * Merge the level specified by incoming level into the level specified + * by base level. The two levels should be sequential--i.e. no levels + * are skipped in the merge process--otherwise the tombstone ordering + * invariant may be violated by the merge operation. + */ + inline void merge_levels(level_index base_level, level_index incoming_level) { + // merging two memory levels + if constexpr (L == LayoutPolicy::LEVELING) { + auto tmp = m_levels[base_level]; + m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get()); + } else { + m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get()); + } + + m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); + } + + private: size_t m_scale_factor; double m_max_delete_prop; @@ -265,24 +342,6 @@ private: return -1; } - /* - * Merge the level specified by incoming level into the level specified - * by base level. The two levels should be sequential--i.e. no levels - * are skipped in the merge process--otherwise the tombstone ordering - * invariant may be violated by the merge operation. - */ - inline void merge_levels(level_index base_level, level_index incoming_level) { - // merging two memory levels - if constexpr (L == LayoutPolicy::LEVELING) { - auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get()); - } else { - m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get()); - } - - m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); - } - inline void merge_buffer_into_l0(Buffer *buffer) { assert(m_levels[0]); |