From 3c127eda69295cb306739bdd3c5ddccff6026a8d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 13 Dec 2023 12:39:54 -0500 Subject: Refactoring: corrected a number of names and added more comments --- include/framework/DynamicExtension.h | 77 ++++++------ include/framework/scheduling/Epoch.h | 2 +- include/framework/scheduling/Task.h | 4 +- include/framework/structure/ExtensionStructure.h | 142 +++++++++++------------ include/framework/structure/InternalLevel.h | 58 ++++++--- include/framework/structure/MutableBuffer.h | 18 ++- include/framework/util/Configuration.h | 2 +- tests/internal_level_tests.cpp | 4 +- 8 files changed, 165 insertions(+), 142 deletions(-) diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 8edcc5f..fe43c52 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -165,8 +165,8 @@ public: return m_buffer_capacity; } - Shard *create_static_structure(bool await_merge_completion=false) { - if (await_merge_completion) { + Shard *create_static_structure(bool await_reconstruction_completion=false) { + if (await_reconstruction_completion) { await_next_epoch(); } @@ -179,7 +179,7 @@ public: if (vers->get_levels().size() > 0) { for (int i=vers->get_levels().size() - 1; i>= 0; i--) { if (vers->get_levels()[i]) { - shards.emplace_back(vers->get_levels()[i]->get_merged_shard()); + shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); } } } @@ -261,10 +261,10 @@ private: auto compactions = structure->get_compaction_tasks(); while (compactions.size() > 0) { - /* otherwise, we need to schedule a merge to compact tombstones */ - MergeArgs *args = new MergeArgs(); + /* otherwise, we need to schedule a compaction */ + ReconstructionArgs *args = new ReconstructionArgs(); args->epoch = epoch; - // FIXME: all full buffers can be merged at this point--but that requires + // FIXME: all full buffers can be flushed at this point--but that requires // retooling the shard interface a bit to do efficiently. args->merges = compactions; args->extension = this; @@ -273,9 +273,9 @@ private: auto wait = args->result.get_future(); epoch->start_job(); - m_sched.schedule_job(merge, 0, args); + m_sched.schedule_job(reconstruction, 0, args); - /* wait for merge completion */ + /* wait for reconstruction completion */ wait.get(); compactions = structure->get_compaction_tasks(); @@ -308,7 +308,7 @@ private: /* * Verify the tombstone invariant within the epoch's structure, this - * may require scheduling additional merges. + * may require scheduling additional reconstructions. * * FIXME: having this inside the lock is going to TANK * insertion performance. @@ -325,8 +325,9 @@ private: size_t old_buffer_cnt = new_epoch->clear_buffers(); /* - * skip the first buffer, as this was the one that got merged, - * and copy all the other buffer references into the new epoch + * skip the first buffer, as this was flushed into the epoch's + * structure already, and copy all the other buffer references + * into the new epoch */ for (size_t i=1; iget_buffers().size(); i++) { new_epoch->add_buffer(old_epoch->get_buffers()[i]); @@ -352,7 +353,7 @@ private: _Epoch *create_new_epoch() { /* * This epoch access is _not_ protected under the assumption that - * only one merge will be able to trigger at a time. If that condition + * only one reconstruction will be able to trigger at a time. If that condition * is violated, it is possible that this code will clone a retired * epoch. 
*/ @@ -368,7 +369,7 @@ private: /* * Add a new empty buffer. This is intended to be used - * when a merge is triggered, to allow for inserts to be sustained in the new + * when a reconstruction is triggered, to allow for inserts to be sustained in the new * buffer while a new epoch is being created in the background. Returns a * pointer to the newly created buffer. */ @@ -429,13 +430,12 @@ private: */ do { - m_epoch_retire_lk.lock(); if (epoch->retirable()) { break; } - m_epoch_retire_lk.unlock(); } while (true); + m_epoch_retire_lk.lock(); /* remove epoch from the framework's map */ m_epochs.erase(epoch->get_epoch_number()); @@ -473,26 +473,26 @@ private: } } - static void merge(void *arguments) { - MergeArgs *args = (MergeArgs *) arguments; + static void reconstruction(void *arguments) { + ReconstructionArgs *args = (ReconstructionArgs *) arguments; Structure *vers = args->epoch->get_structure(); // FIXME: with an improved shard interface, multiple full buffers - // could be merged at once here. + // could be flushed at once here. Buffer *buff = (Buffer *) args->epoch->get_buffers()[0]; for (ssize_t i=0; imerges.size(); i++) { - vers->merge_levels(args->merges[i].second, args->merges[i].first); + vers->reconstruction(args->merges[i].second, args->merges[i].first); } /* - * if the merge is a compaction, don't push the buffer down, - * as there is no guarantee that the merges will free up - * sufficient space in L0 + * if performing a compaction, don't push the buffer down, + * as there is no guarantee that any necessary reconstructions + * will free sufficient space in L0 to support a flush */ if (!args->compaction) { - vers->merge_buffer(buff); + vers->flush_buffer(buff); } args->epoch->end_job(); @@ -501,7 +501,7 @@ private: /* * Compactions occur on an epoch _before_ it becomes active, * and as a result the active epoch should _not_ be advanced as - * part of a compaction merge + * part of a compaction */ if (!args->compaction) { ((DynamicExtension *) args->extension)->advance_epoch(); @@ -556,18 +556,19 @@ private: delete args; } - void schedule_merge() { + void schedule_reconstruction() { + //fprintf(stderr, "%ld\t Reconstruction Scheduling", m_current_epoch); auto epoch = create_new_epoch(); epoch->start_job(); - MergeArgs *args = new MergeArgs(); + ReconstructionArgs *args = new ReconstructionArgs(); args->epoch = epoch; - // FIXME: all full buffers can be merged at this point--but that requires + // FIXME: all full buffers can be flushed at this point--but that requires // retooling the shard interface a bit to do efficiently. - args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count()); + args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffers()[0]->get_record_count()); args->extension = this; args->compaction = false; - m_sched.schedule_job(merge, 0, args); + m_sched.schedule_job(reconstruction, 0, args); } std::future> schedule_query(void *query_parms) { @@ -592,27 +593,27 @@ private: assert(buffer); /* - * If the buffer is full and there is no current merge, - * schedule a merge and add a new empty buffer. If there - * is a current merge, then just add a new empty buffer + * If the buffer is full and there is no ongoing reconstruction, + * schedule a reconstruction and add a new empty buffer. If there + * is an ongoing reconstruction, then add a new empty buffer * to the current epoch. 
*/ if (buffer->is_full()) { if constexpr (std::same_as) { - /* single threaded: run merge and then empty buffer */ + /* single threaded: run reconstruction and then empty buffer */ epoch->end_job(); - schedule_merge(); + schedule_reconstruction(); buffer->truncate(); continue; - } else if (epoch->prepare_merge()) { + } else if (epoch->prepare_reconstruction()) { /* * add an empty buffer to allow insert proceed and - * schedule a merge on a background thread + * schedule a reconstruction on a background thread */ buffer = add_empty_buffer(); - schedule_merge(); + schedule_reconstruction(); } else { - /* background merge is ongoing, so just add empty buffer */ + /* background reconstruction is ongoing, so just add empty buffer */ buffer = add_empty_buffer(); } } diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index fc08d57..4e1b8a2 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -170,7 +170,7 @@ public: * isn't, return true and set a flag indicating that * there is an active merge. */ - bool prepare_merge() { + bool prepare_reconstruction() { auto old = m_active_merge.load(); if (old) { return false; diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index c10ed8b..16f5e58 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -17,9 +17,9 @@ namespace de { template -struct MergeArgs { +struct ReconstructionArgs { Epoch *epoch; - std::vector merges; + std::vector merges; std::promise result; bool compaction; void *extension; diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index a174805..3cd55ac 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -45,8 +45,8 @@ public: /* * Create a shallow copy of this extension structure. The copy will share references to the * same levels/shards as the original, but will have its own lists. As all of the shards are - * immutable (with the exception of deletes), the copy can be restructured with merges, etc., - * without affecting the original. The copied structure will be returned with a reference + * immutable (with the exception of deletes), the copy can be restructured with reconstructions + * and flushes without affecting the original. The copied structure will be returned with a reference * count of 0; generally you will want to immediately call take_reference() on it. * * NOTE: When using tagged deletes, a delete of a record in the original structure will affect @@ -55,7 +55,7 @@ public: * need to be forwarded to the appropriate structures manually. */ ExtensionStructure *copy() { - auto new_struct = new ExtensionStructure(m_buffer_size, m_scale_factor, m_max_delete_prop); + auto new_struct = new ExtensionStructure(m_buffer_size, m_scale_factor, m_max_delete_prop); for (size_t i=0; im_levels.push_back(m_levels[i]->clone()); } @@ -90,17 +90,20 @@ public: } /* - * Merge the memory table down into the tree, completing any required other - * merges to make room for it. + * Flush a buffer into the extension structure, performing any necessary + * reconstructions to free up room in L0. + * + * FIXME: arguably, this should be a method attached to the buffer that + * takes a structure as input. 
*/ - inline bool merge_buffer(Buffer *buffer) { - assert(can_merge_with(0, buffer->get_record_count())); + inline bool flush_buffer(Buffer *buffer) { + assert(can_reconstruct_with(0, buffer->get_record_count())); // FIXME: this step makes an extra copy of the buffer, // which could be avoided by adjusting the shard // reconstruction process a bit, possibly. - buffer->start_merge(); - merge_buffer_into_l0(buffer); + buffer->start_flush(); + flush_buffer_into_l0(buffer); return true; } @@ -123,7 +126,7 @@ public: * Return the total number of tombstones contained within all of the * levels of the structure. */ - size_t get_tombstone_cnt() { + size_t get_tombstone_count() { size_t cnt = 0; for (size_t i=0; i get_compaction_tasks() { - std::vector tasks; + std::vector get_compaction_tasks() { + std::vector tasks; /* if the tombstone/delete invariant is satisfied, no need for compactions */ if (validate_tombstone_proportion()) { @@ -220,16 +223,16 @@ public: assert(violation_idx != -1); - level_index merge_base_level = find_mergable_level(violation_idx); - if (merge_base_level == -1) { - merge_base_level = grow(); + level_index base_level = find_reconstruction_target(violation_idx); + if (base_level == -1) { + base_level = grow(); } - for (level_index i=merge_base_level; i>0; i--) { - MergeTask task = {i-1, i}; + for (level_index i=base_level; i>0; i--) { + ReconstructionTask task = {i-1, i}; /* - * The amount of storage required for the merge accounts + * The amount of storage required for the reconstruction accounts * for the cost of storing the new records, along with the * cost of retaining the old records during the process * (hence the 2x multiplier). @@ -240,7 +243,7 @@ public: */ size_t reccnt = m_levels[i-1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_merge_with(i, reccnt)) { + if (can_reconstruct_with(i, reccnt)) { reccnt += m_levels[i]->get_record_count(); } } @@ -255,28 +258,27 @@ public: /* * */ - std::vector get_merge_tasks(size_t buffer_reccnt) { - std::vector merges; + std::vector get_reconstruction_tasks(size_t buffer_reccnt) { + std::vector reconstructions; /* - * The buffer -> L0 merge task is not included so if that - * can be done without any other change, just return an - * empty list. + * The buffer flush is not included so if that can be done without any + * other change, just return an empty list. */ - if (can_merge_with(0, buffer_reccnt)) { - return std::move(merges); + if (can_reconstruct_with(0, buffer_reccnt)) { + return std::move(reconstructions); } - level_index merge_base_level = find_mergable_level(0); - if (merge_base_level == -1) { - merge_base_level = grow(); + level_index base_level = find_reconstruction_target(0); + if (base_level == -1) { + base_level = grow(); } - for (level_index i=merge_base_level; i>0; i--) { - MergeTask task = {i-1, i}; + for (level_index i=base_level; i>0; i--) { + ReconstructionTask task = {i-1, i}; /* - * The amount of storage required for the merge accounts + * The amount of storage required for the reconstruction accounts * for the cost of storing the new records, along with the * cost of retaining the old records during the process * (hence the 2x multiplier). 
@@ -287,34 +289,34 @@ public: */ size_t reccnt = m_levels[i-1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_merge_with(i, reccnt)) { + if (can_reconstruct_with(i, reccnt)) { reccnt += m_levels[i]->get_record_count(); } } //task.m_size = 2* reccnt * sizeof(R); - merges.push_back(task); + reconstructions.push_back(task); } - return std::move(merges); + return std::move(reconstructions); } /* * */ - std::vector get_merge_tasks_from_level(level_index source_level) { - std::vector merges; + std::vector get_reconstruction_tasks_from_level(level_index source_level) { + std::vector reconstructions; - level_index merge_base_level = find_mergable_level(source_level); - if (merge_base_level == -1) { - merge_base_level = grow(); + level_index base_level = find_reconstruction_target(source_level); + if (base_level == -1) { + base_level = grow(); } - for (level_index i=merge_base_level; i>source_level; i--) { - MergeTask task = {i - 1, i}; + for (level_index i=base_level; i>source_level; i--) { + ReconstructionTask task = {i - 1, i}; /* - * The amount of storage required for the merge accounts + * The amount of storage required for the reconstruction accounts * for the cost of storing the new records, along with the * cost of retaining the old records during the process * (hence the 2x multiplier). @@ -325,31 +327,30 @@ public: */ size_t reccnt = m_levels[i-1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_merge_with(i, reccnt)) { + if (can_reconstruct_with(i, reccnt)) { reccnt += m_levels[i]->get_record_count(); } } // task.m_size = 2* reccnt * sizeof(R); - merges.push_back(task); + reconstructions.push_back(task); } - return merges; + return reconstructions; } /* - * Merge the level specified by incoming level into the level specified - * by base level. The two levels should be sequential--i.e. no levels - * are skipped in the merge process--otherwise the tombstone ordering - * invariant may be violated by the merge operation. + * Combine incoming_level with base_level and reconstruct the shard, + * placing it in base_level. The two levels should be sequential--i.e. no + * levels are skipped in the reconstruction process--otherwise the + * tombstone ordering invariant may be violated. */ - inline void merge_levels(level_index base_level, level_index incoming_level) { - // merging two memory levels + inline void reconstruction(level_index base_level, level_index incoming_level) { if constexpr (L == LayoutPolicy::LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get()); + m_levels[base_level] = InternalLevel::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get()); } else { - m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get()); + m_levels[base_level]->append_level(m_levels[incoming_level].get()); m_levels[base_level]->finalize(); } @@ -391,9 +392,7 @@ private: std::vector>> m_levels; /* - * Add a new level to the LSM Tree and return that level's index. Will - * automatically determine whether the level should be on memory or on disk, - * and act appropriately. + * Add a new level to the structure and return its index. 
*/ inline level_index grow() { level_index new_idx = m_levels.size(); @@ -405,22 +404,18 @@ private: /* * Find the first level below the level indicated by idx that - * is capable of sustaining a merge operation and return its + * is capable of sustaining a reconstruction and return its * level index. If no such level exists, returns -1. Also - * returns -1 if idx==0, and no such level exists, to skimplify - * the logic of the first merge. + * returns -1 if idx==0, and no such level exists, to simplify + * the logic of the first buffer flush. */ - inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) { + inline level_index find_reconstruction_target(level_index idx, Buffer *buffer=nullptr) { if (idx == 0 && m_levels.size() == 0) return -1; - bool level_found = false; - bool disk_level; - level_index merge_level_idx; - size_t incoming_rec_cnt = get_level_record_count(idx, buffer); for (level_index i=idx+1; i(0, 1); temp_level->append_buffer(buffer); - auto new_level = InternalLevel::merge_levels(old_level, temp_level); + auto new_level = InternalLevel::reconstruction(old_level, temp_level); m_levels[0] = new_level; delete temp_level; @@ -479,13 +474,10 @@ private: } /* - * Determines if the specific level can merge with another record containing - * incoming_rec_cnt number of records. The provided level index should be - * non-negative (i.e., not refer to the buffer) and will be automatically - * translated into the appropriate index into either the disk or memory level - * vector. + * Determines if a level can sustain a reconstruction with incoming_rec_cnt + * additional records without exceeding its capacity. */ - inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) { + inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) { if (idx >= m_levels.size() || !m_levels[idx]) { return false; } diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index d146b73..e70ed76 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -40,9 +40,14 @@ public: delete m_pending_shard; } - // WARNING: for leveling only. - // assuming the base level is the level new level is merging into. (base_level is larger.) - static std::shared_ptr merge_levels(InternalLevel* base_level, InternalLevel* new_level) { + /* + * Create a new shard combining the records from base_level and new_level, + * and return a shared_ptr to a new level containing this shard. This is used + * for reconstructions under the leveling layout policy. + * + * No changes are made to the levels provided as arguments. + */ + static std::shared_ptr reconstruction(InternalLevel* base_level, InternalLevel* new_level) { assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); auto res = new InternalLevel(base_level->m_level_no, 1); res->m_shard_cnt = 1; @@ -54,18 +59,15 @@ public: return std::shared_ptr(res); } - void append_buffer(Buffer* buffer) { - if (m_shard_cnt == m_shards.size()) { - assert(m_pending_shard == nullptr); - m_pending_shard = new S(buffer); - return; - } - - m_shards[m_shard_cnt] = std::make_shared(buffer); - ++m_shard_cnt; - } - - void append_merged_shards(InternalLevel* level) { + /* + * Create a new shard combining the records from all of + * the shards in level, and append this new shard into + * this level. This is used for reconstructions under + * the tiering layout policy. 
+ * + * No changes are made to the level provided as an argument. + */ + void append_level(InternalLevel* level) { Shard *shards[level->m_shard_cnt]; for (size_t i=0; im_shard_cnt; i++) { shards[i] = level->m_shards[i].get(); @@ -82,6 +84,22 @@ public: ++m_shard_cnt; } + /* + * Create a new shard using the records in the + * provided buffer, and append this new shard + * into this level. This is used for buffer + * flushes under the tiering layout policy. + */ + void append_buffer(Buffer* buffer) { + if (m_shard_cnt == m_shards.size()) { + assert(m_pending_shard == nullptr); + m_pending_shard = new S(buffer); + return; + } + + m_shards[m_shard_cnt] = std::make_shared(buffer); + ++m_shard_cnt; + } void finalize() { if (m_pending_shard) { @@ -95,7 +113,13 @@ public: } } - Shard *get_merged_shard() { + /* + * Create a new shard containing the combined records + * from all shards on this level and return it. + * + * No changes are made to this level. + */ + Shard *get_combined_shard() { if (m_shard_cnt == 0) { return nullptr; } @@ -109,7 +133,7 @@ public: return new S(shards, m_shard_cnt); } - // Append the sample range in-order..... + /* Append the sample range in-order */ void get_query_states(std::vector> &shards, std::vector& shard_states, void *query_parms) { for (size_t i=0; i + * Copyright (C) 2023 Douglas B. Rumbaugh * Dong Xie * * Distributed under the Modified BSD License. * + * FIXME: currently, the buffer itself is responsible for managing a + * secondary buffer for storing sorted records used during buffer flushes. It + * probably makes more sense to make the shard being flushed into responsible + * for this instead. This would also facilitate simultaneous flushes of multiple + * buffers more easily. + * */ #pragma once @@ -35,7 +41,7 @@ public: : m_cap(capacity), m_tombstone_cap(capacity), m_reccnt(0) , m_tombstonecnt(0), m_weight(0), m_max_weight(0), m_tail(0) { m_data = (Wrapped*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped)); - m_merge_data = (Wrapped*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped)); + m_sorted_data = (Wrapped*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped)); m_tombstone_filter = nullptr; if (max_tombstone_cap > 0) { m_tombstone_filter = new psudb::BloomFilter(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS); @@ -49,7 +55,7 @@ public: if (m_data) free(m_data); if (m_tombstone_filter) delete m_tombstone_filter; - if (m_merge_data) free(m_merge_data); + if (m_sorted_data) free(m_sorted_data); } template @@ -171,8 +177,8 @@ public: * to be adjusted). Other threads having read access is perfectly * acceptable, however. 
*/ - bool start_merge() { - memcpy(m_merge_data, m_data, sizeof(Wrapped) * m_reccnt.load()); + bool start_flush() { + memcpy(m_sorted_data, m_data, sizeof(Wrapped) * m_reccnt.load()); return true; } @@ -210,7 +216,7 @@ private: size_t m_tombstone_cap; Wrapped* m_data; - Wrapped* m_merge_data; + Wrapped* m_sorted_data; psudb::BloomFilter* m_tombstone_filter; diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index 866128a..8e3d20f 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -49,6 +49,6 @@ enum class DeletePolicy { }; typedef ssize_t level_index; -typedef std::pair MergeTask; +typedef std::pair ReconstructionTask; } diff --git a/tests/internal_level_tests.cpp b/tests/internal_level_tests.cpp index cb74bca..9299c07 100644 --- a/tests/internal_level_tests.cpp +++ b/tests/internal_level_tests.cpp @@ -37,7 +37,7 @@ START_TEST(t_memlevel_merge) merging_level->append_buffer(tbl2); ck_assert_int_eq(merging_level->get_record_count(), 100); - auto new_level = ILevel::merge_levels(base_level, merging_level); + auto new_level = ILevel::reconstruction(base_level, merging_level); delete merging_level; ck_assert_int_eq(new_level->get_record_count(), 200); @@ -66,7 +66,7 @@ Suite *unit_testing() { Suite *unit = suite_create("InternalLevel Unit Testing"); - TCase *merge = tcase_create("de::InternalLevel::merge_level Testing"); + TCase *merge = tcase_create("de::InternalLevel::reconstruction Testing"); tcase_add_test(merge, t_memlevel_merge); suite_add_tcase(unit, merge); -- cgit v1.2.3
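
Below, as a reading aid for the renamed flush/reconstruction path in this patch, is a small standalone C++ sketch of the task-generation logic that get_reconstruction_tasks() and find_reconstruction_target() implement. The types used here (ToyStructure, and the geometric capacity rule of buffer_cap * scale_factor^(level + 1)) are simplified assumptions for illustration only, not the framework's actual templated interfaces.

// toy_reconstruction.cpp -- simplified sketch, not the framework's real code.
#include <cstddef>
#include <cstdio>
#include <utility>
#include <vector>

using level_index = long;
/* {source level, target level}, mirroring the renamed ReconstructionTask */
using ReconstructionTask = std::pair<level_index, level_index>;

/* Stand-in for ExtensionStructure: each level only tracks a record count,
 * and level i is assumed to hold buffer_cap * scale_factor^(i + 1) records. */
struct ToyStructure {
    size_t buffer_cap;
    size_t scale_factor;
    std::vector<size_t> level_reccnts;

    size_t capacity(level_index idx) const {
        size_t cap = buffer_cap;
        for (level_index i = 0; i <= idx; i++) {
            cap *= scale_factor;
        }
        return cap;
    }

    bool can_reconstruct_with(level_index idx, size_t incoming) const {
        if ((size_t) idx >= level_reccnts.size()) {
            return false;
        }
        return level_reccnts[idx] + incoming <= capacity(idx);
    }

    /* First level below idx that can absorb the records above it; -1 if none. */
    level_index find_reconstruction_target(level_index idx, size_t incoming) const {
        for (level_index i = idx + 1; (size_t) i < level_reccnts.size(); i++) {
            if (can_reconstruct_with(i, incoming)) {
                return i;
            }
            incoming = level_reccnts[i];   /* the next level must absorb this one */
        }
        return -1;
    }

    /* If L0 cannot take the buffer directly, cascade reconstructions downward
     * from the deepest viable target back up to L0, growing if necessary. */
    std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt) {
        std::vector<ReconstructionTask> tasks;
        if (can_reconstruct_with(0, buffer_reccnt)) {
            return tasks;
        }

        level_index base = find_reconstruction_target(0, level_reccnts[0]);
        if (base == -1) {
            level_reccnts.push_back(0);                    /* grow() */
            base = (level_index) level_reccnts.size() - 1;
        }
        for (level_index i = base; i > 0; i--) {
            tasks.push_back({i - 1, i});
        }
        return tasks;
    }
};

int main() {
    /* L0, L1, L2 hold 180, 350, 100 records against capacities 200, 400, 800 */
    ToyStructure s{100, 2, {180, 350, 100}};
    for (auto [src, tgt] : s.get_reconstruction_tasks(100)) {
        std::printf("reconstruct L%ld -> L%ld\n", src, tgt);
    }
    /* prints: L1 -> L2, then L0 -> L1, which frees L0 for the buffer flush */
    return 0;
}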
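
The leveling-versus-tiering split behind the renamed InternalLevel::reconstruction() and append_level() can likewise be sketched with toy levels that track only record counts. ToyShard and ToyLevel below are hypothetical stand-ins for illustration, not the real Shard interface.

// toy_layout_policies.cpp -- simplified sketch of how one task {src, tgt} lands.
#include <cstdio>
#include <memory>
#include <vector>

struct ToyShard { size_t reccnt; };

struct ToyLevel {
    std::vector<std::shared_ptr<ToyShard>> shards;

    size_t record_count() const {
        size_t n = 0;
        for (const auto &s : shards) {
            n += s->reccnt;
        }
        return n;
    }
};

/* Leveling: the target level ends up with exactly one shard, rebuilt from the
 * records of both levels (cf. InternalLevel::reconstruction in the patch). */
ToyLevel leveling_reconstruction(const ToyLevel &base, const ToyLevel &incoming) {
    ToyLevel out;
    size_t total = base.record_count() + incoming.record_count();
    out.shards.push_back(std::make_shared<ToyShard>(ToyShard{total}));
    return out;
}

/* Tiering: the incoming level's shards are combined into one new shard that is
 * appended next to the target's existing shards (cf. append_level). */
void tiering_append(ToyLevel &base, const ToyLevel &incoming) {
    base.shards.push_back(std::make_shared<ToyShard>(ToyShard{incoming.record_count()}));
}

int main() {
    ToyLevel l0{{std::make_shared<ToyShard>(ToyShard{50}),
                 std::make_shared<ToyShard>(ToyShard{30})}};
    ToyLevel l1{{std::make_shared<ToyShard>(ToyShard{200})}};

    ToyLevel leveled = leveling_reconstruction(l1, l0);
    std::printf("leveling: %zu shard(s), %zu records\n",
                leveled.shards.size(), leveled.record_count());

    tiering_append(l1, l0);
    std::printf("tiering:  %zu shard(s), %zu records\n",
                l1.shards.size(), l1.record_count());
    return 0;
}

The trade-off this illustrates is the usual one: leveling keeps a single shard per level, which bounds how many shards a query must touch, while tiering defers that work by appending shards, trading some query cost for lower write amplification.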