From 7c03d771475421c1d5a2bbc135242536af1a371c Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 25 Sep 2023 10:49:36 -0400 Subject: Re-structuring Project + scheduling updates This is a big one--probably should have split it apart, but I'm feeling lazy this morning. * Organized the mess of header files in include/framework by splitting them out into their own subdirectories, and renaming a few files to remove redundancies introduced by the directory structure. * Introduced a new framework/ShardRequirements.h header file for simpler shard development. This header simply contains the necessary includes from framework/* for creating shard files. This should help to remove structural dependencies from the framework file structure and shards, as well as centralizing the necessary framework files to make shard development easier. * Created a (currently dummy) SchedulerInterface, and made the scheduler implementation a template parameter of the dynamic extension for easier testing of various scheduling policies. There's still more work to be done to fully integrate the scheduler (queries, multiple buffers), but some more of the necessary framework code for this has been added as well. * Adjusted the Task interface setup for the scheduler. The task structures have been removed from ExtensionStructure and placed in their own header file. Additionally, I started experimenting with using std::variant, as opposed to inheritance, to implement subtype polymorphism on the Merge and Query tasks. The scheduler now has a general task queue that contains both, and std::variant, std::visit, and std::get are used to manipulate them without virtual functions. * Removed Alex.h, as it can't build anyway. There's a branch out there containing the Alex implementation stripped of the C++20 stuff. So there's no need to keep it here. 
--- include/framework/structure/ExtensionStructure.h | 428 +++++++++++++++++++++++ 1 file changed, 428 insertions(+) create mode 100644 include/framework/structure/ExtensionStructure.h (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h new file mode 100644 index 0000000..920e1c3 --- /dev/null +++ b/include/framework/structure/ExtensionStructure.h @@ -0,0 +1,428 @@ +/* + * include/framework/ExtensionStructure.h + * + * Copyright (C) 2023 Douglas Rumbaugh + * Dong Xie + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include +#include +#include +#include + +#include "framework/structure/MutableBuffer.h" +#include "framework/structure/InternalLevel.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" +#include "framework/interface/Record.h" + +#include "framework/util/Configuration.h" +#include "framework/scheduling/Task.h" + +#include "psu-util/timer.h" +#include "psu-ds/Alias.h" + +namespace de { + +template +class ExtensionStructure { + typedef S Shard; + typedef MutableBuffer Buffer; + +public: + ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop) + : m_scale_factor(scale_factor) + , m_max_delete_prop(max_delete_prop) + , m_buffer_size(buffer_size) + {} + + ~ExtensionStructure() = default; + + /* + * Create a shallow copy of this extension structure. The copy will share references to the + * same levels/shards as the original, but will have its own lists. As all of the shards are + * immutable (with the exception of deletes), the copy can be restructured with merges, etc., + * without affecting the original. + * + * NOTE: When using tagged deletes, a delete of a record in the original structure will affect + * the copy, so long as the copy retains a reference to the same shard as the original. 
This could + * cause synchronization problems under tagging with concurrency. Any deletes in this context will + * need to be forwarded to the appropriate structures manually. + */ + ExtensionStructure *copy() { + auto new_struct = new ExtensionStructure(m_scale_factor, m_max_delete_prop, m_buffer_size); + for (size_t i=0; im_levels.push_back(m_levels[i]->clone()); + } + + return new_struct; + } + + /* + * Search for a record matching the argument and mark it deleted by + * setting the delete bit in its wrapped header. Returns 1 if a matching + * record was found and deleted, and 0 if a matching record was not found. + * + * This function will stop after finding the first matching record. It is assumed + * that no duplicate records exist. In the case of duplicates, this function will + * still "work", but in the sense of "delete first match". + */ + int tagged_delete(const R &rec) { + for (auto level : m_levels) { + if (level && level->delete_record(rec)) { + return 1; + } + } + + /* + * If the record to be erased wasn't found, return 0. The + * DynamicExtension itself will then search the active + * Buffers. + */ + return 0; + } + + /* + * Merge the memory table down into the tree, completing any required other + * merges to make room for it. + */ + inline bool merge_buffer(Buffer *buffer) { + assert(can_merge_with(0, buffer->get_record_count())); + + merge_buffer_into_l0(buffer); + buffer->truncate(); + + return true; + } + + /* + * Return the total number of records (including tombstones) within all + * of the levels of the structure. + */ + size_t get_record_count() { + size_t cnt = 0; + + for (size_t i=0; iget_record_count(); + } + + return cnt; + } + + /* + * Return the total number of tombstones contained within all of the + * levels of the structure. + */ + size_t get_tombstone_cnt() { + size_t cnt = 0; + + for (size_t i=0; iget_tombstone_count(); + } + + return cnt; + } + + /* + * Return the number of levels within the structure. 
Note that not + * all of these levels are necessarily populated. + */ + size_t get_height() { + return m_levels.size(); + } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing the primary data structure and raw data. + */ + size_t get_memory_usage() { + size_t cnt = 0; + for (size_t i=0; iget_memory_usage(); + } + + return cnt; + } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing auxiliary data structures. This total does not + * include memory used for the main data structure, or raw data. + */ + size_t get_aux_memory_usage() { + size_t cnt = 0; + for (size_t i=0; iget_aux_memory_usage(); + } + } + + return cnt; + } + + /* + * Validate that no level in the structure exceeds its maximum tombstone capacity. This is + * used to trigger preemptive compactions at the end of the merge process. + */ + bool validate_tombstone_proportion() { + long double ts_prop; + for (size_t i=0; iget_tombstone_count() / (long double) calc_level_record_capacity(i); + if (ts_prop > (long double) m_max_delete_prop) { + return false; + } + } + } + + return true; + } + + bool validate_tombstone_proportion(level_index level) { + long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level); + return ts_prop <= (long double) m_max_delete_prop; + } + + /* + * Return a reference to the underlying vector of levels within the + * structure. + */ + std::vector>> &get_levels() { + return m_levels; + } + + /* + * + */ + std::vector get_merge_tasks(size_t buffer_reccnt) { + std::vector merges; + + /* + * The buffer -> L0 merge task is not included so if that + * can be done without any other change, just return an + * empty list. 
+ */ + if (can_merge_with(0, buffer_reccnt)) { + return std::move(merges); + } + + level_index merge_base_level = find_mergable_level(0); + if (merge_base_level == -1) { + merge_base_level = grow(); + } + + for (level_index i=merge_base_level; i>0; i--) { + MergeTask task; + task.m_source_level = i - 1; + task.m_target_level = i; + task.m_type = TaskType::MERGE; + + /* + * The amount of storage required for the merge accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i-1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_merge_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } + task.m_size = 2* reccnt * sizeof(R); + + merges.push_back(task); + } + + return std::move(merges); + } + + + /* + * + */ + std::vector get_merge_tasks_from_level(size_t source_level) { + std::vector merges; + + level_index merge_base_level = find_mergable_level(source_level); + if (merge_base_level == -1) { + merge_base_level = grow(); + } + + for (level_index i=merge_base_level; i>source_level; i--) { + MergeTask task; + task.m_source_level = i - 1; + task.m_target_level = i; + + /* + * The amount of storage required for the merge accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. 
+ */ + size_t reccnt = m_levels[i-1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_merge_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } + task.m_size = 2* reccnt * sizeof(R); + + merges.push_back(task); + } + + return std::move(merges); + } + + /* + * Merge the level specified by incoming level into the level specified + * by base level. The two levels should be sequential--i.e. no levels + * are skipped in the merge process--otherwise the tombstone ordering + * invariant may be violated by the merge operation. + */ + inline void merge_levels(level_index base_level, level_index incoming_level) { + // merging two memory levels + if constexpr (L == LayoutPolicy::LEVELING) { + auto tmp = m_levels[base_level]; + m_levels[base_level] = InternalLevel::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get()); + } else { + m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get()); + m_levels[base_level]->finalize(); + } + + m_levels[incoming_level] = std::shared_ptr>(new InternalLevel(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); + } + + +private: + size_t m_scale_factor; + double m_max_delete_prop; + size_t m_buffer_size; + + std::vector>> m_levels; + + /* + * Add a new level to the LSM Tree and return that level's index. Will + * automatically determine whether the level should be on memory or on disk, + * and act appropriately. + */ + inline level_index grow() { + level_index new_idx = m_levels.size(); + size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + m_levels.emplace_back(std::shared_ptr>(new InternalLevel(new_idx, new_shard_cnt))); + return new_idx; + } + + /* + * Find the first level below the level indicated by idx that + * is capable of sustaining a merge operation and return its + * level index. If no such level exists, returns -1. 
Also + * returns -1 if idx==0, and no such level exists, to skimplify + * the logic of the first merge. + */ + inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) { + + if (idx == 0 && m_levels.size() == 0) return -1; + + bool level_found = false; + bool disk_level; + level_index merge_level_idx; + + size_t incoming_rec_cnt = get_level_record_count(idx, buffer); + for (level_index i=idx+1; i(0, 1); + temp_level->append_buffer(buffer); + auto new_level = InternalLevel::merge_levels(old_level, temp_level); + + m_levels[0] = new_level; + delete temp_level; + } else { + m_levels[0]->append_buffer(buffer); + } + } + + /* + * Mark a given memory level as no-longer in use by the tree. For now this + * will just free the level. In future, this will be more complex as the + * level may not be able to immediately be deleted, depending upon who + * else is using it. + */ + inline void mark_as_unused(std::shared_ptr> level) { + level.reset(); + } + + /* + * Assume that level "0" should be larger than the buffer. The buffer + * itself is index -1, which should return simply the buffer capacity. + */ + inline size_t calc_level_record_capacity(level_index idx) { + return m_buffer_size * pow(m_scale_factor, idx+1); + } + + /* + * Returns the actual number of records present on a specified level. An + * index value of -1 indicates the memory table. Can optionally pass in + * a pointer to the memory table to use, if desired. Otherwise, there are + * no guarantees about which buffer will be accessed if level_index is -1. + */ + inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) { + if (buffer) { + return buffer->get_record_count(); + } + + return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; + } + + /* + * Determines if the specific level can merge with another record containing + * incoming_rec_cnt number of records. 
The provided level index should be + * non-negative (i.e., not refer to the buffer) and will be automatically + * translated into the appropriate index into either the disk or memory level + * vector. + */ + inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) { + if (idx>= m_levels.size() || !m_levels[idx]) { + return false; + } + + if (L == LayoutPolicy::LEVELING) { + return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx); + } else { + return m_levels[idx]->get_shard_count() < m_scale_factor; + } + + /* unreachable */ + assert(true); + } +}; + +} + -- cgit v1.2.3 From 7ecfb22c32b7986ed1a2439c1abbeed298e4153a Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 20 Oct 2023 17:00:42 -0400 Subject: Initial pass w/ new scheduler setup currently there's a race condition of some type to sort out. --- include/framework/structure/ExtensionStructure.h | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 920e1c3..8344518 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -93,7 +93,10 @@ public: inline bool merge_buffer(Buffer *buffer) { assert(can_merge_with(0, buffer->get_record_count())); + buffer->start_merge(); merge_buffer_into_l0(buffer); + buffer->finish_merge(); + buffer->truncate(); return true; @@ -216,10 +219,7 @@ public: } for (level_index i=merge_base_level; i>0; i--) { - MergeTask task; - task.m_source_level = i - 1; - task.m_target_level = i; - task.m_type = TaskType::MERGE; + MergeTask task = {i-1, i}; /* * The amount of storage required for the merge accounts @@ -237,7 +237,7 @@ public: reccnt += m_levels[i]->get_record_count(); } } - task.m_size = 2* reccnt * sizeof(R); + //task.m_size = 2* reccnt * sizeof(R); 
merges.push_back(task); } @@ -249,7 +249,7 @@ public: /* * */ - std::vector get_merge_tasks_from_level(size_t source_level) { + std::vector get_merge_tasks_from_level(level_index source_level) { std::vector merges; level_index merge_base_level = find_mergable_level(source_level); @@ -258,10 +258,7 @@ public: } for (level_index i=merge_base_level; i>source_level; i--) { - MergeTask task; - task.m_source_level = i - 1; - task.m_target_level = i; - + MergeTask task = {i - 1, i}; /* * The amount of storage required for the merge accounts * for the cost of storing the new records, along with the @@ -278,12 +275,12 @@ public: reccnt += m_levels[i]->get_record_count(); } } - task.m_size = 2* reccnt * sizeof(R); +// task.m_size = 2* reccnt * sizeof(R); merges.push_back(task); } - return std::move(merges); + return merges; } /* -- cgit v1.2.3 From 3afacb7702e6d8fa67749a2a41dc776d315e02a9 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 23 Oct 2023 17:43:22 -0400 Subject: Began moving to an explicit epoch-based system I started moving over to an explicit Epoch based system, which has necessitated a ton of changes throughout the code base. This will ultimately allow for a much cleaner set of abstractions for managing concurrency. --- include/framework/structure/ExtensionStructure.h | 26 ++++++++++++++++++++++++ 1 file changed, 26 insertions(+) (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 8344518..de965ae 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -302,12 +302,38 @@ public: m_levels[incoming_level] = std::shared_ptr>(new InternalLevel(incoming_level, (L == LayoutPolicy::LEVELING) ? 
1 : m_scale_factor)); } + bool take_reference() { + m_refcnt.fetch_add(1); + return true; + } + + bool release_reference() { + assert(m_refcnt.load() > 0); + m_refcnt.fetch_add(-1); + return true; + } + + size_t get_reference_count() { + return m_refcnt.load(); + } + + std::vector get_query_states(std::vector> &shards, void *parms) { + std::vector states; + + for (auto &level : m_levels) { + level->get_query_states(shards, states, parms); + } + + return states; + } private: size_t m_scale_factor; double m_max_delete_prop; size_t m_buffer_size; + std::atomic m_refcnt; + std::vector>> m_levels; /* -- cgit v1.2.3 From d2279e1b96d352a0af1d425dcaaf93e8a26a8d52 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 30 Oct 2023 17:15:05 -0400 Subject: General Comment + Consistency updates --- include/framework/structure/ExtensionStructure.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index de965ae..1f365ae 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -1,7 +1,7 @@ /* - * include/framework/ExtensionStructure.h + * include/framework/structure/ExtensionStructure.h * - * Copyright (C) 2023 Douglas Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * Dong Xie * * All rights reserved. Published under the Modified BSD License. -- cgit v1.2.3 From 230831243a61f1ca1b1dd4319a4c5224b15d2657 Mon Sep 17 00:00:00 2001 From: "Douglas B. 
Rumbaugh" Date: Tue, 31 Oct 2023 12:05:58 -0400 Subject: ExtensionStructure: fixed incorrect constructor args in clone() --- include/framework/structure/ExtensionStructure.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 1f365ae..2ced439 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -54,7 +54,7 @@ public: * need to be forwarded to the appropriate structures manually. */ ExtensionStructure *copy() { - auto new_struct = new ExtensionStructure(m_scale_factor, m_max_delete_prop, m_buffer_size); + auto new_struct = new ExtensionStructure(m_buffer_size, m_scale_factor, m_max_delete_prop); for (size_t i=0; im_levels.push_back(m_levels[i]->clone()); } @@ -432,7 +432,7 @@ private: * vector. */ inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) { - if (idx>= m_levels.size() || !m_levels[idx]) { + if (idx >= m_levels.size() || !m_levels[idx]) { return false; } -- cgit v1.2.3 From ca729108869b4143f1eea31f6dde9195decfec9c Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 12:14:57 -0400 Subject: MutableBuffer: removed most concurrency control stuff The buffer isn't responsible for a lot of CC anymore (just the append operation), so this code was no longer necessary. Also removed the only calls to some of these CC operations within the rest of the framework. 
--- include/framework/structure/ExtensionStructure.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 2ced439..f5657af 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -93,11 +93,11 @@ public: inline bool merge_buffer(Buffer *buffer) { assert(can_merge_with(0, buffer->get_record_count())); + // FIXME: this step makes an extra copy of the buffer, + // which could be avoided by adjusting the shard + // reconstruction process a bit, possibly. buffer->start_merge(); merge_buffer_into_l0(buffer); - buffer->finish_merge(); - - buffer->truncate(); return true; } -- cgit v1.2.3 From 68ae6279476e7d37837ac06474fb558e50ce6706 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 12:41:55 -0400 Subject: Fixes for various bugs under SerialScheduler --- include/framework/structure/ExtensionStructure.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index f5657af..80ec7b9 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -46,7 +46,8 @@ public: * Create a shallow copy of this extension structure. The copy will share references to the * same levels/shards as the original, but will have its own lists. As all of the shards are * immutable (with the exception of deletes), the copy can be restructured with merges, etc., - * without affecting the original. + * without affecting the original. The copied structure will be returned with a reference + * count of 0; generally you will want to immediately call take_reference() on it. 
* * NOTE: When using tagged deletes, a delete of a record in the original structure will affect * the copy, so long as the copy retains a reference to the same shard as the original. This could @@ -59,6 +60,8 @@ public: new_struct->m_levels.push_back(m_levels[i]->clone()); } + new_struct->m_refcnt = 0; + return new_struct; } -- cgit v1.2.3 From 357cab549c2ed33970562b84ff6f83923742343d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 7 Nov 2023 15:34:24 -0500 Subject: Comment and License updates --- include/framework/structure/ExtensionStructure.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 80ec7b9..74cede6 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -4,7 +4,7 @@ * Copyright (C) 2023 Douglas B. Rumbaugh * Dong Xie * - * All rights reserved. Published under the Modified BSD License. + * Distributed under the Modified BSD License. * */ #pragma once -- cgit v1.2.3 From 90bb0614fc1d8f1a185a778e31aaf9027c01aeb8 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 13 Nov 2023 11:44:09 -0500 Subject: Tombstone Compaction: re-enabled tombstone compaction Currently, proactive buffer tombstone compaction is disabled by forcing the buffer tombstone capacity to match its record capacity. It isn't clear how to best handle proactive buffer compactions in an environment where new buffers are spawned anyway. 
--- include/framework/structure/ExtensionStructure.h | 51 ++++++++++++++++++++++++ 1 file changed, 51 insertions(+) (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 74cede6..a174805 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -201,6 +201,57 @@ public: return m_levels; } + std::vector get_compaction_tasks() { + std::vector tasks; + + /* if the tombstone/delete invariant is satisfied, no need for compactions */ + if (validate_tombstone_proportion()) { + return tasks; + } + + /* locate the first level to violate the invariant */ + level_index violation_idx = -1; + for (level_index i=0; i0; i--) { + MergeTask task = {i-1, i}; + + /* + * The amount of storage required for the merge accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. 
+ */ + size_t reccnt = m_levels[i-1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_merge_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } + //task.m_size = 2* reccnt * sizeof(R); + + tasks.push_back(task); + } + + return tasks; + } + /* * */ -- cgit v1.2.3 From 3c127eda69295cb306739bdd3c5ddccff6026a8d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 13 Dec 2023 12:39:54 -0500 Subject: Refactoring: corrected a number of names and added more comments --- include/framework/structure/ExtensionStructure.h | 142 +++++++++++------------ 1 file changed, 67 insertions(+), 75 deletions(-) (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index a174805..3cd55ac 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -45,8 +45,8 @@ public: /* * Create a shallow copy of this extension structure. The copy will share references to the * same levels/shards as the original, but will have its own lists. As all of the shards are - * immutable (with the exception of deletes), the copy can be restructured with merges, etc., - * without affecting the original. The copied structure will be returned with a reference + * immutable (with the exception of deletes), the copy can be restructured with reconstructions + * and flushes without affecting the original. The copied structure will be returned with a reference * count of 0; generally you will want to immediately call take_reference() on it. * * NOTE: When using tagged deletes, a delete of a record in the original structure will affect @@ -55,7 +55,7 @@ public: * need to be forwarded to the appropriate structures manually. 
*/ ExtensionStructure *copy() { - auto new_struct = new ExtensionStructure(m_buffer_size, m_scale_factor, m_max_delete_prop); + auto new_struct = new ExtensionStructure(m_buffer_size, m_scale_factor, m_max_delete_prop); for (size_t i=0; im_levels.push_back(m_levels[i]->clone()); } @@ -90,17 +90,20 @@ public: } /* - * Merge the memory table down into the tree, completing any required other - * merges to make room for it. + * Flush a buffer into the extension structure, performing any necessary + * reconstructions to free up room in L0. + * + * FIXME: arguably, this should be a method attached to the buffer that + * takes a structure as input. */ - inline bool merge_buffer(Buffer *buffer) { - assert(can_merge_with(0, buffer->get_record_count())); + inline bool flush_buffer(Buffer *buffer) { + assert(can_reconstruct_with(0, buffer->get_record_count())); // FIXME: this step makes an extra copy of the buffer, // which could be avoided by adjusting the shard // reconstruction process a bit, possibly. - buffer->start_merge(); - merge_buffer_into_l0(buffer); + buffer->start_flush(); + flush_buffer_into_l0(buffer); return true; } @@ -123,7 +126,7 @@ public: * Return the total number of tombstones contained within all of the * levels of the structure. 
*/ - size_t get_tombstone_cnt() { + size_t get_tombstone_count() { size_t cnt = 0; for (size_t i=0; i get_compaction_tasks() { - std::vector tasks; + std::vector get_compaction_tasks() { + std::vector tasks; /* if the tombstone/delete invariant is satisfied, no need for compactions */ if (validate_tombstone_proportion()) { @@ -220,16 +223,16 @@ public: assert(violation_idx != -1); - level_index merge_base_level = find_mergable_level(violation_idx); - if (merge_base_level == -1) { - merge_base_level = grow(); + level_index base_level = find_reconstruction_target(violation_idx); + if (base_level == -1) { + base_level = grow(); } - for (level_index i=merge_base_level; i>0; i--) { - MergeTask task = {i-1, i}; + for (level_index i=base_level; i>0; i--) { + ReconstructionTask task = {i-1, i}; /* - * The amount of storage required for the merge accounts + * The amount of storage required for the reconstruction accounts * for the cost of storing the new records, along with the * cost of retaining the old records during the process * (hence the 2x multiplier). @@ -240,7 +243,7 @@ public: */ size_t reccnt = m_levels[i-1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_merge_with(i, reccnt)) { + if (can_reconstruct_with(i, reccnt)) { reccnt += m_levels[i]->get_record_count(); } } @@ -255,28 +258,27 @@ public: /* * */ - std::vector get_merge_tasks(size_t buffer_reccnt) { - std::vector merges; + std::vector get_reconstruction_tasks(size_t buffer_reccnt) { + std::vector reconstructions; /* - * The buffer -> L0 merge task is not included so if that - * can be done without any other change, just return an - * empty list. + * The buffer flush is not included so if that can be done without any + * other change, just return an empty list. 
*/ - if (can_merge_with(0, buffer_reccnt)) { - return std::move(merges); + if (can_reconstruct_with(0, buffer_reccnt)) { + return std::move(reconstructions); } - level_index merge_base_level = find_mergable_level(0); - if (merge_base_level == -1) { - merge_base_level = grow(); + level_index base_level = find_reconstruction_target(0); + if (base_level == -1) { + base_level = grow(); } - for (level_index i=merge_base_level; i>0; i--) { - MergeTask task = {i-1, i}; + for (level_index i=base_level; i>0; i--) { + ReconstructionTask task = {i-1, i}; /* - * The amount of storage required for the merge accounts + * The amount of storage required for the reconstruction accounts * for the cost of storing the new records, along with the * cost of retaining the old records during the process * (hence the 2x multiplier). @@ -287,34 +289,34 @@ public: */ size_t reccnt = m_levels[i-1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_merge_with(i, reccnt)) { + if (can_reconstruct_with(i, reccnt)) { reccnt += m_levels[i]->get_record_count(); } } //task.m_size = 2* reccnt * sizeof(R); - merges.push_back(task); + reconstructions.push_back(task); } - return std::move(merges); + return std::move(reconstructions); } /* * */ - std::vector get_merge_tasks_from_level(level_index source_level) { - std::vector merges; + std::vector get_reconstruction_tasks_from_level(level_index source_level) { + std::vector reconstructions; - level_index merge_base_level = find_mergable_level(source_level); - if (merge_base_level == -1) { - merge_base_level = grow(); + level_index base_level = find_reconstruction_target(source_level); + if (base_level == -1) { + base_level = grow(); } - for (level_index i=merge_base_level; i>source_level; i--) { - MergeTask task = {i - 1, i}; + for (level_index i=base_level; i>source_level; i--) { + ReconstructionTask task = {i - 1, i}; /* - * The amount of storage required for the merge accounts + * The amount of storage required for the 
reconstruction accounts * for the cost of storing the new records, along with the * cost of retaining the old records during the process * (hence the 2x multiplier). @@ -325,31 +327,30 @@ public: */ size_t reccnt = m_levels[i-1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_merge_with(i, reccnt)) { + if (can_reconstruct_with(i, reccnt)) { reccnt += m_levels[i]->get_record_count(); } } // task.m_size = 2* reccnt * sizeof(R); - merges.push_back(task); + reconstructions.push_back(task); } - return merges; + return reconstructions; } /* - * Merge the level specified by incoming level into the level specified - * by base level. The two levels should be sequential--i.e. no levels - * are skipped in the merge process--otherwise the tombstone ordering - * invariant may be violated by the merge operation. + * Combine incoming_level with base_level and reconstruct the shard, + * placing it in base_level. The two levels should be sequential--i.e. no + * levels are skipped in the reconstruction process--otherwise the + * tombstone ordering invariant may be violated. */ - inline void merge_levels(level_index base_level, level_index incoming_level) { - // merging two memory levels + inline void reconstruction(level_index base_level, level_index incoming_level) { if constexpr (L == LayoutPolicy::LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get()); + m_levels[base_level] = InternalLevel::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get()); } else { - m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get()); + m_levels[base_level]->append_level(m_levels[incoming_level].get()); m_levels[base_level]->finalize(); } @@ -391,9 +392,7 @@ private: std::vector>> m_levels; /* - * Add a new level to the LSM Tree and return that level's index. 
Will - * automatically determine whether the level should be on memory or on disk, - * and act appropriately. + * Add a new level to the structure and return its index. */ inline level_index grow() { level_index new_idx = m_levels.size(); @@ -405,22 +404,18 @@ private: /* * Find the first level below the level indicated by idx that - * is capable of sustaining a merge operation and return its + * is capable of sustaining a reconstruction and return its * level index. If no such level exists, returns -1. Also - * returns -1 if idx==0, and no such level exists, to skimplify - * the logic of the first merge. + * returns -1 if idx==0, and no such level exists, to simplify + * the logic of the first buffer flush. */ - inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) { + inline level_index find_reconstruction_target(level_index idx, Buffer *buffer=nullptr) { if (idx == 0 && m_levels.size() == 0) return -1; - bool level_found = false; - bool disk_level; - level_index merge_level_idx; - size_t incoming_rec_cnt = get_level_record_count(idx, buffer); for (level_index i=idx+1; i(0, 1); temp_level->append_buffer(buffer); - auto new_level = InternalLevel::merge_levels(old_level, temp_level); + auto new_level = InternalLevel::reconstruction(old_level, temp_level); m_levels[0] = new_level; delete temp_level; @@ -479,13 +474,10 @@ private: } /* - * Determines if the specific level can merge with another record containing - * incoming_rec_cnt number of records. The provided level index should be - * non-negative (i.e., not refer to the buffer) and will be automatically - * translated into the appropriate index into either the disk or memory level - * vector. + * Determines if a level can sustain a reconstruction with incoming_rec_cnt + * additional records without exceeding its capacity. 
*/ - inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) { + inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) { if (idx >= m_levels.size() || !m_levels[idx]) { return false; } -- cgit v1.2.3 From 24a42e300c96e2815bf20be3f6cce3efee1c4303 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 21 Dec 2023 17:03:39 -0500 Subject: ExtensionStructure: adjusted leveling logic to avoid unneeded copies This also reduces the special-case overhead on shards. As it was, shards would need to handle a special case when constructing from other shards where the first of the two provided shards was a nullptr, which caused a number of subtle issues (or outright crashes in some cases) with existing shard implementations. --- include/framework/structure/ExtensionStructure.h | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 3cd55ac..60016a0 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -347,13 +347,19 @@ public: */ inline void reconstruction(level_index base_level, level_index incoming_level) { if constexpr (L == LayoutPolicy::LEVELING) { - auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get()); + /* if the base level has a shard, merge the base and incoming together to make a new one */ + if (m_levels[base_level]->get_shard_count() > 0) { + m_levels[base_level] = InternalLevel::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get()); + /* otherwise, we can just move the incoming to the base */ + } else { + m_levels[base_level] = m_levels[incoming_level]; + } } else { m_levels[base_level]->append_level(m_levels[incoming_level].get()); 
m_levels[base_level]->finalize(); } + /* place a new, empty level where the incoming level used to be */ m_levels[incoming_level] = std::shared_ptr>(new InternalLevel(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); } @@ -432,10 +438,13 @@ private: auto old_level = m_levels[0].get(); auto temp_level = new InternalLevel(0, 1); temp_level->append_buffer(buffer); - auto new_level = InternalLevel::reconstruction(old_level, temp_level); - m_levels[0] = new_level; - delete temp_level; + if (old_level->get_shard_count() > 0) { + m_levels[0] = InternalLevel::reconstruction(old_level, temp_level); + delete temp_level; + } else { + m_levels[0] = std::shared_ptr>(temp_level); + } } else { m_levels[0]->append_buffer(buffer); } -- cgit v1.2.3 From aac0bb661af8fae38d3ce08d6078cb4d9dfcb575 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 12 Jan 2024 14:10:11 -0500 Subject: Initial integration of new buffering scheme into framework It isn't working right now (lotsa test failures), but we're to the debugging phase now. 
--- include/framework/structure/ExtensionStructure.h | 41 +++++++----------------- 1 file changed, 12 insertions(+), 29 deletions(-) (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 60016a0..ae566cb 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -10,28 +10,22 @@ #pragma once #include -#include #include #include -#include "framework/structure/MutableBuffer.h" +#include "framework/structure/BufferView.h" #include "framework/structure/InternalLevel.h" -#include "framework/interface/Shard.h" -#include "framework/interface/Query.h" -#include "framework/interface/Record.h" #include "framework/util/Configuration.h" -#include "framework/scheduling/Task.h" #include "psu-util/timer.h" -#include "psu-ds/Alias.h" namespace de { template class ExtensionStructure { typedef S Shard; - typedef MutableBuffer Buffer; + typedef BufferView BuffView; public: ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop) @@ -96,14 +90,10 @@ public: * FIXME: arguably, this should be a method attached to the buffer that * takes a structure as input. */ - inline bool flush_buffer(Buffer *buffer) { - assert(can_reconstruct_with(0, buffer->get_record_count())); + inline bool flush_buffer(BuffView buffer) { + assert(can_reconstruct_with(0, buffer.get_record_count())); - // FIXME: this step makes an extra copy of the buffer, - // which could be avoided by adjusting the shard - // reconstruction process a bit, possibly. - buffer->start_flush(); - flush_buffer_into_l0(buffer); + flush_buffer_into_l0(std::move(buffer)); return true; } @@ -415,11 +405,11 @@ private: * returns -1 if idx==0, and no such level exists, to simplify * the logic of the first buffer flush. 
*/ - inline level_index find_reconstruction_target(level_index idx, Buffer *buffer=nullptr) { + inline level_index find_reconstruction_target(level_index idx) { if (idx == 0 && m_levels.size() == 0) return -1; - size_t incoming_rec_cnt = get_level_record_count(idx, buffer); + size_t incoming_rec_cnt = get_level_record_count(idx); for (level_index i=idx+1; i(0, 1); - temp_level->append_buffer(buffer); + temp_level->append_buffer(std::move(buffer)); if (old_level->get_shard_count() > 0) { m_levels[0] = InternalLevel::reconstruction(old_level, temp_level); @@ -446,7 +436,7 @@ private: m_levels[0] = std::shared_ptr>(temp_level); } } else { - m_levels[0]->append_buffer(buffer); + m_levels[0]->append_buffer(std::move(buffer)); } } @@ -469,16 +459,9 @@ private: } /* - * Returns the actual number of records present on a specified level. An - * index value of -1 indicates the memory table. Can optionally pass in - * a pointer to the memory table to use, if desired. Otherwise, there are - * no guarantees about which buffer will be accessed if level_index is -1. + * Returns the number of records present on a specified level. */ - inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) { - if (buffer) { - return buffer->get_record_count(); - } - + inline size_t get_level_record_count(level_index idx) { return (m_levels[idx]) ? 
m_levels[idx]->get_record_count() : 0; } -- cgit v1.2.3 From 10b4425e842d10b7fbfa85978969ed4591d6b98e Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 7 Feb 2024 10:56:52 -0500 Subject: Fully implemented Query concept and adjusted queries to use it --- include/framework/structure/ExtensionStructure.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index ae566cb..0b8263e 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -22,7 +22,7 @@ namespace de { -template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING> +template <RecordInterface R, ShardInterface S, QueryInterface<R> Q, LayoutPolicy L=LayoutPolicy::TEIRING> class ExtensionStructure { typedef S Shard; typedef BufferView BuffView; -- cgit v1.2.3 From 2c5d549b3618b9ea72e6eece4cb4f3da5a6811a8 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 7 Feb 2024 13:42:34 -0500 Subject: Fully realized shard concept interface --- include/framework/structure/ExtensionStructure.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 0b8263e..373a1e2 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -22,7 +22,7 @@ namespace de { -template <RecordInterface R, ShardInterface S, QueryInterface<R> Q, LayoutPolicy L=LayoutPolicy::TEIRING> +template <RecordInterface R, ShardInterface<R> S, QueryInterface<R> Q, LayoutPolicy L=LayoutPolicy::TEIRING> class ExtensionStructure { typedef S Shard; typedef BufferView BuffView; -- cgit v1.2.3 From 402fc269c0aaa671d84a6d15918735ad4b90e6b2 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 9 Feb 2024 12:30:21 -0500 Subject: Comment updates/fixes --- include/framework/structure/ExtensionStructure.h | 64 +++++++++++++----------- 1 file changed, 35 insertions(+), 29
deletions(-) (limited to 'include/framework/structure/ExtensionStructure.h') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 373a1e2..4802bc1 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -37,19 +37,23 @@ public: ~ExtensionStructure() = default; /* - * Create a shallow copy of this extension structure. The copy will share references to the - * same levels/shards as the original, but will have its own lists. As all of the shards are - * immutable (with the exception of deletes), the copy can be restructured with reconstructions - * and flushes without affecting the original. The copied structure will be returned with a reference - * count of 0; generally you will want to immediately call take_reference() on it. + * Create a shallow copy of this extension structure. The copy will share + * references to the same levels/shards as the original, but will have its + * own lists. As all of the shards are immutable (with the exception of + * deletes), the copy can be restructured with reconstructions and flushes + * without affecting the original. The copied structure will be returned + * with a reference count of 0; generally you will want to immediately call + * take_reference() on it. * - * NOTE: When using tagged deletes, a delete of a record in the original structure will affect - * the copy, so long as the copy retains a reference to the same shard as the original. This could - * cause synchronization problems under tagging with concurrency. Any deletes in this context will + * NOTE: When using tagged deletes, a delete of a record in the original + * structure will affect the copy, so long as the copy retains a reference + * to the same shard as the original. This could cause synchronization + * problems under tagging with concurrency. 
Any deletes in this context will * need to be forwarded to the appropriate structures manually. */ ExtensionStructure *copy() { - auto new_struct = new ExtensionStructure(m_buffer_size, m_scale_factor, m_max_delete_prop); + auto new_struct = new ExtensionStructure(m_buffer_size, m_scale_factor, + m_max_delete_prop); for (size_t i=0; im_levels.push_back(m_levels[i]->clone()); } @@ -64,9 +68,9 @@ public: * setting the delete bit in its wrapped header. Returns 1 if a matching * record was found and deleted, and 0 if a matching record was not found. * - * This function will stop after finding the first matching record. It is assumed - * that no duplicate records exist. In the case of duplicates, this function will - * still "work", but in the sense of "delete first match". + * This function will stop after finding the first matching record. It is + * assumed that no duplicate records exist. In the case of duplicates, this + * function will still "work", but in the sense of "delete first match". */ int tagged_delete(const R &rec) { for (auto level : m_levels) { @@ -77,7 +81,7 @@ public: /* * If the record to be erased wasn't found, return 0. The - * DynamicExtension itself will then search the active + * DynamicExtension itself will then search the active * Buffers. */ return 0; @@ -164,21 +168,23 @@ public: } /* - * Validate that no level in the structure exceeds its maximum tombstone capacity. This is - * used to trigger preemptive compactions at the end of the reconstruction process. + * Validate that no level in the structure exceeds its maximum tombstone + * capacity. This is used to trigger preemptive compactions at the end of + * the reconstruction process. 
*/ bool validate_tombstone_proportion() { - long double ts_prop; - for (size_t i=0; iget_tombstone_count() / (long double) calc_level_record_capacity(i); - if (ts_prop > (long double) m_max_delete_prop) { - return false; - } - } + long double ts_prop; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + ts_prop = (long double)m_levels[i]->get_tombstone_count() / + (long double)calc_level_record_capacity(i); + if (ts_prop > (long double)m_max_delete_prop) { + return false; + } } + } - return true; + return true; } bool validate_tombstone_proportion(level_index level) { @@ -224,14 +230,14 @@ public: /* * The amount of storage required for the reconstruction accounts * for the cost of storing the new records, along with the - * cost of retaining the old records during the process - * (hence the 2x multiplier). + * cost of retaining the old records during the process + * (hence the 2x multiplier). * - * FIXME: currently does not account for the *actual* size - * of the shards, only the storage for the records + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records * themselves. */ - size_t reccnt = m_levels[i-1]->get_record_count(); + size_t reccnt = m_levels[i - 1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { if (can_reconstruct_with(i, reccnt)) { reccnt += m_levels[i]->get_record_count(); -- cgit v1.2.3