From 6e30f576ca9d11d1901f4877315e97f84d15b1e1 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 18 Sep 2023 16:37:30 -0400 Subject: The scheduler now spawns a seperate merge thread Merges are now executed from a seperate thread within the scheduler that wakes up via condition variables when new merge tasks are scheduled. In addition, tombstone limits are now enforced by the scheduler, with new merges being scheduled as needed. There are still a few tests failing, notably the zero tombstones in the last run invarient is not holding under tiering with tombstones. Need to look into that yet. --- include/framework/ExtensionStructure.h | 109 ++++++++++++++++++--------------- 1 file changed, 58 insertions(+), 51 deletions(-) (limited to 'include/framework/ExtensionStructure.h') diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h index 9a5f6b3..2fb9cf0 100644 --- a/include/framework/ExtensionStructure.h +++ b/include/framework/ExtensionStructure.h @@ -33,8 +33,12 @@ struct MergeTask { size_t m_size; size_t m_timestamp; - bool operator<(MergeTask &other) { - return m_timestamp < other.m_timestamp; + friend bool operator<(const MergeTask &self, const MergeTask &other) { + return self.m_timestamp < other.m_timestamp; + } + + friend bool operator>(const MergeTask &self, const MergeTask &other) { + return self.m_timestamp > other.m_timestamp; } }; @@ -66,7 +70,7 @@ public: ExtensionStructure *copy() { auto new_struct = new ExtensionStructure(m_scale_factor, m_max_delete_prop, m_buffer_size); for (size_t i=0; im_levels.push_back(m_levels[i]); + new_struct->m_levels.push_back(m_levels[i]->clone()); } return new_struct; @@ -104,9 +108,8 @@ public: assert(can_merge_with(0, buffer->get_record_count())); merge_buffer_into_l0(buffer); - enforce_delete_maximum(0); - buffer->truncate(); + return true; } @@ -193,6 +196,11 @@ public: return true; } + bool validate_tombstone_proportion(level_index level) { + long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level); + return ts_prop <= (long double) m_max_delete_prop; + } + /* * Return a reference to the underlying vector of levels within the * structure. @@ -250,6 +258,47 @@ public: return std::move(merges); } + + /* + * + */ + std::vector get_merge_tasks_from_level(size_t source_level) { + std::vector merges; + + level_index merge_base_level = find_mergable_level(source_level); + if (merge_base_level == -1) { + merge_base_level = grow(); + } + + for (level_index i=merge_base_level; i>source_level; i--) { + MergeTask task; + task.m_source_level = i - 1; + task.m_target_level = i; + + /* + * The amount of storage required for the merge accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i-1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_merge_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } + task.m_size = 2* reccnt * sizeof(R); + + merges.push_back(task); + } + + return std::move(merges); + } + /* * Merge the level specified by incoming level into the level specified * by base level. The two levels should be sequential--i.e. no levels @@ -282,44 +331,18 @@ private: * and act appropriately. */ inline level_index grow() { - level_index new_idx; - + level_index new_idx = m_levels.size(); size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - new_idx = m_levels.size(); - if (new_idx > 0) { - assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); - } - m_levels.emplace_back(std::shared_ptr>(new InternalLevel(new_idx, new_shard_cnt))); + m_levels.emplace_back(std::shared_ptr>(new InternalLevel(new_idx, new_shard_cnt))); return new_idx; } - - /* - * Merge the specified level down into the tree. The level index must be - * non-negative (i.e., this function cannot be used to merge the buffer). This - * routine will recursively perform any necessary merges to make room for the - * specified level. - */ - inline void merge_down(level_index idx) { - level_index merge_base_level = find_mergable_level(idx); - if (merge_base_level == -1) { - merge_base_level = grow(); - } - - for (level_index i=merge_base_level; i>idx; i--) { - merge_levels(i, i-1); - enforce_delete_maximum(i); - } - - return; - } - /* * Find the first level below the level indicated by idx that * is capable of sustaining a merge operation and return its * level index. If no such level exists, returns -1. Also - * returns -1 if idx==0, and no such level exists, to simplify + * returns -1 if idx==0, and no such level exists, to skimplify * the logic of the first merge. */ inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) { @@ -342,7 +365,6 @@ private: return -1; } - inline void merge_buffer_into_l0(Buffer *buffer) { assert(m_levels[0]); if constexpr (L == LayoutPolicy::LEVELING) { @@ -369,21 +391,6 @@ private: level.reset(); } - /* - * Check the tombstone proportion for the specified level and - * if the limit is exceeded, forcibly merge levels until all - * levels below idx are below the limit. - */ - inline void enforce_delete_maximum(level_index idx) { - long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx); - - if (ts_prop > (long double) m_max_delete_prop) { - merge_down(idx); - } - - return; - } - /* * Assume that level "0" should be larger than the buffer. The buffer * itself is index -1, which should return simply the buffer capacity. @@ -424,7 +431,7 @@ private: return m_levels[idx]->get_shard_count() < m_scale_factor; } - // unreachable + /* unreachable */ assert(true); } }; -- cgit v1.2.3