diff options
Diffstat (limited to 'include/framework/ExtensionStructure.h')
| -rw-r--r-- | include/framework/ExtensionStructure.h | 109 |
1 files changed, 58 insertions, 51 deletions
diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h index 9a5f6b3..2fb9cf0 100644 --- a/include/framework/ExtensionStructure.h +++ b/include/framework/ExtensionStructure.h @@ -33,8 +33,12 @@ struct MergeTask { size_t m_size; size_t m_timestamp; - bool operator<(MergeTask &other) { - return m_timestamp < other.m_timestamp; + friend bool operator<(const MergeTask &self, const MergeTask &other) { + return self.m_timestamp < other.m_timestamp; + } + + friend bool operator>(const MergeTask &self, const MergeTask &other) { + return self.m_timestamp > other.m_timestamp; } }; @@ -66,7 +70,7 @@ public: ExtensionStructure<R, S, Q, L> *copy() { auto new_struct = new ExtensionStructure<R, S, Q, L>(m_scale_factor, m_max_delete_prop, m_buffer_size); for (size_t i=0; i<m_levels.size(); i++) { - new_struct->m_levels.push_back(m_levels[i]); + new_struct->m_levels.push_back(m_levels[i]->clone()); } return new_struct; @@ -104,9 +108,8 @@ public: assert(can_merge_with(0, buffer->get_record_count())); merge_buffer_into_l0(buffer); - enforce_delete_maximum(0); - buffer->truncate(); + return true; } @@ -193,6 +196,11 @@ public: return true; } + bool validate_tombstone_proportion(level_index level) { + long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level); + return ts_prop <= (long double) m_max_delete_prop; + } + /* * Return a reference to the underlying vector of levels within the * structure. @@ -250,6 +258,47 @@ public: return std::move(merges); } + + /* + * + */ + std::vector<MergeTask> get_merge_tasks_from_level(size_t source_level) { + std::vector<MergeTask> merges; + + level_index merge_base_level = find_mergable_level(source_level); + if (merge_base_level == -1) { + merge_base_level = grow(); + } + + for (level_index i=merge_base_level; i>source_level; i--) { + MergeTask task; + task.m_source_level = i - 1; + task.m_target_level = i; + + /* + * The amount of storage required for the merge accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i-1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_merge_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } + task.m_size = 2* reccnt * sizeof(R); + + merges.push_back(task); + } + + return std::move(merges); + } + /* * Merge the level specified by incoming level into the level specified * by base level. The two levels should be sequential--i.e. no levels @@ -282,44 +331,18 @@ private: * and act appropriately. */ inline level_index grow() { - level_index new_idx; - + level_index new_idx = m_levels.size(); size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - new_idx = m_levels.size(); - if (new_idx > 0) { - assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); - } - m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt))); + m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt))); return new_idx; } - - /* - * Merge the specified level down into the tree. The level index must be - * non-negative (i.e., this function cannot be used to merge the buffer). This - * routine will recursively perform any necessary merges to make room for the - * specified level. - */ - inline void merge_down(level_index idx) { - level_index merge_base_level = find_mergable_level(idx); - if (merge_base_level == -1) { - merge_base_level = grow(); - } - - for (level_index i=merge_base_level; i>idx; i--) { - merge_levels(i, i-1); - enforce_delete_maximum(i); - } - - return; - } - /* * Find the first level below the level indicated by idx that * is capable of sustaining a merge operation and return its * level index. If no such level exists, returns -1. Also - * returns -1 if idx==0, and no such level exists, to simplify + * returns -1 if idx==0, and no such level exists, to skimplify * the logic of the first merge. */ inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) { @@ -342,7 +365,6 @@ private: return -1; } - inline void merge_buffer_into_l0(Buffer *buffer) { assert(m_levels[0]); if constexpr (L == LayoutPolicy::LEVELING) { @@ -370,21 +392,6 @@ private: } /* - * Check the tombstone proportion for the specified level and - * if the limit is exceeded, forcibly merge levels until all - * levels below idx are below the limit. - */ - inline void enforce_delete_maximum(level_index idx) { - long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx); - - if (ts_prop > (long double) m_max_delete_prop) { - merge_down(idx); - } - - return; - } - - /* * Assume that level "0" should be larger than the buffer. The buffer * itself is index -1, which should return simply the buffer capacity. */ @@ -424,7 +431,7 @@ private: return m_levels[idx]->get_shard_count() < m_scale_factor; } - // unreachable + /* unreachable */ assert(true); } }; |