From eb8dbaa770a57557d67c817c2839c64f536a6ce4 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 13 Sep 2023 16:22:03 -0400 Subject: Began re-architecting the project for concurrency support The project is now in a state where it builds, but it probably has a lot of bugs still. --- include/framework/DynamicExtension.h | 372 +++++++---------------------------- 1 file changed, 70 insertions(+), 302 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 524024b..5e9bcee 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -19,62 +19,36 @@ #include "framework/ShardInterface.h" #include "framework/QueryInterface.h" #include "framework/RecordInterface.h" +#include "framework/ExtensionStructure.h" + +#include "framework/Configuration.h" +#include "framework/Scheduler.h" -#include "shard/WIRS.h" #include "psu-util/timer.h" #include "psu-ds/Alias.h" namespace de { -thread_local size_t sampling_attempts = 0; -thread_local size_t sampling_rejections = 0; -thread_local size_t deletion_rejections = 0; -thread_local size_t bounds_rejections = 0; -thread_local size_t tombstone_rejections = 0; -thread_local size_t buffer_rejections = 0; - -/* - * thread_local size_t various_sampling_times go here. - */ -thread_local size_t sample_range_time = 0; -thread_local size_t alias_time = 0; -thread_local size_t alias_query_time = 0; -thread_local size_t rejection_check_time = 0; -thread_local size_t buffer_sample_time = 0; -thread_local size_t memlevel_sample_time = 0; -thread_local size_t disklevel_sample_time = 0; -thread_local size_t sampling_bailouts = 0; - - -enum class LayoutPolicy { - LEVELING, - TEIRING -}; - -enum class DeletePolicy { - TOMBSTONE, - TAGGING -}; - -typedef ssize_t level_index; - template class DynamicExtension { - //typedef typename S Shard; typedef S Shard; typedef MutableBuffer Buffer; - + typedef ExtensionStructure Structure; public: - DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop) - : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop), - m_buffer(new Buffer(buffer_cap, buffer_cap * max_delete_prop)) + DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop, size_t memory_budget=0, + size_t thread_cnt=16) + : m_scale_factor(scale_factor) + , m_max_delete_prop(max_delete_prop) + , m_sched(memory_budget, thread_cnt) { } ~DynamicExtension() { - delete m_buffer; + for (size_t i=0; idelete_record(rec)) { - return 1; - } + if (get_active_version()->tagged_delete(rec)) { + return 1; } - // the buffer will take the longest amount of time, and - // probably has the lowest probability of having the record, - // so we'll check it last. + /* + * the buffer will take the longest amount of time, and + * probably has the lowest probability of having the record, + * so we'll check it last. 
+ */ return buffer->delete_record(rec); } + /* + * If tagging isn't used, then delete using a tombstone + */ return internal_append(rec, true); } std::vector query(void *parms) { auto buffer = get_buffer(); + auto vers = get_active_version(); // Get the buffer query state auto buffer_state = Q::get_buffer_query_state(buffer, parms); @@ -115,7 +89,7 @@ public: std::vector> shards; std::vector states; - for (auto &level : m_levels) { + for (auto &level : vers->get_levels()) { level->get_query_states(shards, states, parms); } @@ -125,7 +99,7 @@ public: // Execute the query for the buffer auto buffer_results = Q::buffer_query(buffer, buffer_state, parms); - query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer)); + query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers)); if constexpr (Q::EARLY_ABORT) { if (query_results[0].size() > 0) { auto result = Q::merge(query_results, parms); @@ -141,7 +115,7 @@ public: // Execute the query for each shard for (size_t i=0; i 0) { auto result = Q::merge(query_results, parms); @@ -170,75 +144,44 @@ public: size_t get_record_count() { size_t cnt = get_buffer()->get_record_count(); - - for (size_t i=0; iget_record_count(); - } - - return cnt; + return cnt + get_active_version()->get_record_count(); } size_t get_tombstone_cnt() { size_t cnt = get_buffer()->get_tombstone_count(); - - for (size_t i=0; iget_tombstone_count(); - } - - return cnt; + return cnt + get_active_version()->get_tombstone_cnt(); } size_t get_height() { - return m_levels.size(); + return get_active_version()->get_height(); } size_t get_memory_usage() { - size_t cnt = m_buffer->get_memory_usage(); - - for (size_t i=0; iget_memory_usage(); - } + auto vers = get_active_version(); + auto buffer = get_buffer(); - return cnt; + return vers.get_memory_usage() + buffer->get_memory_usage(); } size_t get_aux_memory_usage() { - size_t cnt = m_buffer->get_aux_memory_usage(); - - for (size_t i=0; iget_aux_memory_usage(); - } - } - - return cnt; - } - - bool validate_tombstone_proportion() { - long double ts_prop; - for (size_t i=0; iget_tombstone_count() / (long double) calc_level_record_capacity(i); - if (ts_prop > (long double) m_max_delete_prop) { - return false; - } - } - } + auto vers = get_active_version(); + auto buffer = get_buffer(); - return true; + return vers.get_aux_memory_usage() + buffer->get_aux_memory_usage(); } size_t get_buffer_capacity() { - return m_buffer->get_capacity(); + return get_height()->get_capacity(); } Shard *create_static_structure() { + auto vers = get_active_version(); std::vector shards; - if (m_levels.size() > 0) { - for (int i=m_levels.size() - 1; i>= 0; i--) { - if (m_levels[i]) { - shards.emplace_back(m_levels[i]->get_merged_shard()); + if (vers->get_levels().size() > 0) { + for (int i=vers->get_levels().size() - 1; i>= 0; i--) { + if (vers->get_levels()[i]) { + shards.emplace_back(vers->get_levels()[i]->get_merged_shard()); } } } @@ -263,16 +206,32 @@ public: return flattened; } + /* + * Mostly exposed for unit-testing purposes. Verifies that the current + * active version of the ExtensionStructure doesn't violate the maximum + * tombstone proportion invariant. 
+ */ + bool validate_tombstone_proportion() { + return get_active_version()->validate_tombstone_proportion(); + } + private: - Buffer *m_buffer; + Scheduler m_sched; + + std::vector m_buffers; + std::vector m_versions; + + std::atomic m_current_epoch; size_t m_scale_factor; double m_max_delete_prop; - std::vector *> m_levels; - Buffer *get_buffer() { - return m_buffer; + return m_buffers[0]; + } + + Structure *get_active_version() { + return m_versions[0]; } int internal_append(const R &rec, bool ts) { @@ -281,13 +240,14 @@ private: ; if (buffer->is_full()) { - merge_buffer(); + auto vers = get_active_version(); + m_sched.schedule_merge(vers, buffer); } return buffer->append(rec, ts); } - std::vector> filter_deletes(std::vector> &records, ShardID shid, Buffer *buffer) { + std::vector> filter_deletes(std::vector> &records, ShardID shid, Buffer *buffer, Structure *vers) { if constexpr (!Q::SKIP_DELETE_FILTER) { return records; } @@ -322,12 +282,12 @@ private: if (shid != INVALID_SHID) { for (size_t lvl=0; lvl<=shid.level_idx; lvl++) { - if (m_levels[lvl]->check_tombstone(0, rec.rec)) { + if (vers->get_levels()[lvl]->check_tombstone(0, rec.rec)) { continue; } } - if (m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) { + if (vers->get_levels()[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) { continue; } } @@ -337,198 +297,6 @@ private: return processed_records; } - - /* - * Add a new level to the LSM Tree and return that level's index. Will - * automatically determine whether the level should be on memory or on disk, - * and act appropriately. - */ - inline level_index grow() { - level_index new_idx; - - size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - new_idx = m_levels.size(); - if (new_idx > 0) { - assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); - } - m_levels.emplace_back(new InternalLevel(new_idx, new_shard_cnt)); - - return new_idx; - } - - - // Merge the memory table down into the tree, completing any required other - // merges to make room for it. - inline void merge_buffer() { - auto buffer = get_buffer(); - - if (!can_merge_with(0, buffer->get_record_count())) { - merge_down(0); - } - - merge_buffer_into_l0(buffer); - enforce_delete_maximum(0); - - buffer->truncate(); - return; - } - - /* - * Merge the specified level down into the tree. The level index must be - * non-negative (i.e., this function cannot be used to merge the buffer). This - * routine will recursively perform any necessary merges to make room for the - * specified level. - */ - inline void merge_down(level_index idx) { - level_index merge_base_level = find_mergable_level(idx); - if (merge_base_level == -1) { - merge_base_level = grow(); - } - - for (level_index i=merge_base_level; i>idx; i--) { - merge_levels(i, i-1); - enforce_delete_maximum(i); - } - - return; - } - - /* - * Find the first level below the level indicated by idx that - * is capable of sustaining a merge operation and return its - * level index. If no such level exists, returns -1. Also - * returns -1 if idx==0, and no such level exists, to simplify - * the logic of the first merge. 
- */ - inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) { - - if (idx == 0 && m_levels.size() == 0) return -1; - - bool level_found = false; - bool disk_level; - level_index merge_level_idx; - - size_t incoming_rec_cnt = get_level_record_count(idx, buffer); - for (level_index i=idx+1; i::merge_levels(m_levels[base_level], m_levels[incoming_level]); - mark_as_unused(tmp); - } else { - m_levels[base_level]->append_merged_shards(m_levels[incoming_level]); - } - - mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor); - } - - - inline void merge_buffer_into_l0(Buffer *buffer) { - assert(m_levels[0]); - if constexpr (L == LayoutPolicy::LEVELING) { - // FIXME: Kludgey implementation due to interface constraints. - auto old_level = m_levels[0]; - auto temp_level = new InternalLevel(0, 1); - temp_level->append_buffer(buffer); - auto new_level = InternalLevel::merge_levels(old_level, temp_level); - - m_levels[0] = new_level; - delete temp_level; - mark_as_unused(old_level); - } else { - m_levels[0]->append_buffer(buffer); - } - } - - /* - * Mark a given memory level as no-longer in use by the tree. For now this - * will just free the level. In future, this will be more complex as the - * level may not be able to immediately be deleted, depending upon who - * else is using it. - */ - inline void mark_as_unused(InternalLevel *level) { - delete level; - } - - /* - * Check the tombstone proportion for the specified level and - * if the limit is exceeded, forcibly merge levels until all - * levels below idx are below the limit. - */ - inline void enforce_delete_maximum(level_index idx) { - long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx); - - if (ts_prop > (long double) m_max_delete_prop) { - merge_down(idx); - } - - return; - } - - /* - * Assume that level "0" should be larger than the buffer. The buffer - * itself is index -1, which should return simply the buffer capacity. - */ - inline size_t calc_level_record_capacity(level_index idx) { - return get_buffer()->get_capacity() * pow(m_scale_factor, idx+1); - } - - /* - * Returns the actual number of records present on a specified level. An - * index value of -1 indicates the memory table. Can optionally pass in - * a pointer to the memory table to use, if desired. Otherwise, there are - * no guarantees about which buffer will be accessed if level_index is -1. - */ - inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) { - - assert(idx >= -1); - if (idx == -1) { - return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count(); - } - - return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; - } - - /* - * Determines if the specific level can merge with another record containing - * incoming_rec_cnt number of records. The provided level index should be - * non-negative (i.e., not refer to the buffer) and will be automatically - * translated into the appropriate index into either the disk or memory level - * vector. 
- */
- inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) {
- if (idx>= m_levels.size() || !m_levels[idx]) {
- return false;
- }
-
- if (L == LayoutPolicy::LEVELING) {
- return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
- } else {
- return m_levels[idx]->get_shard_count() < m_scale_factor;
- }
-
- // unreachable
- assert(true);
- }
 };
-
 }
-- 
cgit v1.2.3 


From 7f56949bc847b56da69c9eb3ebe081d6cf9f61c6 Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Mon, 18 Sep 2023 12:25:01 -0400
Subject: General bugfixes

---
 include/framework/DynamicExtension.h | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

(limited to 'include/framework/DynamicExtension.h')

diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 5e9bcee..08e2243 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -40,7 +40,10 @@ public:
 : m_scale_factor(scale_factor)
 , m_max_delete_prop(max_delete_prop)
 , m_sched(memory_budget, thread_cnt)
- { }
+ {
+ m_buffers.push_back(new Buffer(buffer_cap, max_delete_prop*buffer_cap));
+ m_versions.push_back(new Structure(buffer_cap, scale_factor, max_delete_prop));
+ }
 ~DynamicExtension() {
 for (size_t i=0; i Date: Mon, 18 Sep 2023 16:37:30 -0400
Subject: The scheduler now spawns a separate merge thread

Merges are now executed from a separate thread within the scheduler
that wakes up via condition variables when new merge tasks are
scheduled. In addition, tombstone limits are now enforced by the
scheduler, with new merges being scheduled as needed.

There are still a few tests failing; notably, the zero-tombstones-in-the-last-run
invariant is not holding under tiering with tombstones. Need to look
into that yet.
---
 include/framework/DynamicExtension.h | 5 +++++
 1 file changed, 5 insertions(+)

(limited to 'include/framework/DynamicExtension.h')

diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 08e2243..6965965 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -301,5 +301,10 @@ private:
 return processed_records;
 }
 };
+
+template
+static void de_merge_callback(DynamicExtension extension, ExtensionStructure new_version) {
+
+}
 }
-- 
cgit v1.2.3 


From 7c03d771475421c1d5a2bbc135242536af1a371c Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Mon, 25 Sep 2023 10:49:36 -0400
Subject: Re-structuring Project + scheduling updates

This is a big one--probably should have split it apart, but I'm
feeling lazy this morning.

* Organized the mess of header files in include/framework by splitting
  them out into their own subdirectories, and renaming a few files to
  remove redundancies introduced by the directory structure.

* Introduced a new framework/ShardRequirements.h header file for
  simpler shard development. This header simply contains the necessary
  includes from framework/* for creating shard files. This should help
  to remove structural dependencies from the framework file structure
  and shards, as well as centralizing the necessary framework files to
  make shard development easier.

* Created a (currently dummy) SchedulerInterface, and made the
  scheduler implementation a template parameter of the dynamic
  extension for easier testing of various scheduling policies. There's
  still more work to be done to fully integrate the scheduler (queries,
  multiple buffers), but some more of the necessary framework code for
  this has been added as well.
* Adjusted the Task interface setup for the scheduler. The task
  structures have been removed from ExtensionStructure and placed in
  their own header file. Additionally, I started experimenting with
  using std::variant, as opposed to inheritance, to implement subtype
  polymorphism on the Merge and Query tasks. The scheduler now has a
  general task queue that contains both, and std::variant, std::visit,
  and std::get are used to manipulate them without virtual functions.

* Removed Alex.h, as it can't build anyway. There's a branch out there
  containing the Alex implementation stripped of the C++20 stuff. So
  there's no need to keep it here.
---
 include/framework/DynamicExtension.h | 24 +++++++++++++-----------
 1 file changed, 13 insertions(+), 11 deletions(-)

(limited to 'include/framework/DynamicExtension.h')

diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 6965965..3a460aa 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -14,22 +14,24 @@
 #include
 #include
-#include "framework/MutableBuffer.h"
-#include "framework/InternalLevel.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
-#include "framework/ExtensionStructure.h"
-
-#include "framework/Configuration.h"
-#include "framework/Scheduler.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/InternalLevel.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Scheduler.h"
+#include "framework/structure/ExtensionStructure.h"
+
+#include "framework/util/Configuration.h"
+#include "framework/scheduling/SerialScheduler.h"
 #include "psu-util/timer.h"
 #include "psu-ds/Alias.h"

 namespace de {

-template
+template >
 class DynamicExtension {
 typedef S Shard;
 typedef MutableBuffer Buffer;
@@ -219,7 +221,7 @@ public:
 }

 private:
- Scheduler m_sched;
+ SCHED m_sched;

 std::vector m_buffers;
 std::vector m_versions;
-- 
cgit v1.2.3 


From 7ecfb22c32b7986ed1a2439c1abbeed298e4153a Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Fri, 20 Oct 2023 17:00:42 -0400
Subject: Initial pass w/ new scheduler setup

Currently there's a race condition of some type to sort out.
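The std::variant-based task handling described in the commit above can
be sketched roughly as follows. This is an illustration only: the task
types, their fields, and the queue are assumptions made for the
example, not the repository's actual definitions (the real MergeArgs
and QueryArgs carry more state).

#include <variant>
#include <queue>
#include <cstddef>

/* Illustrative task types; hypothetical fields for the sketch. */
struct MergeTask { size_t level_idx; };
struct QueryTask { void *query_parms; };

/* One queue holds both task kinds, with no inheritance involved. */
using Task = std::variant<MergeTask, QueryTask>;

struct TaskVisitor {
    void operator()(MergeTask &t) { /* run the merge */ }
    void operator()(QueryTask &t) { /* run the query */ }
};

void drain_queue(std::queue<Task> &q) {
    while (!q.empty()) {
        /* std::visit dispatches on the active alternative,
         * avoiding virtual function calls entirely */
        std::visit(TaskVisitor{}, q.front());
        q.pop();
    }
}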
--- include/framework/DynamicExtension.h | 184 ++++++++++++++++++++++------------- 1 file changed, 114 insertions(+), 70 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 3a460aa..fc7922c 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -31,7 +31,8 @@ namespace de { -template > +template class DynamicExtension { typedef S Shard; typedef MutableBuffer Buffer; @@ -83,68 +84,8 @@ public: return internal_append(rec, true); } - std::vector query(void *parms) { - auto buffer = get_buffer(); - auto vers = get_active_version(); - - // Get the buffer query state - auto buffer_state = Q::get_buffer_query_state(buffer, parms); - - // Get the shard query states - std::vector> shards; - std::vector states; - - for (auto &level : vers->get_levels()) { - level->get_query_states(shards, states, parms); - } - - Q::process_query_states(parms, states, buffer_state); - - std::vector>> query_results(shards.size() + 1); - - // Execute the query for the buffer - auto buffer_results = Q::buffer_query(buffer, buffer_state, parms); - query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers)); - if constexpr (Q::EARLY_ABORT) { - if (query_results[0].size() > 0) { - auto result = Q::merge(query_results, parms); - for (size_t i=0; i 0) { - auto result = Q::merge(query_results, parms); - for (size_t i=0; i> query(void *parms) { + return schedule_query(get_active_version(), get_buffer(), parms); } size_t get_record_count() { @@ -239,6 +180,112 @@ private: return m_versions[0]; } + static void merge(void *arguments) { + MergeArgs *args = (MergeArgs *) arguments; + + Structure *vers = (Structure *) args->version; + Buffer *buff = (Buffer *) args->buffer; + + for (ssize_t i=args->merges.size() - 1; i>=0; i--) { + vers->merge_levels(args->merges[i].second, args->merges[i].first); + } + + vers->merge_buffer(buff); + + args->result.set_value(true); + delete args; + } + + static void async_query(void *arguments) { + QueryArgs *args = (QueryArgs *) arguments; + + auto buffer = (Buffer *) args->buffer; + auto vers = (Structure *) args->version; + void *parms = args->query_parms; + + // Get the buffer query state + auto buffer_state = Q::get_buffer_query_state(buffer, parms); + + // Get the shard query states + std::vector> shards; + std::vector states; + + for (auto &level : vers->get_levels()) { + level->get_query_states(shards, states, parms); + } + + Q::process_query_states(parms, states, buffer_state); + + std::vector>> query_results(shards.size() + 1); + + // Execute the query for the buffer + auto buffer_results = Q::buffer_query(buffer, buffer_state, parms); + query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers)); + if constexpr (Q::EARLY_ABORT) { + if (query_results[0].size() > 0) { + auto result = Q::merge(query_results, parms); + for (size_t i=0; i 0) { + auto result = Q::merge(query_results, parms); + for (size_t i=0; iresult_set.set_value(std::move(result)); + delete args; + } + + std::future schedule_merge(Structure *version, Buffer *buffer) { + MergeArgs *args = new MergeArgs(); + args->merges = version->get_merge_tasks(buffer->get_record_count()); + args->buffer = buffer; + args->version = version; + + m_sched.schedule_job(merge, 0, args); + + return args->result.get_future(); + } + + std::future> schedule_query(Structure *version, Buffer *buffer, void *query_parms) { + QueryArgs *args = new QueryArgs(); 
+ args->buffer = buffer; + args->version = version; + args->buffer = query_parms; + + m_sched.schedule_job(async_query, 0, args); + + return args->result_set.get_future(); + } + int internal_append(const R &rec, bool ts) { Buffer *buffer; while (!(buffer = get_buffer())) @@ -246,13 +293,15 @@ private: if (buffer->is_full()) { auto vers = get_active_version(); - m_sched.schedule_merge(vers, buffer); + auto res = schedule_merge(vers, buffer); + res.get(); } + return buffer->append(rec, ts); } - std::vector> filter_deletes(std::vector> &records, ShardID shid, Buffer *buffer, Structure *vers) { + static std::vector> filter_deletes(std::vector> &records, ShardID shid, Buffer *buffer, Structure *vers) { if constexpr (!Q::SKIP_DELETE_FILTER) { return records; } @@ -303,10 +352,5 @@ private: return processed_records; } }; - -template -static void de_merge_callback(DynamicExtension extension, ExtensionStructure new_version) { - -} } -- cgit v1.2.3 From b72103cb11347f0dd108bd2321f29b0d6ab05106 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 23 Oct 2023 13:18:30 -0400 Subject: Bugfixes --- include/framework/DynamicExtension.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index fc7922c..26221d8 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -259,6 +259,7 @@ private: } Q::delete_buffer_query_state(buffer_state); + buffer->release_reference(); args->result_set.set_value(std::move(result)); delete args; @@ -276,10 +277,13 @@ private: } std::future> schedule_query(Structure *version, Buffer *buffer, void *query_parms) { + buffer->take_reference(); // FIXME: this is wrong. The buffer and version need to be + // taken atomically, together. + QueryArgs *args = new QueryArgs(); args->buffer = buffer; args->version = version; - args->buffer = query_parms; + args->query_parms = query_parms; m_sched.schedule_job(async_query, 0, args); -- cgit v1.2.3 From 3afacb7702e6d8fa67749a2a41dc776d315e02a9 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 23 Oct 2023 17:43:22 -0400 Subject: Began moving to an explicit epoch-based system I started moving over to an explicit Epoch based system, which has necessitated a ton of changes throughout the code base. This will ultimately allow for a much cleaner set of abstractions for managing concurrency. 
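At its core, the epoch mechanism introduced here pins a version of the
structure (and its buffers) for the duration of each job, and defers
reclamation until the pin count drains. A minimal sketch of the idea,
with illustrative names that do not necessarily match the framework's
actual Epoch interface:

#include <atomic>
#include <cstddef>

struct EpochSketch {
    std::atomic<size_t> active_jobs {0};

    /* readers pin the epoch before using it, and unpin when done */
    void start_job() { active_jobs.fetch_add(1); }
    void end_job()   { active_jobs.fetch_sub(1); }
};

/* Retirement as described in this commit: spin until the epoch
 * quiesces, then free it. A later commit in this log replaces
 * the spin with a condition variable. */
void retire(EpochSketch *epoch) {
    while (epoch->active_jobs.load() > 0)
        ;  /* busy-wait for all pinned jobs to finish */
    delete epoch;
}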
--- include/framework/DynamicExtension.h | 279 +++++++++++++++++++++++------------ 1 file changed, 188 insertions(+), 91 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 26221d8..6936247 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -13,6 +13,7 @@ #include #include #include +#include #include "framework/structure/MutableBuffer.h" #include "framework/structure/InternalLevel.h" @@ -24,7 +25,8 @@ #include "framework/structure/ExtensionStructure.h" #include "framework/util/Configuration.h" -#include "framework/scheduling/SerialScheduler.h" +#include "framework/scheduling/FIFOScheduler.h" +#include "framework/scheduling/Epoch.h" #include "psu-util/timer.h" #include "psu-ds/Alias.h" @@ -32,20 +34,30 @@ namespace de { template + DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=FIFOScheduler> class DynamicExtension { typedef S Shard; typedef MutableBuffer Buffer; typedef ExtensionStructure Structure; + typedef Epoch Epoch; + typedef BufferView BufView; + public: DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop, size_t memory_budget=0, size_t thread_cnt=16) : m_scale_factor(scale_factor) , m_max_delete_prop(max_delete_prop) , m_sched(memory_budget, thread_cnt) + , m_buffer_capacity(buffer_cap) + , m_buffer_delete_capacity(max_delete_prop*buffer_cap) { + auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); + auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop); + auto epoch = new Epoch(vers, buf); + m_buffers.push_back(new Buffer(buffer_cap, max_delete_prop*buffer_cap)); m_versions.push_back(new Structure(buffer_cap, scale_factor, max_delete_prop)); + m_epochs.push_back({0, epoch}); } ~DynamicExtension() { @@ -63,10 +75,10 @@ public: } int erase(const R &rec) { - Buffer *buffer = get_buffer(); - if constexpr (D == DeletePolicy::TAGGING) { - if (get_active_version()->tagged_delete(rec)) { + BufView buffers = get_active_epoch()->get_buffer_view(); + + if (get_active_epoch()->get_structure()->tagged_delete(rec)) { return 1; } @@ -75,7 +87,7 @@ public: * probably has the lowest probability of having the record, * so we'll check it last. 
*/ - return buffer->delete_record(rec); + return buffers->delete_record(rec); } /* @@ -85,43 +97,43 @@ public: } std::future> query(void *parms) { - return schedule_query(get_active_version(), get_buffer(), parms); + return schedule_query(get_active_epoch()->get_structure(), get_active_epoch()->get_buffers()[0], parms); } size_t get_record_count() { - size_t cnt = get_buffer()->get_record_count(); - return cnt + get_active_version()->get_record_count(); + size_t cnt = get_active_epoch()->get_buffer_view().get_record_count(); + return cnt + get_active_epoch()->get_structure()->get_record_count(); } size_t get_tombstone_cnt() { - size_t cnt = get_buffer()->get_tombstone_count(); - return cnt + get_active_version()->get_tombstone_cnt(); + size_t cnt = get_active_epoch()->get_buffer_view().get_tombstone_count(); + return cnt + get_active_epoch()->get_structure()->get_tombstone_cnt(); } size_t get_height() { - return get_active_version()->get_height(); + return get_active_epoch()->get_structure()->get_height(); } size_t get_memory_usage() { - auto vers = get_active_version(); - auto buffer = get_buffer(); + auto vers = get_active_epoch()->get_structure()->get_memory_usage(); + auto buffer = get_active_epoch()->get_buffer_view().get_memory_usage(); - return vers.get_memory_usage() + buffer->get_memory_usage(); + return vers + buffer; } size_t get_aux_memory_usage() { - auto vers = get_active_version(); - auto buffer = get_buffer(); + auto vers = get_active_epoch()->get_structure()->get_aux_memory_usage(); + auto buffer = get_active_epoch()->get_buffer_view().get_aux_memory_usage(); - return vers.get_aux_memory_usage() + buffer->get_aux_memory_usage(); + return vers + buffer; } size_t get_buffer_capacity() { - return get_height()->get_capacity(); + return m_buffer_capacity; } Shard *create_static_structure() { - auto vers = get_active_version(); + auto vers = get_active_epoch()->get_structure(); std::vector shards; if (vers->get_levels().size() > 0) { @@ -132,7 +144,9 @@ public: } } - shards.emplace_back(new S(get_buffer())); + // FIXME: should use a buffer view--or perhaps some sort of a + // raw record iterator model. + shards.emplace_back(new S(get_active_epoch()->get_buffers()[0])); Shard *shards_array[shards.size()]; @@ -158,33 +172,121 @@ public: * tombstone proportion invariant. 
*/ bool validate_tombstone_proportion() { - return get_active_version()->validate_tombstone_proportion(); + return get_active_epoch()->get_structure()->validate_tombstone_proportion(); } private: SCHED m_sched; - std::vector m_buffers; - std::vector m_versions; + std::mutex m_struct_lock; + std::set m_buffers; + std::set m_versions; std::atomic m_current_epoch; + std::unordered_map m_epochs; size_t m_scale_factor; double m_max_delete_prop; + size_t m_buffer_capacity; + size_t m_buffer_delete_capacity; - Buffer *get_buffer() { - return m_buffers[0]; + Epoch *get_active_epoch() { + return m_epochs[m_current_epoch.load()]; + } + + void advance_epoch() { + size_t new_epoch_num = m_current_epoch.load() + 1; + Epoch *new_epoch = m_epochs[new_epoch_num]; + Epoch *old_epoch = m_epochs[m_current_epoch.load()]; + + // Update the new Epoch to contain the buffers + // from the old one that it doesn't currently have + size_t old_buffer_cnt = new_epoch->clear_buffers(); + for (size_t i=old_buffer_cnt; iget_buffers().size(); i++) { + new_epoch->add_buffer(old_epoch->get_buffers[i]); + } + m_current_epoch.fetch_add(1); } - Structure *get_active_version() { - return m_versions[0]; + /* + * Creates a new epoch by copying the currently active one. The new epoch's + * structure will be a shallow copy of the old one's. + */ + Epoch *create_new_epoch() { + auto new_epoch = get_active_epoch()->clone(); + std::unique_lock m_struct_lock; + m_versions.insert(new_epoch->get_structure()); + m_epochs.insert({m_current_epoch.load() + 1, new_epoch}); + m_struct_lock.release(); + + return new_epoch; + } + + /* + * Add a new empty buffer to the specified epoch. This is intended to be used + * when a merge is triggered, to allow for inserts to be sustained in the new + * buffer while a new epoch is being created in the background. Returns a + * pointer to the newly created buffer. + */ + Buffer *add_empty_buffer(Epoch *epoch) { + auto new_buffer = Buffer(m_buffer_capacity, m_buffer_delete_capacity); + + std::unique_lock m_struct_lock; + m_buffers.insert(new_buffer); + m_struct_lock.release(); + + epoch->add_buffer(new_buffer); + return new_buffer; + } + + void retire_epoch(Epoch *epoch) { + /* + * Epochs with currently active jobs cannot + * be retired. By the time retire_epoch is called, + * it is assumed that a new epoch is active, meaning + * that the epoch to be retired should no longer + * accumulate new active jobs. Eventually, this + * number will hit zero and the function will + * proceed. + * + * FIXME: this can be replaced with a cv, which + * is probably a superior solution in this case + */ + while (epoch->get_active_job_num() > 0) + ; + + /* + * The epoch's destructor will handle releasing + * all the references it holds + */ + delete epoch; + + /* + * Following the epoch's destruction, any buffers + * or structures with no remaining references can + * be safely freed. 
+ */ + std::unique_lock lock(m_struct_lock); + for (auto buf : m_buffers) { + if (buf->get_reference_count() == 0) { + m_buffers.erase(buf); + delete buf; + } + } + + for (auto vers : m_versions) { + if (vers->get_reference_count() == 0) { + m_versions.erase(vers); + delete vers; + } + } } static void merge(void *arguments) { - MergeArgs *args = (MergeArgs *) arguments; + MergeArgs *args = (MergeArgs *) arguments; - Structure *vers = (Structure *) args->version; - Buffer *buff = (Buffer *) args->buffer; + Structure *vers = args->epoch->get_structure(); + Buffer *buff = (Buffer *) args->epoch->get_buffers()[0]; for (ssize_t i=args->merges.size() - 1; i>=0; i--) { vers->merge_levels(args->merges[i].second, args->merges[i].first); @@ -193,98 +295,94 @@ private: vers->merge_buffer(buff); args->result.set_value(true); + args->epoch->end_job(); delete args; } + static std::vector finalize_query_result(std::vector>> &query_results, void *parms, + std::vector &shard_states, std::vector &buffer_states) { + auto result = Q::merge(query_results, parms); + + for (size_t i=0; i *args = (QueryArgs *) arguments; + QueryArgs *args = (QueryArgs *) arguments; - auto buffer = (Buffer *) args->buffer; - auto vers = (Structure *) args->version; + auto buffers = args->epoch->get_buffer_view(); + auto vers = args->epoch->get_structure(); void *parms = args->query_parms; - // Get the buffer query state - auto buffer_state = Q::get_buffer_query_state(buffer, parms); + // Get the buffer query states + std::vector buffer_states = buffers->get_buffer_query_states(parms); // Get the shard query states std::vector> shards; - std::vector states; - - for (auto &level : vers->get_levels()) { - level->get_query_states(shards, states, parms); - } + std::vector shard_states = vers->get_query_states(shards, parms); - Q::process_query_states(parms, states, buffer_state); + Q::process_query_states(parms, shard_states, buffer_states); - std::vector>> query_results(shards.size() + 1); + std::vector>> query_results(shards.size() + buffer_states.size()); // Execute the query for the buffer - auto buffer_results = Q::buffer_query(buffer, buffer_state, parms); - query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers)); - if constexpr (Q::EARLY_ABORT) { - if (query_results[0].size() > 0) { - auto result = Q::merge(query_results, parms); - for (size_t i=0; i>> buffer_results(buffer_states.size()); + for (size_t i=0; iget_buffers[i], buffer_states[i], parms); + query_results[i] = std::move(filter_deletes(buffer_results, {-1, -1}, buffers, vers)); - Q::delete_buffer_query_state(buffer_state); - return result; + if constexpr (Q::EARLY_ABORT) { + if (query_results[i] > 0) { + return finalize_query_result(query_results, parms, buffer_states, shard_states); + } } } - + // Execute the query for each shard for (size_t i=0; i 0) { - auto result = Q::merge(query_results, parms); - for (size_t i=0; i 0) { + return finalize_query_result(query_results, parms, buffer_states, shard_states); } } } - // Merge the results together - auto result = Q::merge(query_results, parms); - - for (size_t i=0; irelease_reference(); - + // Merge the results together and finalize the job + auto result = finalize_query_result(query_results, parms, buffer_states, shard_states); args->result_set.set_value(std::move(result)); + + args->epoch->end_job(); delete args; } - std::future schedule_merge(Structure *version, Buffer *buffer) { - MergeArgs *args = new MergeArgs(); - args->merges = version->get_merge_tasks(buffer->get_record_count()); - 
args->buffer = buffer; - args->version = version; + std::future schedule_merge() { + auto epoch = get_active_epoch(); + epoch->start_job(); + MergeArgs *args = new MergeArgs(); + args->epoch = epoch; + args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]); m_sched.schedule_job(merge, 0, args); return args->result.get_future(); } - std::future> schedule_query(Structure *version, Buffer *buffer, void *query_parms) { - buffer->take_reference(); // FIXME: this is wrong. The buffer and version need to be - // taken atomically, together. + std::future> schedule_query(void *query_parms) { + auto epoch = get_active_epoch(); + epoch->start_job(); - QueryArgs *args = new QueryArgs(); - args->buffer = buffer; - args->version = version; + QueryArgs *args = new QueryArgs(); + args->epoch = epoch; args->query_parms = query_parms; - m_sched.schedule_job(async_query, 0, args); return args->result_set.get_future(); @@ -292,20 +390,19 @@ private: int internal_append(const R &rec, bool ts) { Buffer *buffer; - while (!(buffer = get_buffer())) + while (!(buffer = get_active_epoch()->get_active_buffer())) ; if (buffer->is_full()) { - auto vers = get_active_version(); + auto vers = get_active_epoch()->get_structure(); auto res = schedule_merge(vers, buffer); res.get(); } - return buffer->append(rec, ts); } - static std::vector> filter_deletes(std::vector> &records, ShardID shid, Buffer *buffer, Structure *vers) { + static std::vector> filter_deletes(std::vector> &records, ShardID shid, BufView *buffers, Structure *vers) { if constexpr (!Q::SKIP_DELETE_FILTER) { return records; } @@ -334,7 +431,7 @@ private: continue; } - if (buffer->check_tombstone(rec.rec)) { + if (buffers->check_tombstone(rec.rec)) { continue; } -- cgit v1.2.3 From 39ae3e0441d8297a09197aba98bd494b5ada12c1 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 30 Oct 2023 14:17:59 -0400 Subject: Concurrency updates + fixes for compile errors --- include/framework/DynamicExtension.h | 161 ++++++++++++++++++----------------- 1 file changed, 81 insertions(+), 80 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 6936247..d2a6b7a 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -39,7 +39,7 @@ class DynamicExtension { typedef S Shard; typedef MutableBuffer Buffer; typedef ExtensionStructure Structure; - typedef Epoch Epoch; + typedef Epoch _Epoch; typedef BufferView BufView; public: @@ -53,20 +53,24 @@ public: { auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop); - auto epoch = new Epoch(vers, buf); + auto epoch = new _Epoch(vers, buf); - m_buffers.push_back(new Buffer(buffer_cap, max_delete_prop*buffer_cap)); - m_versions.push_back(new Structure(buffer_cap, scale_factor, max_delete_prop)); - m_epochs.push_back({0, epoch}); + m_buffers.insert(new Buffer(buffer_cap, max_delete_prop*buffer_cap)); + m_versions.insert(new Structure(buffer_cap, scale_factor, max_delete_prop)); + m_epochs.insert({0, epoch}); } ~DynamicExtension() { - for (size_t i=0; idelete_record(rec); + return buffers.delete_record(rec); } /* @@ -97,7 +101,7 @@ public: } std::future> query(void *parms) { - return schedule_query(get_active_epoch()->get_structure(), get_active_epoch()->get_buffers()[0], parms); + return schedule_query(parms); } size_t get_record_count() { @@ -183,36 +187,37 @@ private: 
std::set m_versions; std::atomic m_current_epoch; - std::unordered_map m_epochs; + std::unordered_map m_epochs; size_t m_scale_factor; double m_max_delete_prop; size_t m_buffer_capacity; size_t m_buffer_delete_capacity; - Epoch *get_active_epoch() { + _Epoch *get_active_epoch() { return m_epochs[m_current_epoch.load()]; } void advance_epoch() { size_t new_epoch_num = m_current_epoch.load() + 1; - Epoch *new_epoch = m_epochs[new_epoch_num]; - Epoch *old_epoch = m_epochs[m_current_epoch.load()]; + _Epoch *new_epoch = m_epochs[new_epoch_num]; + _Epoch *old_epoch = m_epochs[m_current_epoch.load()]; // Update the new Epoch to contain the buffers // from the old one that it doesn't currently have size_t old_buffer_cnt = new_epoch->clear_buffers(); for (size_t i=old_buffer_cnt; iget_buffers().size(); i++) { - new_epoch->add_buffer(old_epoch->get_buffers[i]); + new_epoch->add_buffer(old_epoch->get_buffers()[i]); } m_current_epoch.fetch_add(1); + retire_epoch(old_epoch); } /* * Creates a new epoch by copying the currently active one. The new epoch's * structure will be a shallow copy of the old one's. */ - Epoch *create_new_epoch() { + _Epoch *create_new_epoch() { auto new_epoch = get_active_epoch()->clone(); std::unique_lock m_struct_lock; m_versions.insert(new_epoch->get_structure()); @@ -228,8 +233,8 @@ private: * buffer while a new epoch is being created in the background. Returns a * pointer to the newly created buffer. */ - Buffer *add_empty_buffer(Epoch *epoch) { - auto new_buffer = Buffer(m_buffer_capacity, m_buffer_delete_capacity); + Buffer *add_empty_buffer(_Epoch *epoch) { + auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); std::unique_lock m_struct_lock; m_buffers.insert(new_buffer); @@ -239,7 +244,7 @@ private: return new_buffer; } - void retire_epoch(Epoch *epoch) { + void retire_epoch(_Epoch *epoch) { /* * Epochs with currently active jobs cannot * be retired. By the time retire_epoch is called, @@ -294,26 +299,15 @@ private: vers->merge_buffer(buff); - args->result.set_value(true); args->epoch->end_job(); - delete args; - } - - static std::vector finalize_query_result(std::vector>> &query_results, void *parms, - std::vector &shard_states, std::vector &buffer_states) { - auto result = Q::merge(query_results, parms); - - for (size_t i=0; iresult.set_value(true); - return result; + ((DynamicExtension *) args->extension)->advance_epoch(); + + // FIXME: this might break things... not sure + delete args; } - + static void async_query(void *arguments) { QueryArgs *args = (QueryArgs *) arguments; @@ -322,58 +316,56 @@ private: void *parms = args->query_parms; // Get the buffer query states - std::vector buffer_states = buffers->get_buffer_query_states(parms); + std::vector buffer_states = buffers.get_query_states(parms); // Get the shard query states std::vector> shards; - std::vector shard_states = vers->get_query_states(shards, parms); + std::vector states = vers->get_query_states(shards, parms); - Q::process_query_states(parms, shard_states, buffer_states); + Q::process_query_states(parms, states, buffer_states); std::vector>> query_results(shards.size() + buffer_states.size()); + for (size_t i=0; i> local_results = (i < buffer_states.size()) + ? Q::buffer_query(buffers.get_buffers()[i], buffer_states[i], parms) + : Q::query(shards[i - buffer_states.size()].second, + states[i - buffer_states.size()], parms); + ShardID shid = (i < buffer_states.size()) ? 
INVALID_SHID : shards[i - buffer_states.size()].first;
+ query_results[i] = std::move(filter_deletes(local_results, shid, buffers, vers));

- // Execute the query for the buffer
- std::vector>> buffer_results(buffer_states.size());
- for (size_t i=0; iget_buffers[i], buffer_states[i], parms);
- query_results[i] = std::move(filter_deletes(buffer_results, {-1, -1}, buffers, vers));
-
- if constexpr (Q::EARLY_ABORT) {
- if (query_results[i] > 0) {
- return finalize_query_result(query_results, parms, buffer_states, shard_states);
- }
- }
- }
-
- // Execute the query for each shard
- for (size_t i=0; i 0) {
- return finalize_query_result(query_results, parms, buffer_states, shard_states);
+ if (query_results[i].size() > 0) break;
 }
 }
-
- // Merge the results together and finalize the job
- auto result = finalize_query_result(query_results, parms, buffer_states, shard_states);
+
+ auto result = Q::merge(query_results, parms);
 args->result_set.set_value(std::move(result));

 args->epoch->end_job();
+
+ for (size_t i=0; i schedule_merge() {
- auto epoch = get_active_epoch();
+ void schedule_merge() {
+ auto epoch = create_new_epoch();
 epoch->start_job();

 MergeArgs *args = new MergeArgs();
 args->epoch = epoch;
- args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]);
+ // FIXME: all full buffers can be merged at this point--but that requires
+ // retooling the shard interface a bit to do efficiently.
+ args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count());
+ args->extension = this;
 m_sched.schedule_job(merge, 0, args);
 }

 std::future> schedule_query(void *query_parms) {
@@ -389,20 +381,29 @@ private:
 }

 int internal_append(const R &rec, bool ts) {
- Buffer *buffer;
- while (!(buffer = get_active_epoch()->get_active_buffer()))
- ;
-
- if (buffer->is_full()) {
- auto vers = get_active_epoch()->get_structure();
- auto res = schedule_merge(vers, buffer);
- res.get();
- }
+ Buffer *buffer = nullptr;
+ do {
+ auto epoch = get_active_epoch();
+
+ while (!(buffer = epoch->get_active_buffer()))
+ ;
+
+ /* if the buffer is full, schedule a merge and add a new empty buffer */
+ if (buffer->is_full()) {
+ // FIXME: possible race here--two identical merges could be scheduled
+ auto vers = epoch->get_structure();
+ schedule_merge();
+ buffer = add_empty_buffer(epoch);
+ }
+
+ } while(!buffer->append(rec, ts));

- return buffer->append(rec, ts);
+ /* internal append should always succeed, eventually */
+ return 1;
 }

- static std::vector> filter_deletes(std::vector> &records, ShardID shid, BufView *buffers, Structure *vers) {
+ static std::vector> filter_deletes(std::vector> &records, ShardID shid,
+ BufView &buffers, Structure *vers) {
 if constexpr (!Q::SKIP_DELETE_FILTER) {
 return records;
 }
@@ -431,7 +432,7 @@ private:
 continue;
 }

- if (buffers->check_tombstone(rec.rec)) {
+ if (buffers.check_tombstone(rec.rec)) {
 continue;
 }
-- 
cgit v1.2.3 


From ceffd8caf5e4e827e2cc4d6975507a66d88f77a9 Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Mon, 30 Oct 2023 14:25:28 -0400
Subject: DynamicExtension: adjusted a few operations to ensure consistency

get_memory_usage, get_aux_memory_usage, get_record_count,
get_tombstone_count, and create_static_structure have been adjusted
to ensure that they pull from a consistent epoch, even if a
change-over occurs midway through the function.
These functions also now register with the epoch as a job, to ensure
that the epoch they are operating on isn't retired midway through the
function. Probably not a big issue for the accessors, but I could see
it being very important for create_static_structure.
---
 include/framework/DynamicExtension.h | 48 +++++++++++++++++++++++++-----------
 1 file changed, 33 insertions(+), 15 deletions(-)

(limited to 'include/framework/DynamicExtension.h')

diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index d2a6b7a..eb78d48 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -105,13 +105,20 @@ public:
 }

 size_t get_record_count() {
- size_t cnt = get_active_epoch()->get_buffer_view().get_record_count();
- return cnt + get_active_epoch()->get_structure()->get_record_count();
+ auto epoch = get_active_epoch();
+ epoch->start_job();
+ auto t = epoch->get_buffer_view().get_record_count() + epoch->get_structure()->get_record_count();
+ epoch->end_job();
+
+ return t;
 }

- size_t get_tombstone_cnt() {
- size_t cnt = get_active_epoch()->get_buffer_view().get_tombstone_count();
- return cnt + get_active_epoch()->get_structure()->get_tombstone_cnt();
+ size_t get_tombstone_count() {
+ auto epoch = get_active_epoch();
+ epoch->start_job();
+ auto t = epoch->get_buffer_view().get_tombstone_count() + epoch->get_structure()->get_tombstone_count();
+ epoch->end_job();
+ return t;
 }

 size_t get_height() {
@@ -119,17 +126,21 @@ public:
 }

 size_t get_memory_usage() {
- auto vers = get_active_epoch()->get_structure()->get_memory_usage();
- auto buffer = get_active_epoch()->get_buffer_view().get_memory_usage();
+ auto epoch = get_active_epoch();
+ epoch->start_job();
+ auto t= epoch->get_buffer_view().get_memory_usage() + epoch->get_structure()->get_memory_usage();
+ epoch->end_job();

- return vers + buffer;
+ return t;
 }

 size_t get_aux_memory_usage() {
- auto vers = get_active_epoch()->get_structure()->get_aux_memory_usage();
- auto buffer = get_active_epoch()->get_buffer_view().get_aux_memory_usage();
+ auto epoch = get_active_epoch();
+ epoch->start_job();
+ auto t = epoch->get_buffer_view().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage();
+ epoch->end_job();

- return vers + buffer;
+ return t;
 }

 size_t get_buffer_capacity() {
@@ -137,7 +148,11 @@ public:
 }

 Shard *create_static_structure() {
- auto vers = get_active_epoch()->get_structure();
+ auto epoch = get_active_epoch();
+ auto bv = epoch->get_buffer_view();
+ epoch->start_job();
+
+ auto vers = epoch->get_structure();
 std::vector shards;

 if (vers->get_levels().size() > 0) {
@@ -148,9 +163,11 @@ public:
 }
 }

- // FIXME: should use a buffer view--or perhaps some sort of a
- // raw record iterator model.
- shards.emplace_back(new S(get_active_epoch()->get_buffers()[0]));
+ // FIXME: With an interface adjustment, this could be done in
+ // one call, rather than a loop.
+ for (size_t i=bv.size() - 1; i>=0; i--) {
+ shards.emplace_back(new S(bv.get_buffers()[i]));
+ }

 Shard *shards_array[shards.size()];

@@ -167,6 +184,7 @@ public:
 delete shard;
 }

+ epoch->end_job();
 return flattened;
 }
-- 
cgit v1.2.3 


From 32aeedbaf6584eb71126cbe92cb42e93b65d69d3 Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Mon, 30 Oct 2023 14:47:35 -0400
Subject: Epoch/DynamicExtension: added cv to epoch retirement check

Instead of busy waiting on the active job count, a condition variable
is now used to wait for all active jobs to finish before freeing an
epoch's resources.
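The condition-variable scheme this commit describes can be sketched as
below. The class and member names are illustrative assumptions, not
necessarily those used in the framework's Epoch header:

#include <condition_variable>
#include <mutex>
#include <cstddef>

class RetirableEpoch {
public:
    void start_job() {
        std::unique_lock<std::mutex> lk(m_lock);
        m_active_jobs++;
    }

    void end_job() {
        std::unique_lock<std::mutex> lk(m_lock);
        if (--m_active_jobs == 0) {
            m_cv.notify_all();  /* wake any thread waiting to retire */
        }
    }

    /* block until every active job has finished, without busy waiting */
    void await_retirable() {
        std::unique_lock<std::mutex> lk(m_lock);
        m_cv.wait(lk, [this] { return m_active_jobs == 0; });
    }

private:
    std::mutex m_lock;
    std::condition_variable m_cv;
    size_t m_active_jobs = 0;
};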
--- include/framework/DynamicExtension.h | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index eb78d48..21d0261 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -79,6 +79,8 @@ public: } int erase(const R &rec) { + // FIXME: delete tagging will require a lot of extra work to get + // operating "correctly" in a concurrent environment. if constexpr (D == DeletePolicy::TAGGING) { BufView buffers = get_active_epoch()->get_buffer_view(); @@ -118,6 +120,7 @@ public: epoch->start_job(); auto t = epoch->get_buffer_view().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); epoch->end_job(); + return t; } @@ -271,11 +274,9 @@ private: * accumulate new active jobs. Eventually, this * number will hit zero and the function will * proceed. - * - * FIXME: this can be replaced with a cv, which - * is probably a superior solution in this case */ - while (epoch->get_active_job_num() > 0) + + while (!epoch->retirable()) ; /* -- cgit v1.2.3 From 8ce1cb0eef7d5631f0f7788804845ddc8296ac6f Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 30 Oct 2023 14:52:45 -0400 Subject: DynamicExtension: comment cleanup/adjustments --- include/framework/DynamicExtension.h | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 21d0261..f2bbacc 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -224,8 +224,10 @@ private: _Epoch *new_epoch = m_epochs[new_epoch_num]; _Epoch *old_epoch = m_epochs[m_current_epoch.load()]; - // Update the new Epoch to contain the buffers - // from the old one that it doesn't currently have + /* + * Update the new Epoch to contain the buffers from the old one + * that it doesn't currently have + */ size_t old_buffer_cnt = new_epoch->clear_buffers(); for (size_t i=old_buffer_cnt; iget_buffers().size(); i++) { new_epoch->add_buffer(old_epoch->get_buffers()[i]); @@ -275,7 +277,6 @@ private: * number will hit zero and the function will * proceed. */ - while (!epoch->retirable()) ; @@ -323,7 +324,6 @@ private: ((DynamicExtension *) args->extension)->advance_epoch(); - // FIXME: this might break things... not sure delete args; } @@ -334,10 +334,10 @@ private: auto vers = args->epoch->get_structure(); void *parms = args->query_parms; - // Get the buffer query states + /* Get the buffer query states */ std::vector buffer_states = buffers.get_query_states(parms); - // Get the shard query states + /* Get the shard query states */ std::vector> shards; std::vector states = vers->get_query_states(shards, parms); @@ -370,7 +370,6 @@ private: Q::delete_query_state(states[i]); } - // FIXME: this might break things... not sure delete args; } @@ -430,8 +429,10 @@ private: std::vector> processed_records; processed_records.reserve(records.size()); - // For delete tagging, we just need to check the delete bit on each - // record. + /* + * For delete tagging, we just need to check the delete bit + * on each record. + */ if constexpr (D == DeletePolicy::TAGGING) { for (auto &rec : records) { if (rec.is_deleted()) { @@ -444,8 +445,10 @@ private: return processed_records; } - // For tombstone deletes, we need to search for the corresponding - // tombstone for each record. 
+ /* + * For tombstone deletes, we need to search for the corresponding + * tombstone for each record. + */ for (auto &rec : records) { if (rec.is_tombstone()) { continue; -- cgit v1.2.3 From d2279e1b96d352a0af1d425dcaaf93e8a26a8d52 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 30 Oct 2023 17:15:05 -0400 Subject: General Comment + Consistency updates --- include/framework/DynamicExtension.h | 46 +++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 19 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index f2bbacc..9129060 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -1,7 +1,7 @@ /* * include/framework/DynamicExtension.h * - * Copyright (C) 2023 Douglas Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * Dong Xie * * All rights reserved. Published under the Modified BSD License. @@ -107,8 +107,7 @@ public: } size_t get_record_count() { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); auto t = epoch->get_buffer_view().get_record_count() + epoch->get_structure()->get_record_count(); epoch->end_job(); @@ -116,8 +115,7 @@ public: } size_t get_tombstone_count() { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); auto t = epoch->get_buffer_view().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); epoch->end_job(); @@ -129,8 +127,7 @@ public: } size_t get_memory_usage() { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); auto t= epoch->get_buffer_view().get_memory_usage() + epoch->get_structure()->get_memory_usage(); epoch->end_job(); @@ -138,8 +135,7 @@ public: } size_t get_aux_memory_usage() { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); auto t = epoch->get_buffer_view().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); epoch->end_job(); @@ -151,9 +147,8 @@ public: } Shard *create_static_structure() { - auto epoch = get_active_epoch(); + auto epoch = get_active_epoch_protected(); auto bv = epoch->get_buffer_view(); - epoch->start_job(); auto vers = epoch->get_structure(); std::vector shards; @@ -219,6 +214,11 @@ private: return m_epochs[m_current_epoch.load()]; } + _Epoch *get_active_epoch_protected() { + m_epochs[m_current_epoch.load()]->start_job(); + return m_epochs[m_current_epoch.load()]; + } + void advance_epoch() { size_t new_epoch_num = m_current_epoch.load() + 1; _Epoch *new_epoch = m_epochs[new_epoch_num]; @@ -241,6 +241,12 @@ private: * structure will be a shallow copy of the old one's. */ _Epoch *create_new_epoch() { + /* + * This epoch access is _not_ protected under the assumption that + * only one merge will be able to trigger at a time. If that condition + * is violated, it is possible that this code will clone a retired + * epoch. + */ auto new_epoch = get_active_epoch()->clone(); std::unique_lock m_struct_lock; m_versions.insert(new_epoch->get_structure()); @@ -311,6 +317,8 @@ private: MergeArgs *args = (MergeArgs *) arguments; Structure *vers = args->epoch->get_structure(); + // FIXME: with an improved shard interface, multiple full buffers + // could be merged at once here. 
Buffer *buff = (Buffer *) args->epoch->get_buffers()[0]; for (ssize_t i=args->merges.size() - 1; i>=0; i--) { @@ -387,24 +395,23 @@ private: } std::future> schedule_query(void *query_parms) { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); QueryArgs *args = new QueryArgs(); args->epoch = epoch; args->query_parms = query_parms; + auto result = args->result_set.get_future(); + m_sched.schedule_job(async_query, 0, args); - return args->result_set.get_future(); + return result; } int internal_append(const R &rec, bool ts) { Buffer *buffer = nullptr; do { - auto epoch = get_active_epoch(); - - while (!(buffer = epoch->get_active_buffer())) - ; + auto epoch = get_active_epoch_protected(); + buffer = epoch->get_active_buffer(); /* if the buffer is full, schedule a merge and add a new empty buffer */ if (buffer->is_full()) { @@ -413,7 +420,8 @@ private: schedule_merge(); buffer = add_empty_buffer(epoch); } - + // FIXME: not exactly the best spot for this + epoch->end_job(); } while(!buffer->append(rec, ts)); /* internal append should always succeed, eventually */ -- cgit v1.2.3 From c00900c5bfbc23537bf7084a927e7fd2ef0a5c94 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 11:01:05 -0400 Subject: DynamicExtension: added a way to block on merge completion This is mostly just for testing purposes at the moment, though I'd imagine it may be useful for other reasons too. --- include/framework/DynamicExtension.h | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 9129060..2f0327f 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -146,7 +146,11 @@ public: return m_buffer_capacity; } - Shard *create_static_structure() { + Shard *create_static_structure(bool await_merge_completion=false) { + if (await_merge_completion) { + await_next_epoch(); + } + auto epoch = get_active_epoch_protected(); auto bv = epoch->get_buffer_view(); @@ -186,6 +190,19 @@ public: return flattened; } + /* + * If the current epoch is *not* the newest one, then wait for + * the newest one to become available. Otherwise, returns immediately. + */ + void await_next_epoch() { + while (m_current_epoch.load() != m_newest_epoch.load()) { + std::unique_lock m_epoch_cv_lk; + m_epoch_cv.wait(m_epoch_cv_lk); + } + + return; + } + /* * Mostly exposed for unit-testing purposes. 
Verifies that the current * active version of the ExtensionStructure doesn't violate the maximum @@ -203,8 +220,12 @@ private: std::set m_versions; std::atomic m_current_epoch; + std::atomic m_newest_epoch; std::unordered_map m_epochs; + std::condition_variable m_epoch_cv; + std::mutex m_epoch_cv_lk; + size_t m_scale_factor; double m_max_delete_prop; size_t m_buffer_capacity; @@ -220,7 +241,7 @@ private: } void advance_epoch() { - size_t new_epoch_num = m_current_epoch.load() + 1; + size_t new_epoch_num = m_newest_epoch.load(); _Epoch *new_epoch = m_epochs[new_epoch_num]; _Epoch *old_epoch = m_epochs[m_current_epoch.load()]; @@ -233,6 +254,12 @@ private: new_epoch->add_buffer(old_epoch->get_buffers()[i]); } m_current_epoch.fetch_add(1); + + /* notify any blocking threads that the new epoch is available */ + m_epoch_cv_lk.lock(); + m_epoch_cv.notify_all(); + m_epoch_cv_lk.unlock(); + retire_epoch(old_epoch); } @@ -250,7 +277,8 @@ private: auto new_epoch = get_active_epoch()->clone(); std::unique_lock m_struct_lock; m_versions.insert(new_epoch->get_structure()); - m_epochs.insert({m_current_epoch.load() + 1, new_epoch}); + m_newest_epoch.fetch_add(1); + m_epochs.insert({m_newest_epoch.load(), new_epoch}); m_struct_lock.release(); return new_epoch; -- cgit v1.2.3 From 1b8bec5ea882584aba62c92d1ab6ffaf03e7b9b5 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 11:51:46 -0400 Subject: DynamicExtension: fixed some Epoch-related bugs The epochs must be released in the destructor prior to releasing the buffers and structures, as otherwise there are references remaining to these objects and their destructors will fail. Additionally, fixed a bug in the constructor resulting in a memory leak due to allocating an extra starting version and buffer. --- include/framework/DynamicExtension.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 2f0327f..a1f7c2b 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -55,12 +55,16 @@ public: auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop); auto epoch = new _Epoch(vers, buf); - m_buffers.insert(new Buffer(buffer_cap, max_delete_prop*buffer_cap)); - m_versions.insert(new Structure(buffer_cap, scale_factor, max_delete_prop)); + m_buffers.insert(buf); + m_versions.insert(vers); m_epochs.insert({0, epoch}); } ~DynamicExtension() { + for (auto e : m_epochs) { + delete e.second; + } + for (auto e : m_buffers) { delete e; } @@ -68,10 +72,6 @@ public: for (auto e : m_versions) { delete e; } - - for (auto e : m_epochs) { - delete e.second; - } } int insert(const R &rec) { -- cgit v1.2.3 From 7163b8db0ee5acc099a228090a4bdee379c1c8af Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 11:53:08 -0400 Subject: SerialScheduler: added a single-threaded scheduler Added a new scheduler for ensuring single-threaded operation. Additionally, added a static assert to (at least for now) restrict the use of tagging to this single threaded scheduler. 
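The guard pattern this commit introduces is worth seeing in isolation. A minimal sketch, assuming stand-in scheduler types; ExtensionSketch and the empty structs are illustrative, not the framework's real classes:

    #include <concepts>

    struct SerialScheduler {};
    struct FIFOScheduler {};

    template <typename SCHED>
    class ExtensionSketch {
    public:
        int tagged_erase() {
            /* compile-time guard: the tagging path only instantiates
             * against the single-threaded scheduler */
            static_assert(std::same_as<SCHED, SerialScheduler>,
                          "Tagging is only supported in single-threaded operation");
            return 1;
        }
    };

    // ExtensionSketch<SerialScheduler>{}.tagged_erase();  // compiles
    // ExtensionSketch<FIFOScheduler>{}.tagged_erase();    // rejected at compile time

Because the assert's condition depends on the template parameter, it fires only when the tagging member is actually instantiated, so non-tagging users of a concurrent scheduler are unaffected.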
--- include/framework/DynamicExtension.h | 2 ++ 1 file changed, 2 insertions(+) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index a1f7c2b..3a4a7e1 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -26,6 +26,7 @@ #include "framework/util/Configuration.h" #include "framework/scheduling/FIFOScheduler.h" +#include "framework/scheduling/SerialScheduler.h" #include "framework/scheduling/Epoch.h" #include "psu-util/timer.h" @@ -82,6 +83,7 @@ public: // FIXME: delete tagging will require a lot of extra work to get // operating "correctly" in a concurrent environment. if constexpr (D == DeletePolicy::TAGGING) { + static_assert(std::same_as, "Tagging is only supported in single-threaded operation"); BufView buffers = get_active_epoch()->get_buffer_view(); if (get_active_epoch()->get_structure()->tagged_delete(rec)) { -- cgit v1.2.3 From 68ae6279476e7d37837ac06474fb558e50ce6706 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 12:41:55 -0400 Subject: Fixes for various bugs under SerialScheduler --- include/framework/DynamicExtension.h | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 3a4a7e1..5c1eaab 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -54,7 +54,7 @@ public: { auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop); - auto epoch = new _Epoch(vers, buf); + auto epoch = new _Epoch(0, vers, buf); m_buffers.insert(buf); m_versions.insert(vers); @@ -249,13 +249,19 @@ private: /* * Update the new Epoch to contain the buffers from the old one - * that it doesn't currently have + * that it doesn't currently have if using a multi-threaded + * scheduler (otherwise, there is only one buffer that is + * reused, so it shouldn't be removed) */ - size_t old_buffer_cnt = new_epoch->clear_buffers(); - for (size_t i=old_buffer_cnt; iget_buffers().size(); i++) { - new_epoch->add_buffer(old_epoch->get_buffers()[i]); + if constexpr (!std::same_as) { + size_t old_buffer_cnt = new_epoch->clear_buffers(); + for (size_t i=old_buffer_cnt; iget_buffers().size(); i++) { + new_epoch->add_buffer(old_epoch->get_buffers()[i]); + } } + m_current_epoch.fetch_add(1); + old_epoch->set_inactive(); /* notify any blocking threads that the new epoch is available */ m_epoch_cv_lk.lock(); @@ -276,10 +282,10 @@ private: * is violated, it is possible that this code will clone a retired * epoch. 
*/ - auto new_epoch = get_active_epoch()->clone(); + m_newest_epoch.fetch_add(1); + auto new_epoch = get_active_epoch()->clone(m_newest_epoch.load()); std::unique_lock m_struct_lock; m_versions.insert(new_epoch->get_structure()); - m_newest_epoch.fetch_add(1); m_epochs.insert({m_newest_epoch.load(), new_epoch}); m_struct_lock.release(); @@ -316,6 +322,9 @@ private: while (!epoch->retirable()) ; + /* remove epoch from the framework's map */ + m_epochs.erase(epoch->get_epoch_number()); + /* * The epoch's destructor will handle releasing * all the references it holds @@ -440,7 +449,8 @@ private: int internal_append(const R &rec, bool ts) { Buffer *buffer = nullptr; do { - auto epoch = get_active_epoch_protected(); + // FIXME: figure out best way to protect this epoch access + auto epoch = get_active_epoch(); buffer = epoch->get_active_buffer(); /* if the buffer is full, schedule a merge and add a new empty buffer */ @@ -448,10 +458,13 @@ private: // FIXME: possible race here--two identical merges could be scheduled auto vers = epoch->get_structure(); schedule_merge(); - buffer = add_empty_buffer(epoch); + + if constexpr (std::same_as) { + buffer->truncate(); + } else { + buffer = add_empty_buffer(epoch); + } } - // FIXME: not exactly the best spot for this - epoch->end_job(); } while(!buffer->append(rec, ts)); /* internal append should always succeed, eventually */ -- cgit v1.2.3 From 97a4d0fcedb75cbfe5a2e0162e54e71cd9eb0708 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 10:01:48 -0500 Subject: DynamicExtension: fixed some use-after free bugs Reordered some code in internal_append() to avoid use-after frees on the mutable buffer reference used for insertion. --- include/framework/DynamicExtension.h | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 5c1eaab..49c6905 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -244,8 +244,11 @@ private: void advance_epoch() { size_t new_epoch_num = m_newest_epoch.load(); + size_t old_epoch_num = m_current_epoch.load(); + assert(new_epoch_num != old_epoch_num); + _Epoch *new_epoch = m_epochs[new_epoch_num]; - _Epoch *old_epoch = m_epochs[m_current_epoch.load()]; + _Epoch *old_epoch = m_epochs[old_epoch_num]; /* * Update the new Epoch to contain the buffers from the old one @@ -302,10 +305,10 @@ private: auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); std::unique_lock m_struct_lock; + epoch->add_buffer(new_buffer); m_buffers.insert(new_buffer); m_struct_lock.release(); - epoch->add_buffer(new_buffer); return new_buffer; } @@ -448,24 +451,29 @@ private: int internal_append(const R &rec, bool ts) { Buffer *buffer = nullptr; + int res; do { // FIXME: figure out best way to protect this epoch access - auto epoch = get_active_epoch(); + auto epoch = get_active_epoch_protected(); buffer = epoch->get_active_buffer(); /* if the buffer is full, schedule a merge and add a new empty buffer */ if (buffer->is_full()) { // FIXME: possible race here--two identical merges could be scheduled auto vers = epoch->get_structure(); - schedule_merge(); if constexpr (std::same_as) { buffer->truncate(); } else { buffer = add_empty_buffer(epoch); } + + schedule_merge(); } - } while(!buffer->append(rec, ts)); + + res = buffer->append(rec, ts); + epoch->end_job(); + } while(!res); /* internal append should always succeed, 
eventually */ return 1; -- cgit v1.2.3 From fe136eda414d3f7897d4610faeda8dbb3b7bb400 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 12:33:18 -0500 Subject: DynamicExtension::create_static_structure: fixed heap overflow --- include/framework/DynamicExtension.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 49c6905..76722c0 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -169,7 +169,7 @@ public: // FIXME: With an interface adjustment, this could be done in // one call, rather than a loop. - for (size_t i=bv.size() - 1; i>=0; i--) { + for (ssize_t i=bv.size() - 1; i>=0; i--) { shards.emplace_back(new S(bv.get_buffers()[i])); } -- cgit v1.2.3 From ca1605a9924e27ccbacb33d04ccdb4326e7abe74 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 12:37:06 -0500 Subject: Epoch: Adjusted add empty buffer behavior Add empty buffer now supports a CAS-like operation, where it will only add a buffer if the currently active one is still the same as when the decision to add a buffer was made. This is to support adding new buffers on insert outside of the merge-lock, so that multiple concurrent threads cannot add multiple new empty buffers. --- include/framework/DynamicExtension.h | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 76722c0..955dbe5 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -301,14 +301,22 @@ private: * buffer while a new epoch is being created in the background. Returns a * pointer to the newly created buffer. 
*/ - Buffer *add_empty_buffer(_Epoch *epoch) { - auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); + Buffer *add_empty_buffer(_Epoch *epoch, Buffer *current_buffer=nullptr) { + auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); std::unique_lock m_struct_lock; - epoch->add_buffer(new_buffer); + auto new_buffer = epoch->add_buffer(temp_buffer, current_buffer); + /* + * if epoch->add_buffer doesn't add the new buffer, this insert + * won't update the buffer set (duplicate insert) + */ m_buffers.insert(new_buffer); m_struct_lock.release(); + if (new_buffer != temp_buffer) { + delete temp_buffer; + } + return new_buffer; } -- cgit v1.2.3 From ad117358b8ab9924d216edeca0eafa87b4f86896 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 12:38:58 -0500 Subject: DynamicExtension: mutex bug fix Fixed an incorrectly initialized lock guard --- include/framework/DynamicExtension.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 955dbe5..8ce6a7a 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -198,8 +198,8 @@ public: */ void await_next_epoch() { while (m_current_epoch.load() != m_newest_epoch.load()) { - std::unique_lock m_epoch_cv_lk; - m_epoch_cv.wait(m_epoch_cv_lk); + std::unique_lock lk(m_epoch_cv_lk); + m_epoch_cv.wait(lk); } return; -- cgit v1.2.3 From 254f8aa85ea8962e5c11d8b475a171883c22f168 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 12:39:35 -0500 Subject: DynamicExtension: internal_append fixes Fixed a few bugs with concurrent operation of internal_append, as well as enabled the spawning of multiple empty buffers while merges are currently active. --- include/framework/DynamicExtension.h | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 8ce6a7a..60aa07e 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -258,7 +258,11 @@ private: */ if constexpr (!std::same_as) { size_t old_buffer_cnt = new_epoch->clear_buffers(); - for (size_t i=old_buffer_cnt; iget_buffers().size(); i++) { + // FIXME: this is getting nightmarish... The -1 here is to ensure that the + // the empty buffer added when the merge was first triggered is also included. + // Due to the reordering of operations in internal_append, the new buffer exists + // at the time of the clone, and so is already in the new epoch. + for (size_t i=old_buffer_cnt-1; iget_buffers().size(); i++) { new_epoch->add_buffer(old_epoch->get_buffers()[i]); } } @@ -459,24 +463,36 @@ private: int internal_append(const R &rec, bool ts) { Buffer *buffer = nullptr; - int res; + int res = 0; do { - // FIXME: figure out best way to protect this epoch access auto epoch = get_active_epoch_protected(); buffer = epoch->get_active_buffer(); + assert(buffer); - /* if the buffer is full, schedule a merge and add a new empty buffer */ + /* + * If the buffer is full and there is no current merge, + * schedule a merge and add a new empty buffer. If there + * is a current merge, then just add a new empty buffer + * to the current epoch. 
+ */ if (buffer->is_full()) { - // FIXME: possible race here--two identical merges could be scheduled - auto vers = epoch->get_structure(); - if constexpr (std::same_as) { + /* single threaded: run merge and then empty buffer */ + epoch->end_job(); + schedule_merge(); buffer->truncate(); - } else { + continue; + } else if (epoch->prepare_merge()) { + /* + * add an empty buffer to allow insert proceed and + * schedule a merge on a background thread + */ buffer = add_empty_buffer(epoch); + schedule_merge(); + } else { + /* background merge is ongoing, so just add empty buffer */ + buffer = add_empty_buffer(epoch, buffer); } - - schedule_merge(); } res = buffer->append(rec, ts); -- cgit v1.2.3 From 9fd6264122f09752b4278c9ff881b4cfe906bbc8 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 14:30:00 -0500 Subject: DynamicExtension: fixed race in get_active_epoch_protected This function wasn't ensuring that that the epoch pinned and the epoch returned were the same epoch in the situation where the epoch was advanced in the middle of the call. This is now resolved, and further the function will return the newer epoch, rather than the older one, in such a situation. --- include/framework/DynamicExtension.h | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 60aa07e..233bebb 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -238,8 +238,17 @@ private: } _Epoch *get_active_epoch_protected() { - m_epochs[m_current_epoch.load()]->start_job(); - return m_epochs[m_current_epoch.load()]; + ssize_t cur_epoch = -1; + do { + if (cur_epoch != -1) { + m_epochs[cur_epoch]->end_job(); + } + + cur_epoch = m_current_epoch.load(); + m_epochs[cur_epoch]->start_job(); + } while (cur_epoch != m_current_epoch.load()); + + return m_epochs[cur_epoch]; } void advance_epoch() { -- cgit v1.2.3 From 355ddd7b595fce201c305caecea415ab325e170e Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 7 Nov 2023 15:14:38 -0500 Subject: DynamicExtension: revised the way uneeded buffers/structures are released --- include/framework/DynamicExtension.h | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 233bebb..edbb6f5 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -271,6 +271,7 @@ private: // the empty buffer added when the merge was first triggered is also included. // Due to the reordering of operations in internal_append, the new buffer exists // at the time of the clone, and so is already in the new epoch. + std::unique_lock lk(m_struct_lock); for (size_t i=old_buffer_cnt-1; iget_buffers().size(); i++) { new_epoch->add_buffer(old_epoch->get_buffers()[i]); } @@ -361,17 +362,23 @@ private: * be safely freed. 
*/ std::unique_lock lock(m_struct_lock); - for (auto buf : m_buffers) { - if (buf->get_reference_count() == 0) { - m_buffers.erase(buf); - delete buf; + for (auto itr = m_buffers.begin(); itr != m_buffers.end();) { + if ((*itr)->get_reference_count() == 0) { + auto tmp = *itr; + itr = m_buffers.erase(itr); + delete tmp; + } else { + itr++; } } - for (auto vers : m_versions) { - if (vers->get_reference_count() == 0) { - m_versions.erase(vers); - delete vers; + for (auto itr = m_versions.begin(); itr != m_versions.end();) { + if ((*itr)->get_reference_count() == 0) { + auto tmp = *itr; + itr = m_versions.erase(itr); + delete tmp; + } else { + itr++; } } } -- cgit v1.2.3 From 357cab549c2ed33970562b84ff6f83923742343d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 7 Nov 2023 15:34:24 -0500 Subject: Comment and License updates --- include/framework/DynamicExtension.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index edbb6f5..7244856 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -4,7 +4,7 @@ * Copyright (C) 2023 Douglas B. Rumbaugh * Dong Xie * - * All rights reserved. Published under the Modified BSD License. + * Distributed under the Modified BSD License. * */ #pragma once -- cgit v1.2.3 From 39d22316be1708073e4fe1f708814cc801ecdc69 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 9 Nov 2023 11:08:34 -0500 Subject: Fixed various concurrency bugs 1. The system should now cleanly shutdown when the DynamicExtension object is destroyed. Before now, this would lead to use-after-frees and/or deadlocks. 2. Improved synchronization on mutable buffer structure management to fix the issue of the framework losing track of buffers during Epoch changeovers. --- include/framework/DynamicExtension.h | 77 +++++++++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 14 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 7244856..a6047ea 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -62,6 +62,17 @@ public: } ~DynamicExtension() { + + /* let any in-flight epoch transition finish */ + await_next_epoch(); + + /* deactivate the active epoch */ + get_active_epoch()->set_inactive(); + + /* shutdown the scheduler */ + m_sched.shutdown(); + + /* delete all held resources */ for (auto e : m_epochs) { delete e.second; } @@ -125,7 +136,11 @@ public: } size_t get_height() { - return get_active_epoch()->get_structure()->get_height(); + auto epoch = get_active_epoch_protected(); + auto t = epoch->get_structure()->get_height(); + epoch->end_job(); + + return t; } size_t get_memory_usage() { @@ -211,7 +226,10 @@ public: * tombstone proportion invariant. 
*/ bool validate_tombstone_proportion() { - return get_active_epoch()->get_structure()->validate_tombstone_proportion(); + auto epoch = get_active_epoch_protected(); + auto t = epoch->get_structure()->validate_tombstone_proportion(); + epoch->end_job(); + return t; } private: @@ -228,6 +246,8 @@ private: std::condition_variable m_epoch_cv; std::mutex m_epoch_cv_lk; + std::mutex m_epoch_transition_lk; + size_t m_scale_factor; double m_max_delete_prop; size_t m_buffer_capacity; @@ -252,6 +272,8 @@ private: } void advance_epoch() { + m_epoch_transition_lk.lock(); + size_t new_epoch_num = m_newest_epoch.load(); size_t old_epoch_num = m_current_epoch.load(); assert(new_epoch_num != old_epoch_num); @@ -267,18 +289,19 @@ private: */ if constexpr (!std::same_as) { size_t old_buffer_cnt = new_epoch->clear_buffers(); - // FIXME: this is getting nightmarish... The -1 here is to ensure that the - // the empty buffer added when the merge was first triggered is also included. - // Due to the reordering of operations in internal_append, the new buffer exists - // at the time of the clone, and so is already in the new epoch. - std::unique_lock lk(m_struct_lock); - for (size_t i=old_buffer_cnt-1; iget_buffers().size(); i++) { + + /* + * skip the first buffer, as this was the one that got merged, + * and copy all the other buffer references into the new epoch + */ + for (size_t i=1; iget_buffers().size(); i++) { new_epoch->add_buffer(old_epoch->get_buffers()[i]); } } m_current_epoch.fetch_add(1); old_epoch->set_inactive(); + m_epoch_transition_lk.unlock(); /* notify any blocking threads that the new epoch is available */ m_epoch_cv_lk.lock(); @@ -310,16 +333,41 @@ private: } /* - * Add a new empty buffer to the specified epoch. This is intended to be used + * Add a new empty buffer. This is intended to be used * when a merge is triggered, to allow for inserts to be sustained in the new * buffer while a new epoch is being created in the background. Returns a * pointer to the newly created buffer. */ - Buffer *add_empty_buffer(_Epoch *epoch, Buffer *current_buffer=nullptr) { + Buffer *add_empty_buffer() { + /* + * if there's a current Epoch transition ongoing, a buffer installed + * into an older Epoch, but not the new one, may be lost. So fail to + * insert a buffer. + */ + if (!m_epoch_transition_lk.try_lock()) { + return nullptr; + } + + /* + * verify that the currently active buffer is still full, if + * not, there is no reason to add a new one. This code is + * protected by the epoch transition lock, so need need to + * take a protected reference to the epoch. + */ + auto active_epoch = get_active_epoch(); + if (!active_epoch->get_active_buffer()->is_full()) { + m_epoch_transition_lk.unlock(); + return nullptr; + } + + /* + * create a new buffer and install it in the active epoch. 
+ */ auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); std::unique_lock m_struct_lock; - auto new_buffer = epoch->add_buffer(temp_buffer, current_buffer); + auto new_buffer = active_epoch->add_buffer(temp_buffer); + /* * if epoch->add_buffer doesn't add the new buffer, this insert * won't update the buffer set (duplicate insert) @@ -330,6 +378,7 @@ private: if (new_buffer != temp_buffer) { delete temp_buffer; } + m_epoch_transition_lk.unlock(); return new_buffer; } @@ -503,15 +552,15 @@ private: * add an empty buffer to allow insert proceed and * schedule a merge on a background thread */ - buffer = add_empty_buffer(epoch); + buffer = add_empty_buffer(); schedule_merge(); } else { /* background merge is ongoing, so just add empty buffer */ - buffer = add_empty_buffer(epoch, buffer); + buffer = add_empty_buffer(); } } - res = buffer->append(rec, ts); + res = (buffer) ? buffer->append(rec, ts) : 0; epoch->end_job(); } while(!res); -- cgit v1.2.3 From 83486744600e8be338c75c2e3d2339452a392a9d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 13 Nov 2023 10:41:13 -0500 Subject: Fixed merge logic bug in tiering In InternalLevel::clone(), the m_shard_cnt variable was not being set appropriately in the clone, resulting in the record counts reported for a multi-shard level to be reported incorrectly. In DynamicExtension::merge(), the merges were being performed in the wrong order, resulting in multi-level merges deleting records. The leveling tests all passed even with this bug for some reason, but it caused tiering tests to fail. It isn't clear _why_ leveling appeared to work, but the bug is now fixed, so that's largely irrelevant I suppose. --- include/framework/DynamicExtension.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index a6047ea..9554c8c 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -436,11 +436,12 @@ private: MergeArgs *args = (MergeArgs *) arguments; Structure *vers = args->epoch->get_structure(); + // FIXME: with an improved shard interface, multiple full buffers // could be merged at once here. Buffer *buff = (Buffer *) args->epoch->get_buffers()[0]; - for (ssize_t i=args->merges.size() - 1; i>=0; i--) { + for (ssize_t i=0; imerges.size(); i++) { vers->merge_levels(args->merges[i].second, args->merges[i].first); } -- cgit v1.2.3 From 90bb0614fc1d8f1a185a778e31aaf9027c01aeb8 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 13 Nov 2023 11:44:09 -0500 Subject: Tombstone Compaction: re-enabled tombstone compaction Currently, proactive buffer tombstone compaction is disabled by forcing the buffer tombstone capacity to match its record capacity. It isn't clear how to best handle proactive buffer compactions in an environment where new buffers are spawned anyway. 
--- include/framework/DynamicExtension.h | 55 ++++++++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 9554c8c..9adc320 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -253,6 +253,32 @@ private: size_t m_buffer_capacity; size_t m_buffer_delete_capacity; + void enforce_delete_invariant(_Epoch *epoch) { + auto structure = epoch->get_structure(); + auto compactions = structure->get_compaction_tasks(); + + while (compactions.size() > 0) { + /* otherwise, we need to schedule a merge to compact tombstones */ + MergeArgs *args = new MergeArgs(); + args->epoch = epoch; + // FIXME: all full buffers can be merged at this point--but that requires + // retooling the shard interface a bit to do efficiently. + args->merges = compactions; + args->extension = this; + args->compaction = true; + + auto wait = args->result.get_future(); + + epoch->start_job(); + m_sched.schedule_job(merge, 0, args); + + /* wait for merge completion */ + wait.get(); + + compactions = structure->get_compaction_tasks(); + } + } + _Epoch *get_active_epoch() { return m_epochs[m_current_epoch.load()]; } @@ -272,6 +298,7 @@ private: } void advance_epoch() { + m_epoch_transition_lk.lock(); size_t new_epoch_num = m_newest_epoch.load(); @@ -281,6 +308,15 @@ private: _Epoch *new_epoch = m_epochs[new_epoch_num]; _Epoch *old_epoch = m_epochs[old_epoch_num]; + /* + * Verify the tombstone invariant within the epoch's structure, this + * may require scheduling additional merges. + * + * FIXME: having this inside the lock is going to TANK + * insertion performance. + */ + enforce_delete_invariant(new_epoch); + /* * Update the new Epoch to contain the buffers from the old one * that it doesn't currently have if using a multi-threaded @@ -445,12 +481,26 @@ private: vers->merge_levels(args->merges[i].second, args->merges[i].first); } - vers->merge_buffer(buff); + /* + * if the merge is a compaction, don't push the buffer down, + * as there is no guarantee that the merges will free up + * sufficient space in L0 + */ + if (!args->compaction) { + vers->merge_buffer(buff); + } args->epoch->end_job(); args->result.set_value(true); - ((DynamicExtension *) args->extension)->advance_epoch(); + /* + * Compactions occur on an epoch _before_ it becomes active, + * and as a result the active epoch should _not_ be advanced as + * part of a compaction merge + */ + if (!args->compaction) { + ((DynamicExtension *) args->extension)->advance_epoch(); + } delete args; } @@ -511,6 +561,7 @@ private: // retooling the shard interface a bit to do efficiently. 
args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count()); args->extension = this; + args->compaction = false; m_sched.schedule_job(merge, 0, args); } -- cgit v1.2.3 From fe12926c41eed825da80a36d77b7facd9ba0567a Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 15 Nov 2023 15:18:33 -0500 Subject: Lock protect Epoch during retirement to avoid use-after-free errors --- include/framework/DynamicExtension.h | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 9adc320..8edcc5f 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -14,6 +14,8 @@ #include #include #include +#include +#include #include "framework/structure/MutableBuffer.h" #include "framework/structure/InternalLevel.h" @@ -247,6 +249,7 @@ private: std::mutex m_epoch_cv_lk; std::mutex m_epoch_transition_lk; + std::shared_mutex m_epoch_retire_lk; size_t m_scale_factor; double m_max_delete_prop; @@ -284,15 +287,10 @@ private: } _Epoch *get_active_epoch_protected() { - ssize_t cur_epoch = -1; - do { - if (cur_epoch != -1) { - m_epochs[cur_epoch]->end_job(); - } - - cur_epoch = m_current_epoch.load(); - m_epochs[cur_epoch]->start_job(); - } while (cur_epoch != m_current_epoch.load()); + m_epoch_retire_lk.lock_shared(); + auto cur_epoch = m_current_epoch.load(); + m_epochs[cur_epoch]->start_job(); + m_epoch_retire_lk.unlock_shared(); return m_epochs[cur_epoch]; } @@ -429,8 +427,14 @@ private: * number will hit zero and the function will * proceed. */ - while (!epoch->retirable()) - ; + + do { + m_epoch_retire_lk.lock(); + if (epoch->retirable()) { + break; + } + m_epoch_retire_lk.unlock(); + } while (true); /* remove epoch from the framework's map */ m_epochs.erase(epoch->get_epoch_number()); @@ -440,6 +444,7 @@ private: * all the references it holds */ delete epoch; + m_epoch_retire_lk.unlock(); /* * Following the epoch's destruction, any buffers -- cgit v1.2.3 From 3c127eda69295cb306739bdd3c5ddccff6026a8d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 13 Dec 2023 12:39:54 -0500 Subject: Refactoring: corrected a number of names and added more comments --- include/framework/DynamicExtension.h | 77 ++++++++++++++++++------------------ 1 file changed, 39 insertions(+), 38 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 8edcc5f..fe43c52 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -165,8 +165,8 @@ public: return m_buffer_capacity; } - Shard *create_static_structure(bool await_merge_completion=false) { - if (await_merge_completion) { + Shard *create_static_structure(bool await_reconstruction_completion=false) { + if (await_reconstruction_completion) { await_next_epoch(); } @@ -179,7 +179,7 @@ public: if (vers->get_levels().size() > 0) { for (int i=vers->get_levels().size() - 1; i>= 0; i--) { if (vers->get_levels()[i]) { - shards.emplace_back(vers->get_levels()[i]->get_merged_shard()); + shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); } } } @@ -261,10 +261,10 @@ private: auto compactions = structure->get_compaction_tasks(); while (compactions.size() > 0) { - /* otherwise, we need to schedule a merge to compact tombstones */ - MergeArgs *args = new MergeArgs(); + /* otherwise, we need to 
schedule a compaction */ + ReconstructionArgs *args = new ReconstructionArgs(); args->epoch = epoch; - // FIXME: all full buffers can be merged at this point--but that requires + // FIXME: all full buffers can be flushed at this point--but that requires // retooling the shard interface a bit to do efficiently. args->merges = compactions; args->extension = this; @@ -273,9 +273,9 @@ private: auto wait = args->result.get_future(); epoch->start_job(); - m_sched.schedule_job(merge, 0, args); + m_sched.schedule_job(reconstruction, 0, args); - /* wait for merge completion */ + /* wait for reconstruction completion */ wait.get(); compactions = structure->get_compaction_tasks(); @@ -308,7 +308,7 @@ private: /* * Verify the tombstone invariant within the epoch's structure, this - * may require scheduling additional merges. + * may require scheduling additional reconstructions. * * FIXME: having this inside the lock is going to TANK * insertion performance. @@ -325,8 +325,9 @@ private: size_t old_buffer_cnt = new_epoch->clear_buffers(); /* - * skip the first buffer, as this was the one that got merged, - * and copy all the other buffer references into the new epoch + * skip the first buffer, as this was flushed into the epoch's + * structure already, and copy all the other buffer references + * into the new epoch */ for (size_t i=1; iget_buffers().size(); i++) { new_epoch->add_buffer(old_epoch->get_buffers()[i]); @@ -352,7 +353,7 @@ private: _Epoch *create_new_epoch() { /* * This epoch access is _not_ protected under the assumption that - * only one merge will be able to trigger at a time. If that condition + * only one reconstruction will be able to trigger at a time. If that condition * is violated, it is possible that this code will clone a retired * epoch. */ @@ -368,7 +369,7 @@ private: /* * Add a new empty buffer. This is intended to be used - * when a merge is triggered, to allow for inserts to be sustained in the new + * when a reconstruction is triggered, to allow for inserts to be sustained in the new * buffer while a new epoch is being created in the background. Returns a * pointer to the newly created buffer. */ @@ -429,13 +430,12 @@ private: */ do { - m_epoch_retire_lk.lock(); if (epoch->retirable()) { break; } - m_epoch_retire_lk.unlock(); } while (true); + m_epoch_retire_lk.lock(); /* remove epoch from the framework's map */ m_epochs.erase(epoch->get_epoch_number()); @@ -473,26 +473,26 @@ private: } } - static void merge(void *arguments) { - MergeArgs *args = (MergeArgs *) arguments; + static void reconstruction(void *arguments) { + ReconstructionArgs *args = (ReconstructionArgs *) arguments; Structure *vers = args->epoch->get_structure(); // FIXME: with an improved shard interface, multiple full buffers - // could be merged at once here. + // could be flushed at once here. 
Buffer *buff = (Buffer *) args->epoch->get_buffers()[0]; for (ssize_t i=0; imerges.size(); i++) { - vers->merge_levels(args->merges[i].second, args->merges[i].first); + vers->reconstruction(args->merges[i].second, args->merges[i].first); } /* - * if the merge is a compaction, don't push the buffer down, - * as there is no guarantee that the merges will free up - * sufficient space in L0 + * if performing a compaction, don't push the buffer down, + * as there is no guarantee that any necessary reconstructions + * will free sufficient space in L0 to support a flush */ if (!args->compaction) { - vers->merge_buffer(buff); + vers->flush_buffer(buff); } args->epoch->end_job(); @@ -501,7 +501,7 @@ private: /* * Compactions occur on an epoch _before_ it becomes active, * and as a result the active epoch should _not_ be advanced as - * part of a compaction merge + * part of a compaction */ if (!args->compaction) { ((DynamicExtension *) args->extension)->advance_epoch(); @@ -556,18 +556,19 @@ private: delete args; } - void schedule_merge() { + void schedule_reconstruction() { + //fprintf(stderr, "%ld\t Reconstruction Scheduling", m_current_epoch); auto epoch = create_new_epoch(); epoch->start_job(); - MergeArgs *args = new MergeArgs(); + ReconstructionArgs *args = new ReconstructionArgs(); args->epoch = epoch; - // FIXME: all full buffers can be merged at this point--but that requires + // FIXME: all full buffers can be flushed at this point--but that requires // retooling the shard interface a bit to do efficiently. - args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count()); + args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffers()[0]->get_record_count()); args->extension = this; args->compaction = false; - m_sched.schedule_job(merge, 0, args); + m_sched.schedule_job(reconstruction, 0, args); } std::future> schedule_query(void *query_parms) { @@ -592,27 +593,27 @@ private: assert(buffer); /* - * If the buffer is full and there is no current merge, - * schedule a merge and add a new empty buffer. If there - * is a current merge, then just add a new empty buffer + * If the buffer is full and there is no ongoing reconstruction, + * schedule a reconstruction and add a new empty buffer. If there + * is an ongoing reconstruction, then add a new empty buffer * to the current epoch. */ if (buffer->is_full()) { if constexpr (std::same_as) { - /* single threaded: run merge and then empty buffer */ + /* single threaded: run reconstruction and then empty buffer */ epoch->end_job(); - schedule_merge(); + schedule_reconstruction(); buffer->truncate(); continue; - } else if (epoch->prepare_merge()) { + } else if (epoch->prepare_reconstruction()) { /* * add an empty buffer to allow insert proceed and - * schedule a merge on a background thread + * schedule a reconstruction on a background thread */ buffer = add_empty_buffer(); - schedule_merge(); + schedule_reconstruction(); } else { - /* background merge is ongoing, so just add empty buffer */ + /* background reconstruction is ongoing, so just add empty buffer */ buffer = add_empty_buffer(); } } -- cgit v1.2.3 From 8113b32d124f487856a858c7f68a4e531399f66d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 21 Dec 2023 17:07:29 -0500 Subject: DynamicExtension: comments/reorganization Clarified the reasoning for a few things in comments that just tripped me up during debugging. 
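Several of the surrounding fixes adjust the start_job()/end_job() accounting, so the invariant they maintain is worth stating compactly. A stripped-down sketch of the pinning protocol (EpochPin is illustrative; the real Epoch couples this counter with buffer and structure references):

    #include <atomic>
    #include <cstddef>

    struct EpochPin {
        std::atomic<size_t> active_jobs{0};
        std::atomic<bool>   active{true};

        void start_job() { active_jobs.fetch_add(1); }   /* pin */
        void end_job()   { active_jobs.fetch_sub(1); }   /* unpin */

        void set_inactive() { active.store(false); }

        /* an epoch may be freed only once it is both retired and unpinned */
        bool retirable() const {
            return !active.load() && active_jobs.load() == 0;
        }
    };

Every start_job() must be matched by exactly one end_job(), which is why the reconstruction path documents who owns the pin: the scheduling code takes it, and the job itself releases it.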
--- include/framework/DynamicExtension.h | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index fe43c52..c5c4a1a 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -261,23 +261,29 @@ private: auto compactions = structure->get_compaction_tasks(); while (compactions.size() > 0) { - /* otherwise, we need to schedule a compaction */ + + /* schedule a compaction */ ReconstructionArgs *args = new ReconstructionArgs(); args->epoch = epoch; - // FIXME: all full buffers can be flushed at this point--but that requires - // retooling the shard interface a bit to do efficiently. args->merges = compactions; args->extension = this; args->compaction = true; + /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ auto wait = args->result.get_future(); + /* + * the reconstruction process calls end_job(), + * so we must start one before calling it + */ epoch->start_job(); + m_sched.schedule_job(reconstruction, 0, args); - /* wait for reconstruction completion */ + /* wait for compaction completion */ wait.get(); + /* get a new batch of compactions to perform, if needed */ compactions = structure->get_compaction_tasks(); } } @@ -557,17 +563,23 @@ private: } void schedule_reconstruction() { - //fprintf(stderr, "%ld\t Reconstruction Scheduling", m_current_epoch); auto epoch = create_new_epoch(); + /* + * the reconstruction process calls end_job(), + * so we must start one before calling it + */ epoch->start_job(); - ReconstructionArgs *args = new ReconstructionArgs(); - args->epoch = epoch; // FIXME: all full buffers can be flushed at this point--but that requires // retooling the shard interface a bit to do efficiently. + // + ReconstructionArgs *args = new ReconstructionArgs(); + args->epoch = epoch; args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffers()[0]->get_record_count()); args->extension = this; args->compaction = false; + /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ + m_sched.schedule_job(reconstruction, 0, args); } -- cgit v1.2.3 From aac0bb661af8fae38d3ce08d6078cb4d9dfcb575 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 12 Jan 2024 14:10:11 -0500 Subject: Initial integration of new buffering scheme into framework It isn't working right now (lotsa test failures), but we're to the debugging phase now. 
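The watermark scheme introduced here replaces the old full-buffer/truncate cycle: a reconstruction is scheduled once the tail passes the low watermark, while appends only fail at the high watermark. A toy sketch of the append rule, assuming a preallocated slot array (WatermarkBuffer is illustrative; the real MutableBuffer also tracks a head pointer for views, and the rollback below is deliberately naive):

    #include <atomic>
    #include <cstddef>

    struct WatermarkBuffer {
        size_t lwm, hwm;
        std::atomic<size_t> tail{0};

        WatermarkBuffer(size_t low, size_t high) : lwm(low), hwm(high) {}

        bool is_at_low_watermark() const { return tail.load() >= lwm; }

        /* returns 0 once the high watermark is reached; callers retry
         * after the background flush advances the head */
        int append(size_t &slot_out) {
            size_t pos = tail.fetch_add(1);
            if (pos >= hwm) {
                tail.fetch_sub(1);   /* naive rollback, fine for a sketch */
                return 0;
            }
            slot_out = pos;
            return 1;
        }
    };

The gap between the two watermarks is what lets inserts continue while a flush runs, which is exactly the property the earlier add_empty_buffer machinery was trying to provide.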
--- include/framework/DynamicExtension.h | 252 ++++++++++++----------------------- 1 file changed, 88 insertions(+), 164 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index c5c4a1a..c97b390 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -10,29 +10,23 @@ #pragma once #include -#include #include #include #include #include #include +#include "framework/interface/Scheduler.h" +#include "framework/scheduling/FIFOScheduler.h" +#include "framework/scheduling/SerialScheduler.h" + #include "framework/structure/MutableBuffer.h" -#include "framework/structure/InternalLevel.h" -#include "framework/interface/Shard.h" -#include "framework/interface/Query.h" #include "framework/interface/Record.h" -#include "framework/interface/Query.h" -#include "framework/interface/Scheduler.h" #include "framework/structure/ExtensionStructure.h" #include "framework/util/Configuration.h" -#include "framework/scheduling/FIFOScheduler.h" -#include "framework/scheduling/SerialScheduler.h" #include "framework/scheduling/Epoch.h" -#include "psu-util/timer.h" -#include "psu-ds/Alias.h" namespace de { @@ -43,22 +37,19 @@ class DynamicExtension { typedef MutableBuffer Buffer; typedef ExtensionStructure Structure; typedef Epoch _Epoch; - typedef BufferView BufView; + typedef BufferView BufView; public: - DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop, size_t memory_budget=0, + DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0, size_t thread_cnt=16) : m_scale_factor(scale_factor) - , m_max_delete_prop(max_delete_prop) + , m_max_delete_prop(1) , m_sched(memory_budget, thread_cnt) - , m_buffer_capacity(buffer_cap) - , m_buffer_delete_capacity(max_delete_prop*buffer_cap) + , m_buffer(new Buffer(buffer_lwm, buffer_hwm)) { - auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); - auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop); - auto epoch = new _Epoch(0, vers, buf); + auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); + auto epoch = new _Epoch(0, vers, m_buffer); - m_buffers.insert(buf); m_versions.insert(vers); m_epochs.insert({0, epoch}); } @@ -79,9 +70,7 @@ public: delete e.second; } - for (auto e : m_buffers) { - delete e; - } + delete m_buffer; for (auto e : m_versions) { delete e; @@ -95,10 +84,15 @@ public: int erase(const R &rec) { // FIXME: delete tagging will require a lot of extra work to get // operating "correctly" in a concurrent environment. + + /* + * Get a view on the buffer *first*. This will ensure a stronger + * ordering than simply accessing the buffer directly, but is + * not *strictly* necessary. + */ + auto view = m_buffer->get_buffer_view(); if constexpr (D == DeletePolicy::TAGGING) { static_assert(std::same_as, "Tagging is only supported in single-threaded operation"); - BufView buffers = get_active_epoch()->get_buffer_view(); - if (get_active_epoch()->get_structure()->tagged_delete(rec)) { return 1; } @@ -108,7 +102,7 @@ public: * probably has the lowest probability of having the record, * so we'll check it last. 
*/ - return buffers.delete_record(rec); + return view.delete_record(rec); } /* @@ -123,7 +117,7 @@ public: size_t get_record_count() { auto epoch = get_active_epoch_protected(); - auto t = epoch->get_buffer_view().get_record_count() + epoch->get_structure()->get_record_count(); + auto t = epoch->get_buffer().get_record_count() + epoch->get_structure()->get_record_count(); epoch->end_job(); return t; @@ -131,7 +125,7 @@ public: size_t get_tombstone_count() { auto epoch = get_active_epoch_protected(); - auto t = epoch->get_buffer_view().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); + auto t = epoch->get_buffer().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); epoch->end_job(); return t; @@ -147,7 +141,7 @@ public: size_t get_memory_usage() { auto epoch = get_active_epoch_protected(); - auto t= epoch->get_buffer_view().get_memory_usage() + epoch->get_structure()->get_memory_usage(); + auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage(); epoch->end_job(); return t; @@ -155,14 +149,14 @@ public: size_t get_aux_memory_usage() { auto epoch = get_active_epoch_protected(); - auto t = epoch->get_buffer_view().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); + auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); epoch->end_job(); return t; } size_t get_buffer_capacity() { - return m_buffer_capacity; + return m_buffer->get_capacity(); } Shard *create_static_structure(bool await_reconstruction_completion=false) { @@ -171,11 +165,20 @@ public: } auto epoch = get_active_epoch_protected(); - auto bv = epoch->get_buffer_view(); - auto vers = epoch->get_structure(); std::vector shards; + /* + * construct a shard from the buffer view. We'll hold the view + * for as short a time as possible: once the records are exfiltrated + * from the buffer, there's no reason to retain a hold on the view's + * head pointer any longer + */ + { + auto bv = epoch->get_buffer(); + shards.emplace_back(new S(std::move(bv))); + } + if (vers->get_levels().size() > 0) { for (int i=vers->get_levels().size() - 1; i>= 0; i--) { if (vers->get_levels()[i]) { @@ -184,12 +187,6 @@ public: } } - // FIXME: With an interface adjustment, this could be done in - // one call, rather than a loop. - for (ssize_t i=bv.size() - 1; i>=0; i--) { - shards.emplace_back(new S(bv.get_buffers()[i])); - } - Shard *shards_array[shards.size()]; size_t j = 0; @@ -237,10 +234,13 @@ public: private: SCHED m_sched; + Buffer *m_buffer; + std::mutex m_struct_lock; - std::set m_buffers; std::set m_versions; + alignas(64) std::atomic m_reconstruction_scheduled; + std::atomic m_current_epoch; std::atomic m_newest_epoch; std::unordered_map m_epochs; @@ -253,8 +253,6 @@ private: size_t m_scale_factor; double m_max_delete_prop; - size_t m_buffer_capacity; - size_t m_buffer_delete_capacity; void enforce_delete_invariant(_Epoch *epoch) { auto structure = epoch->get_structure(); @@ -321,6 +319,7 @@ private: */ enforce_delete_invariant(new_epoch); + #if 0 /* * Update the new Epoch to contain the buffers from the old one * that it doesn't currently have if using a multi-threaded @@ -339,6 +338,7 @@ private: new_epoch->add_buffer(old_epoch->get_buffers()[i]); } } + #endif m_current_epoch.fetch_add(1); old_epoch->set_inactive(); @@ -373,57 +373,6 @@ private: return new_epoch; } - /* - * Add a new empty buffer. 
This is intended to be used - * when a reconstruction is triggered, to allow for inserts to be sustained in the new - * buffer while a new epoch is being created in the background. Returns a - * pointer to the newly created buffer. - */ - Buffer *add_empty_buffer() { - /* - * if there's a current Epoch transition ongoing, a buffer installed - * into an older Epoch, but not the new one, may be lost. So fail to - * insert a buffer. - */ - if (!m_epoch_transition_lk.try_lock()) { - return nullptr; - } - - /* - * verify that the currently active buffer is still full, if - * not, there is no reason to add a new one. This code is - * protected by the epoch transition lock, so need need to - * take a protected reference to the epoch. - */ - auto active_epoch = get_active_epoch(); - if (!active_epoch->get_active_buffer()->is_full()) { - m_epoch_transition_lk.unlock(); - return nullptr; - } - - /* - * create a new buffer and install it in the active epoch. - */ - auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); - - std::unique_lock m_struct_lock; - auto new_buffer = active_epoch->add_buffer(temp_buffer); - - /* - * if epoch->add_buffer doesn't add the new buffer, this insert - * won't update the buffer set (duplicate insert) - */ - m_buffers.insert(new_buffer); - m_struct_lock.release(); - - if (new_buffer != temp_buffer) { - delete temp_buffer; - } - m_epoch_transition_lk.unlock(); - - return new_buffer; - } - void retire_epoch(_Epoch *epoch) { /* * Epochs with currently active jobs cannot @@ -452,21 +401,14 @@ private: delete epoch; m_epoch_retire_lk.unlock(); + /* NOTE: the BufferView mechanism handles freeing unused buffer space */ + /* * Following the epoch's destruction, any buffers * or structures with no remaining references can * be safely freed. */ std::unique_lock lock(m_struct_lock); - for (auto itr = m_buffers.begin(); itr != m_buffers.end();) { - if ((*itr)->get_reference_count() == 0) { - auto tmp = *itr; - itr = m_buffers.erase(itr); - delete tmp; - } else { - itr++; - } - } for (auto itr = m_versions.begin(); itr != m_versions.end();) { if ((*itr)->get_reference_count() == 0) { @@ -484,21 +426,31 @@ private: Structure *vers = args->epoch->get_structure(); - // FIXME: with an improved shard interface, multiple full buffers + // FIXME: with an improved shard interface, multiple full buffer_viewers // could be flushed at once here. - Buffer *buff = (Buffer *) args->epoch->get_buffers()[0]; + auto buffer_view = args->epoch->get_buffer(); + size_t new_head = buffer_view.get_tail(); for (ssize_t i=0; imerges.size(); i++) { vers->reconstruction(args->merges[i].second, args->merges[i].first); } /* - * if performing a compaction, don't push the buffer down, - * as there is no guarantee that any necessary reconstructions + * if performing a compaction, don't flush the buffer, as + * there is no guarantee that any necessary reconstructions * will free sufficient space in L0 to support a flush */ if (!args->compaction) { - vers->flush_buffer(buff); + vers->flush_buffer(std::move(buffer_view)); + + // FIXME: this may currently fail because there isn't any + // query preemption yet. 
At this point, we'd need to either + // 1) wait for all queries on the old_head to finish + // 2) kill all queries on the old_head + // 3) somehow migrate all queries on the old_head to the new + // version + auto res = args->epoch->advance_buffer_head(new_head); + assert(res); } args->epoch->end_job(); @@ -519,27 +471,33 @@ private: static void async_query(void *arguments) { QueryArgs *args = (QueryArgs *) arguments; - auto buffers = args->epoch->get_buffer_view(); + auto buffer = args->epoch->get_buffer(); auto vers = args->epoch->get_structure(); void *parms = args->query_parms; /* Get the buffer query states */ - std::vector buffer_states = buffers.get_query_states(parms); + void *buffer_state = Q::get_buffer_query_state(std::move(buffer), parms); /* Get the shard query states */ std::vector> shards; std::vector states = vers->get_query_states(shards, parms); - Q::process_query_states(parms, states, buffer_states); + Q::process_query_states(parms, states, buffer_state); - std::vector>> query_results(shards.size() + buffer_states.size()); + std::vector>> query_results(shards.size() + 1); for (size_t i=0; i> local_results = (i < buffer_states.size()) - ? Q::buffer_query(buffers.get_buffers()[i], buffer_states[i], parms) - : Q::query(shards[i - buffer_states.size()].second, - states[i - buffer_states.size()], parms); - ShardID shid = (i < buffer_states.size()) ? INVALID_SHID : shards[i - buffer_states.size()].first; - query_results[i] = std::move(filter_deletes(local_results, shid, buffers, vers)); + std::vector> local_results; + ShardID shid; + + if (i == 0) { /* process the buffer first */ + local_results = Q::buffer_query(buffer_state, parms); + shid = INVALID_SHID; + } else { + local_results = Q::query(shards[i - 1].second, states[i - 1], parms); + shid = shards[i - 1].first; + } + + query_results[i] = std::move(filter_deletes(local_results, shid, vers)); if constexpr (Q::EARLY_ABORT) { if (query_results[i].size() > 0) break; @@ -551,10 +509,7 @@ private: args->epoch->end_job(); - for (size_t i=0; i *args = new ReconstructionArgs(); args->epoch = epoch; - args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffers()[0]->get_record_count()); + args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffer().get_record_count()); args->extension = this; args->compaction = false; /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ @@ -597,49 +552,16 @@ private: } int internal_append(const R &rec, bool ts) { - Buffer *buffer = nullptr; - int res = 0; - do { - auto epoch = get_active_epoch_protected(); - buffer = epoch->get_active_buffer(); - assert(buffer); - - /* - * If the buffer is full and there is no ongoing reconstruction, - * schedule a reconstruction and add a new empty buffer. If there - * is an ongoing reconstruction, then add a new empty buffer - * to the current epoch. - */ - if (buffer->is_full()) { - if constexpr (std::same_as) { - /* single threaded: run reconstruction and then empty buffer */ - epoch->end_job(); - schedule_reconstruction(); - buffer->truncate(); - continue; - } else if (epoch->prepare_reconstruction()) { - /* - * add an empty buffer to allow insert proceed and - * schedule a reconstruction on a background thread - */ - buffer = add_empty_buffer(); - schedule_reconstruction(); - } else { - /* background reconstruction is ongoing, so just add empty buffer */ - buffer = add_empty_buffer(); - } - } - - res = (buffer) ? 
buffer->append(rec, ts) : 0; - epoch->end_job(); - } while(!res); + if (!m_reconstruction_scheduled.load() && m_buffer->is_at_low_watermark()) { + m_reconstruction_scheduled.store(true); + schedule_reconstruction(); + } - /* internal append should always succeed, eventually */ - return 1; + /* this will fail if the HWM is reached and return 0 */ + return m_buffer->append(rec, ts); } - static std::vector> filter_deletes(std::vector> &records, ShardID shid, - BufView &buffers, Structure *vers) { + static std::vector> filter_deletes(std::vector> &records, ShardID shid, Structure *vers) { if constexpr (!Q::SKIP_DELETE_FILTER) { return records; } @@ -672,9 +594,11 @@ private: continue; } - if (buffers.check_tombstone(rec.rec)) { - continue; - } + // FIXME: need to figure out how best to re-enable the buffer tombstone + // check in the correct manner. + //if (buffview.check_tombstone(rec.rec)) { + //continue; + //} if (shid != INVALID_SHID) { for (size_t lvl=0; lvl<=shid.level_idx; lvl++) { -- cgit v1.2.3 From cf178ae74a76b780b655a447531d2114f9f81d98 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Mon, 15 Jan 2024 14:01:36 -0500 Subject: Various single-threaded bug fixes --- include/framework/DynamicExtension.h | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index c97b390..bddc950 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -90,8 +90,8 @@ public: * ordering than simply accessing the buffer directly, but is * not *strictly* necessary. */ - auto view = m_buffer->get_buffer_view(); if constexpr (D == DeletePolicy::TAGGING) { + auto view = m_buffer->get_buffer_view(); static_assert(std::same_as, "Tagging is only supported in single-threaded operation"); if (get_active_epoch()->get_structure()->tagged_delete(rec)) { return 1; @@ -426,9 +426,8 @@ private: Structure *vers = args->epoch->get_structure(); - // FIXME: with an improved shard interface, multiple full buffer_viewers // could be flushed at once here. - auto buffer_view = args->epoch->get_buffer(); + auto buffer_view = args->epoch->get_flush_buffer(); size_t new_head = buffer_view.get_tail(); for (ssize_t i=0; imerges.size(); i++) { @@ -464,6 +463,8 @@ private: if (!args->compaction) { ((DynamicExtension *) args->extension)->advance_epoch(); } + + ((DynamicExtension *) args->extension)->m_reconstruction_scheduled = false; delete args; } @@ -525,12 +526,9 @@ private: */ epoch->start_job(); - // FIXME: all full buffers can be flushed at this point--but that requires - // retooling the shard interface a bit to do efficiently. - // ReconstructionArgs *args = new ReconstructionArgs(); args->epoch = epoch; - args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffer().get_record_count()); + args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_low_watermark()); args->extension = this; args->compaction = false; /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ -- cgit v1.2.3 From 2117935e85412f3733ee0bcb1830c7fd0b129b29 Mon Sep 17 00:00:00 2001 From: "Douglas B. 
Rumbaugh" Date: Mon, 15 Jan 2024 17:23:57 -0500 Subject: Concurrency testing and bug fixes --- include/framework/DynamicExtension.h | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index bddc950..cb21ae3 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -28,6 +28,7 @@ #include "framework/scheduling/Epoch.h" + namespace de { template get_buffer(); - shards.emplace_back(new S(std::move(bv))); + if (bv.get_record_count() > 0) { + shards.emplace_back(new S(std::move(bv))); + } } if (vers->get_levels().size() > 0) { for (int i=vers->get_levels().size() - 1; i>= 0; i--) { - if (vers->get_levels()[i]) { + if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) { shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); } } @@ -426,14 +429,20 @@ private: Structure *vers = args->epoch->get_structure(); - // could be flushed at once here. - auto buffer_view = args->epoch->get_flush_buffer(); - size_t new_head = buffer_view.get_tail(); for (ssize_t i=0; imerges.size(); i++) { vers->reconstruction(args->merges[i].second, args->merges[i].first); } + /* + * we'll grab the buffer AFTER doing the internal reconstruction, so we can + * flush as many records as possible in one go. The reconstruction was done so + * as to make room for the full buffer anyway, so there's no real benefit to doing + * this first. + */ + auto buffer_view = args->epoch->get_buffer(); + size_t new_head = buffer_view.get_tail(); + /* * if performing a compaction, don't flush the buffer, as * there is no guarantee that any necessary reconstructions @@ -528,7 +537,7 @@ private: ReconstructionArgs *args = new ReconstructionArgs(); args->epoch = epoch; - args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_low_watermark()); + args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_high_watermark()); args->extension = this; args->compaction = false; /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ -- cgit v1.2.3 From 138c793b0a58577713d98c98bb140cf1d9c79bee Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 17 Jan 2024 18:22:00 -0500 Subject: Multiple concurrency bug fixes A poorly organized commit with fixes for a variety of bugs that were causing missing records. The core problems all appear to be fixed, though there is an outstanding problem with tombstones not being completely canceled. A very small number are appearing in the wrong order during the static structure test. 
--- include/framework/DynamicExtension.h | 91 ++++++++++++------------------------ 1 file changed, 30 insertions(+), 61 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index cb21ae3..7590de2 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -49,7 +49,7 @@ public: , m_buffer(new Buffer(buffer_lwm, buffer_hwm)) { auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); - auto epoch = new _Epoch(0, vers, m_buffer); + auto epoch = new _Epoch(0, vers, m_buffer, 0); m_versions.insert(vers); m_epochs.insert({0, epoch}); @@ -169,6 +169,15 @@ public: auto vers = epoch->get_structure(); std::vector shards; + + if (vers->get_levels().size() > 0) { + for (int i=vers->get_levels().size() - 1; i>= 0; i--) { + if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) { + shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); + } + } + } + /* * construct a shard from the buffer view. We'll hold the view * for as short a time as possible: once the records are exfiltrated @@ -182,24 +191,7 @@ public: } } - if (vers->get_levels().size() > 0) { - for (int i=vers->get_levels().size() - 1; i>= 0; i--) { - if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) { - shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); - } - } - } - - Shard *shards_array[shards.size()]; - - size_t j = 0; - for (size_t i=0; i) { - size_t old_buffer_cnt = new_epoch->clear_buffers(); - - /* - * skip the first buffer, as this was flushed into the epoch's - * structure already, and copy all the other buffer references - * into the new epoch - */ - for (size_t i=1; iget_buffers().size(); i++) { - new_epoch->add_buffer(old_epoch->get_buffers()[i]); - } - } - #endif + // FIXME: this may currently fail because there isn't any + // query preemption yet. At this point, we'd need to either + // 1) wait for all queries on the old_head to finish + // 2) kill all queries on the old_head + // 3) somehow migrate all queries on the old_head to the new + // version + auto res = new_epoch->advance_buffer_head(buffer_head); + assert(res); m_current_epoch.fetch_add(1); old_epoch->set_inactive(); @@ -425,40 +405,29 @@ private: } static void reconstruction(void *arguments) { - ReconstructionArgs *args = (ReconstructionArgs *) arguments; - + auto args = (ReconstructionArgs *) arguments; Structure *vers = args->epoch->get_structure(); - for (ssize_t i=0; imerges.size(); i++) { vers->reconstruction(args->merges[i].second, args->merges[i].first); } - /* - * we'll grab the buffer AFTER doing the internal reconstruction, so we can - * flush as many records as possible in one go. The reconstruction was done so - * as to make room for the full buffer anyway, so there's no real benefit to doing - * this first. + /* + * we'll grab the buffer AFTER doing the internal reconstruction, so we + * can flush as many records as possible in one go. The reconstruction + * was done so as to make room for the full buffer anyway, so there's + * no real benefit to doing this first. 
*/ auto buffer_view = args->epoch->get_buffer(); size_t new_head = buffer_view.get_tail(); - /* - * if performing a compaction, don't flush the buffer, as - * there is no guarantee that any necessary reconstructions + /* + * if performing a compaction, don't flush the buffer, as + * there is no guarantee that any necessary reconstructions * will free sufficient space in L0 to support a flush */ if (!args->compaction) { vers->flush_buffer(std::move(buffer_view)); - - // FIXME: this may currently fail because there isn't any - // query preemption yet. At this point, we'd need to either - // 1) wait for all queries on the old_head to finish - // 2) kill all queries on the old_head - // 3) somehow migrate all queries on the old_head to the new - // version - auto res = args->epoch->advance_buffer_head(new_head); - assert(res); } args->epoch->end_job(); @@ -470,7 +439,7 @@ private: * part of a compaction */ if (!args->compaction) { - ((DynamicExtension *) args->extension)->advance_epoch(); + ((DynamicExtension *) args->extension)->advance_epoch(new_head); } ((DynamicExtension *) args->extension)->m_reconstruction_scheduled = false; -- cgit v1.2.3 From 38693c342558628c75e0ab0d23c32a95a499ed8b Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 19 Jan 2024 15:58:04 -0500 Subject: Initial rough-out of internal statistics tracker Need to figure out the best way to do the detailed tracking in a concurrent manner. I was thinking just an event log, with parsing routines for extracting statistics. But that'll be pretty slow. --- include/framework/DynamicExtension.h | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 7590de2..89ee30f 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -40,6 +40,10 @@ class DynamicExtension { typedef Epoch _Epoch; typedef BufferView BufView; + static constexpr size_t QUERY = 1; + static constexpr size_t RECONSTRUCTION = 2; + + public: DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0, size_t thread_cnt=16) @@ -226,6 +230,11 @@ public: return t; } + + void print_scheduler_statistics() { + m_sched.print_statistics(); + } + private: SCHED m_sched; @@ -271,7 +280,7 @@ private: */ epoch->start_job(); - m_sched.schedule_job(reconstruction, 0, args); + m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); /* wait for compaction completion */ wait.get(); @@ -511,7 +520,7 @@ private: args->compaction = false; /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ - m_sched.schedule_job(reconstruction, 0, args); + m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); } std::future> schedule_query(void *query_parms) { @@ -522,7 +531,7 @@ private: args->query_parms = query_parms; auto result = args->result_set.get_future(); - m_sched.schedule_job(async_query, 0, args); + m_sched.schedule_job(async_query, 0, args, QUERY); return result; } -- cgit v1.2.3 From f0a55f7996e9ea2c7824fd5ab136b7c1864bbcdd Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 24 Jan 2024 11:18:25 -0500 Subject: DynamicExtension: Fixed reconstruction trigger data race Tweak the reconstruction trigger code to ensure that multiple reconstructions won't be triggered at the same time. 
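[Annotation] The race this commit fixes is a classic check-then-act bug: two
inserting threads can both load m_reconstruction_scheduled as false before
either stores true, and each schedules a reconstruction. A minimal sketch of
the bug and the fix follows; is_at_low_watermark() and
schedule_reconstruction() are stubbed stand-ins for the framework's members.

    #include <atomic>
    #include <cstdio>

    static std::atomic<bool> reconstruction_scheduled{false};

    /* Stand-ins for the buffer predicate and the scheduler hook. */
    static bool is_at_low_watermark() { return true; }
    static void schedule_reconstruction() { std::puts("scheduled"); }

    /* The bug: the load and the store are two separate atomic operations,
     * so two threads can both observe `false` and both schedule. */
    void on_insert_racy() {
        if (!reconstruction_scheduled.load() && is_at_low_watermark()) {
            reconstruction_scheduled.store(true);
            schedule_reconstruction();
        }
    }

    /* The fix, mirroring the patch below: compare_exchange_strong makes
     * the test-and-set a single atomic step, so exactly one thread wins. */
    void on_insert_fixed() {
        if (is_at_low_watermark()) {
            bool expected = false;
            if (reconstruction_scheduled.compare_exchange_strong(expected, true)) {
                schedule_reconstruction();
            }
        }
    }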
--- include/framework/DynamicExtension.h | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 89ee30f..40f137c 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -451,7 +451,7 @@ private: ((DynamicExtension *) args->extension)->advance_epoch(new_head); } - ((DynamicExtension *) args->extension)->m_reconstruction_scheduled = false; + ((DynamicExtension *) args->extension)->m_reconstruction_scheduled.store(false); delete args; } @@ -537,9 +537,12 @@ private: } int internal_append(const R &rec, bool ts) { - if (!m_reconstruction_scheduled.load() && m_buffer->is_at_low_watermark()) { - m_reconstruction_scheduled.store(true); - schedule_reconstruction(); + if (m_buffer->is_at_low_watermark()) { + auto old = false; + + if (m_reconstruction_scheduled.compare_exchange_strong(old, true)) { + schedule_reconstruction(); + } } /* this will fail if the HWM is reached and return 0 */ -- cgit v1.2.3 From d166465dcca3550cb8f3263e0f5b5189a69d531a Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 31 Jan 2024 13:29:49 -0500 Subject: Temporary thread affinity for reconstruction --- include/framework/DynamicExtension.h | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 40f137c..3203945 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -32,7 +32,7 @@ namespace de { template + DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler> class DynamicExtension { typedef S Shard; typedef MutableBuffer Buffer; @@ -51,6 +51,8 @@ public: , m_max_delete_prop(1) , m_sched(memory_budget, thread_cnt) , m_buffer(new Buffer(buffer_lwm, buffer_hwm)) + , m_core_cnt(thread_cnt) + , m_next_core(0) { auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); auto epoch = new _Epoch(0, vers, m_buffer, 0); @@ -258,6 +260,9 @@ private: size_t m_scale_factor; double m_max_delete_prop; + std::atomic m_next_core; + size_t m_core_cnt; + void enforce_delete_invariant(_Epoch *epoch) { auto structure = epoch->get_structure(); auto compactions = structure->get_compaction_tasks(); @@ -415,6 +420,8 @@ private: static void reconstruction(void *arguments) { auto args = (ReconstructionArgs *) arguments; + + ((DynamicExtension *) args->extension)->SetThreadAffinity(); Structure *vers = args->epoch->get_structure(); for (ssize_t i=0; imerges.size(); i++) { @@ -605,6 +612,30 @@ private: return processed_records; } + + void SetThreadAffinity() { + int core = m_next_core.fetch_add(1) % m_core_cnt; + cpu_set_t mask; + CPU_ZERO(&mask); + + switch (core % 2) { + case 0: + // 0 |-> 0 + // 2 |-> 2 + // 4 |-> 4 + core = core; + break; + case 1: + // 1 |-> 28 + // 3 |-> 30 + // 5 |-> 32 + core = (core - 1) + m_core_cnt; + break; + } + CPU_SET(core, &mask); + ::sched_setaffinity(0, sizeof(mask), &mask); + } + }; } -- cgit v1.2.3 From b1f966353695a0e06948df5332acccb84bbbcda0 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 31 Jan 2024 14:26:34 -0500 Subject: Query/Insert intermix benchmarks --- include/framework/DynamicExtension.h | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git 
a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 3203945..a10831e 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -328,14 +328,15 @@ private: */ enforce_delete_invariant(new_epoch); - // FIXME: this may currently fail because there isn't any + // FIXME: this may currently block because there isn't any // query preemption yet. At this point, we'd need to either // 1) wait for all queries on the old_head to finish // 2) kill all queries on the old_head // 3) somehow migrate all queries on the old_head to the new // version - auto res = new_epoch->advance_buffer_head(buffer_head); - assert(res); + while (!new_epoch->advance_buffer_head(buffer_head)) { + _mm_pause(); + } m_current_epoch.fetch_add(1); old_epoch->set_inactive(); -- cgit v1.2.3 From e4a7d2d5c7464fe97ab7e37c2b0b73c32b5e8b17 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 31 Jan 2024 16:00:12 -0500 Subject: Possibly fixed epoch retirement sync error --- include/framework/DynamicExtension.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index a10831e..abe3839 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -384,11 +384,15 @@ private: do { if (epoch->retirable()) { + m_epoch_retire_lk.lock(); + if (!epoch->retirable()) { + m_epoch_retire_lk.unlock(); + continue; + } break; } } while (true); - m_epoch_retire_lk.lock(); /* remove epoch from the framework's map */ m_epochs.erase(epoch->get_epoch_number()); -- cgit v1.2.3 From 8fbcfda7270ef266f29f36b8eadcffaec2123612 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 31 Jan 2024 17:02:20 -0500 Subject: More locking! 
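[Annotation] The two added lines below layer a second lock over the shared
epoch-retirement lock: the retire lock, held in shared mode, keeps the
current epoch from being freed mid-lookup, while the mutex serializes access
to the epoch map itself; the epoch is pinned via start_job() before either
lock is released. A simplified sketch of the pattern, with the members
mirrored as globals purely for illustration.

    #include <atomic>
    #include <cstddef>
    #include <mutex>
    #include <shared_mutex>
    #include <unordered_map>

    struct Epoch {
        std::atomic<size_t> jobs{0};
        void start_job() { jobs.fetch_add(1); }  /* pins the epoch */
    };

    static std::shared_mutex epoch_retire_lk;  /* exclusive on retirement */
    static std::mutex struct_lock;             /* guards the epoch map */
    static std::unordered_map<size_t, Epoch*> epochs;
    static std::atomic<size_t> current_epoch{0};

    Epoch *get_active_epoch_protected() {
        epoch_retire_lk.lock_shared();  /* no epoch can be retired now */
        struct_lock.lock();             /* no transition can touch the map */
        Epoch *e = epochs[current_epoch.load()];
        e->start_job();                 /* pin *before* dropping the locks */
        struct_lock.unlock();
        epoch_retire_lk.unlock_shared();
        return e;
    }

The patches that follow abandon this lock stack in favor of a lock-free,
refcounted epoch pointer, which keeps read-side operations from contending
on a mutex.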
--- include/framework/DynamicExtension.h | 2 ++ 1 file changed, 2 insertions(+) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index abe3839..cc226d2 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -301,8 +301,10 @@ private: _Epoch *get_active_epoch_protected() { m_epoch_retire_lk.lock_shared(); + m_struct_lock.lock(); auto cur_epoch = m_current_epoch.load(); m_epochs[cur_epoch]->start_job(); + m_struct_lock.unlock(); m_epoch_retire_lk.unlock_shared(); return m_epochs[cur_epoch]; -- cgit v1.2.3 From f3b7428cfa7f9364c5a8bc85107db3a7cccd53bc Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 31 Jan 2024 18:41:17 -0500 Subject: Adjusted epoch transition methodology --- include/framework/DynamicExtension.h | 215 +++++++++++++++++++---------------- 1 file changed, 119 insertions(+), 96 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index cc226d2..0992e14 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -43,6 +43,10 @@ class DynamicExtension { static constexpr size_t QUERY = 1; static constexpr size_t RECONSTRUCTION = 2; + struct epoch_ptr { + _Epoch *epoch; + size_t refcnt; + }; public: DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0, @@ -53,12 +57,14 @@ public: , m_buffer(new Buffer(buffer_lwm, buffer_hwm)) , m_core_cnt(thread_cnt) , m_next_core(0) + , m_epoch_cnt(0) { auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); - auto epoch = new _Epoch(0, vers, m_buffer, 0); + m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0}); + m_previous_epoch.store({nullptr, 0}); + m_next_epoch.store({nullptr, 0}); m_versions.insert(vers); - m_epochs.insert({0, epoch}); } ~DynamicExtension() { @@ -66,16 +72,13 @@ public: /* let any in-flight epoch transition finish */ await_next_epoch(); - /* deactivate the active epoch */ - get_active_epoch()->set_inactive(); - /* shutdown the scheduler */ m_sched.shutdown(); /* delete all held resources */ - for (auto e : m_epochs) { - delete e.second; - } + delete m_next_epoch.load().epoch; + delete m_current_epoch.load().epoch; + delete m_previous_epoch.load().epoch; delete m_buffer; @@ -123,41 +126,41 @@ public: } size_t get_record_count() { - auto epoch = get_active_epoch_protected(); + auto epoch = get_active_epoch(); auto t = epoch->get_buffer().get_record_count() + epoch->get_structure()->get_record_count(); - epoch->end_job(); + end_job(epoch); return t; } size_t get_tombstone_count() { - auto epoch = get_active_epoch_protected(); + auto epoch = get_active_epoch(); auto t = epoch->get_buffer().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); - epoch->end_job(); + end_job(epoch); return t; } size_t get_height() { - auto epoch = get_active_epoch_protected(); + auto epoch = get_active_epoch(); auto t = epoch->get_structure()->get_height(); - epoch->end_job(); + end_job(epoch); return t; } size_t get_memory_usage() { - auto epoch = get_active_epoch_protected(); + auto epoch = get_active_epoch(); auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage(); - epoch->end_job(); + end_job(epoch); return t; } size_t get_aux_memory_usage() { - auto epoch = get_active_epoch_protected(); + auto epoch = get_active_epoch(); auto t = 
epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); - epoch->end_job(); + end_job(epoch); return t; } @@ -171,7 +174,7 @@ public: await_next_epoch(); } - auto epoch = get_active_epoch_protected(); + auto epoch = get_active_epoch(); auto vers = epoch->get_structure(); std::vector shards; @@ -203,7 +206,7 @@ public: delete shard; } - epoch->end_job(); + end_job(epoch); return flattened; } @@ -212,12 +215,10 @@ public: * the newest one to become available. Otherwise, returns immediately. */ void await_next_epoch() { - while (m_current_epoch.load() != m_newest_epoch.load()) { + while (m_next_epoch.load().epoch != nullptr) { std::unique_lock lk(m_epoch_cv_lk); m_epoch_cv.wait(lk); } - - return; } /* @@ -226,9 +227,9 @@ public: * tombstone proportion invariant. */ bool validate_tombstone_proportion() { - auto epoch = get_active_epoch_protected(); + auto epoch = get_active_epoch(); auto t = epoch->get_structure()->validate_tombstone_proportion(); - epoch->end_job(); + end_job(epoch); return t; } @@ -247,15 +248,14 @@ private: alignas(64) std::atomic m_reconstruction_scheduled; - std::atomic m_current_epoch; - std::atomic m_newest_epoch; - std::unordered_map m_epochs; + std::atomic m_next_epoch; + std::atomic m_current_epoch; + std::atomic m_previous_epoch; std::condition_variable m_epoch_cv; std::mutex m_epoch_cv_lk; - std::mutex m_epoch_transition_lk; - std::shared_mutex m_epoch_retire_lk; + std::atomic m_epoch_cnt; size_t m_scale_factor; double m_max_delete_prop; @@ -279,12 +279,6 @@ private: auto wait = args->result.get_future(); - /* - * the reconstruction process calls end_job(), - * so we must start one before calling it - */ - epoch->start_job(); - m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); /* wait for compaction completion */ @@ -296,39 +290,38 @@ private: } _Epoch *get_active_epoch() { - return m_epochs[m_current_epoch.load()]; - } + epoch_ptr old, new_ptr; - _Epoch *get_active_epoch_protected() { - m_epoch_retire_lk.lock_shared(); - m_struct_lock.lock(); - auto cur_epoch = m_current_epoch.load(); - m_epochs[cur_epoch]->start_job(); - m_struct_lock.unlock(); - m_epoch_retire_lk.unlock_shared(); + do { + if (m_current_epoch.load().epoch == nullptr) { + old = m_previous_epoch; + new_ptr = {old.epoch, old.refcnt+1}; + if (old.epoch != nullptr && m_previous_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } else { + old = m_current_epoch; + new_ptr = {old.epoch, old.refcnt+1}; + if (old.epoch != nullptr && m_current_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } + } while (true); - return m_epochs[cur_epoch]; + return new_ptr.epoch; } void advance_epoch(size_t buffer_head) { - m_epoch_transition_lk.lock(); - - size_t new_epoch_num = m_newest_epoch.load(); - size_t old_epoch_num = m_current_epoch.load(); - assert(new_epoch_num != old_epoch_num); + retire_epoch(m_previous_epoch.load().epoch); - _Epoch *new_epoch = m_epochs[new_epoch_num]; - _Epoch *old_epoch = m_epochs[old_epoch_num]; + epoch_ptr tmp = {nullptr, 0}; + epoch_ptr cur; + do { + cur = m_current_epoch; + } while(!m_current_epoch.compare_exchange_strong(cur, tmp)); - /* - * Verify the tombstone invariant within the epoch's structure, this - * may require scheduling additional reconstructions. - * - * FIXME: having this inside the lock is going to TANK - * insertion performance. - */ - enforce_delete_invariant(new_epoch); + m_previous_epoch.store(cur); // FIXME: this may currently block because there isn't any // query preemption yet. 
At this point, we'd need to either @@ -336,20 +329,19 @@ private: // 2) kill all queries on the old_head // 3) somehow migrate all queries on the old_head to the new // version - while (!new_epoch->advance_buffer_head(buffer_head)) { + while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) { _mm_pause(); } - m_current_epoch.fetch_add(1); - old_epoch->set_inactive(); - m_epoch_transition_lk.unlock(); + + m_current_epoch.store(m_next_epoch); + m_next_epoch.store({nullptr, 0}); + /* notify any blocking threads that the new epoch is available */ m_epoch_cv_lk.lock(); m_epoch_cv.notify_all(); m_epoch_cv_lk.unlock(); - - retire_epoch(old_epoch); } /* @@ -363,14 +355,20 @@ private: * is violated, it is possible that this code will clone a retired * epoch. */ - m_newest_epoch.fetch_add(1); - auto new_epoch = get_active_epoch()->clone(m_newest_epoch.load()); + assert(m_next_epoch.load().epoch == nullptr); + auto current_epoch = get_active_epoch(); + + m_epoch_cnt.fetch_add(1); + m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0}); + + end_job(current_epoch); + std::unique_lock m_struct_lock; - m_versions.insert(new_epoch->get_structure()); - m_epochs.insert({m_newest_epoch.load(), new_epoch}); + m_versions.insert(m_next_epoch.load().epoch->get_structure()); m_struct_lock.release(); - return new_epoch; + + return m_next_epoch.load().epoch; } void retire_epoch(_Epoch *epoch) { @@ -384,28 +382,25 @@ private: * proceed. */ + if (epoch == nullptr) { + return; + } + + epoch_ptr old, new_ptr; + new_ptr = {nullptr, 0}; do { - if (epoch->retirable()) { - m_epoch_retire_lk.lock(); - if (!epoch->retirable()) { - m_epoch_retire_lk.unlock(); - continue; - } + old = m_previous_epoch.load(); + + if (old.epoch == epoch && old.refcnt == 0 && + m_previous_epoch.compare_exchange_strong(old, new_ptr)) { break; } - } while (true); + usleep(1); + } while(true); - /* remove epoch from the framework's map */ - m_epochs.erase(epoch->get_epoch_number()); + //fprintf(stderr, "Epoch %ld retired [%p]\n", epoch->get_epoch_number(), epoch); - /* - * The epoch's destructor will handle releasing - * all the references it holds - */ delete epoch; - m_epoch_retire_lk.unlock(); - - /* NOTE: the BufferView mechanism handles freeing unused buffer space */ /* * Following the epoch's destruction, any buffers @@ -453,7 +448,6 @@ private: vers->flush_buffer(std::move(buffer_view)); } - args->epoch->end_job(); args->result.set_value(true); /* @@ -473,8 +467,17 @@ private: static void async_query(void *arguments) { QueryArgs *args = (QueryArgs *) arguments; - auto buffer = args->epoch->get_buffer(); - auto vers = args->epoch->get_structure(); + auto epoch = ((DynamicExtension *) args->extension)->get_active_epoch(); + + auto ptr1 = ((DynamicExtension *) args->extension)->m_previous_epoch.load().epoch; + auto ptr2 = ((DynamicExtension *) args->extension)->m_current_epoch.load().epoch; + auto ptr3 = ((DynamicExtension *) args->extension)->m_next_epoch.load().epoch; + + //fprintf(stderr, "(%ld, %p)\t%p\t%p\t%p\n", epoch->get_epoch_number(), epoch, ptr1, ptr2, ptr3); + + + auto buffer = epoch->get_buffer(); + auto vers = epoch->get_structure(); void *parms = args->query_parms; /* Get the buffer query states */ @@ -509,7 +512,7 @@ private: auto result = Q::merge(query_results, parms); args->result_set.set_value(std::move(result)); - args->epoch->end_job(); + ((DynamicExtension *) args->extension)->end_job(epoch); Q::delete_buffer_query_state(buffer_state); for (size_t i=0; istart_job(); ReconstructionArgs *args = new 
ReconstructionArgs(); args->epoch = epoch; @@ -538,10 +540,8 @@ private: } std::future> schedule_query(void *query_parms) { - auto epoch = get_active_epoch_protected(); - QueryArgs *args = new QueryArgs(); - args->epoch = epoch; + args->extension = this; args->query_parms = query_parms; auto result = args->result_set.get_future(); @@ -643,6 +643,29 @@ private: ::sched_setaffinity(0, sizeof(mask), &mask); } + + void end_job(_Epoch *epoch) { + epoch_ptr old, new_ptr; + + do { + if (m_previous_epoch.load().epoch == epoch) { + old = m_previous_epoch; + assert(old.refcnt > 0); + new_ptr = {old.epoch, old.refcnt - 1}; + if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } else { + old = m_current_epoch; + assert(old.refcnt > 0); + new_ptr = {old.epoch, old.refcnt - 1}; + if (m_current_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } + } while (true); + } + }; } -- cgit v1.2.3 From f7f61d6d5367f2984cbf40c3cd6d85f75cd999af Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 31 Jan 2024 20:34:35 -0500 Subject: temporary hack to get working --- include/framework/DynamicExtension.h | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 0992e14..a56cc6c 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -388,6 +388,7 @@ private: epoch_ptr old, new_ptr; new_ptr = {nullptr, 0}; + size_t i=0; do { old = m_previous_epoch.load(); @@ -396,10 +397,11 @@ private: break; } usleep(1); + i++; + + if (i > 600) break; } while(true); - //fprintf(stderr, "Epoch %ld retired [%p]\n", epoch->get_epoch_number(), epoch); - delete epoch; /* @@ -473,8 +475,6 @@ private: auto ptr2 = ((DynamicExtension *) args->extension)->m_current_epoch.load().epoch; auto ptr3 = ((DynamicExtension *) args->extension)->m_next_epoch.load().epoch; - //fprintf(stderr, "(%ld, %p)\t%p\t%p\t%p\n", epoch->get_epoch_number(), epoch, ptr1, ptr2, ptr3); - auto buffer = epoch->get_buffer(); auto vers = epoch->get_structure(); @@ -650,14 +650,20 @@ private: do { if (m_previous_epoch.load().epoch == epoch) { old = m_previous_epoch; - assert(old.refcnt > 0); + if (old.refcnt <= 0) { + return; + } + new_ptr = {old.epoch, old.refcnt - 1}; if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) { break; } } else { old = m_current_epoch; - assert(old.refcnt > 0); + if (old.refcnt <= 0) { + return; + } + //assert(old.refcnt > 0); new_ptr = {old.epoch, old.refcnt - 1}; if (m_current_epoch.compare_exchange_strong(old, new_ptr)) { break; -- cgit v1.2.3 From db4806d9dd9757273a14e6c3ea92e5a087239145 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 5 Feb 2024 15:17:25 -0500 Subject: Set up tombstone deletes properly --- include/framework/DynamicExtension.h | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index a56cc6c..3e9d0fb 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -481,7 +481,7 @@ private: void *parms = args->query_parms; /* Get the buffer query states */ - void *buffer_state = Q::get_buffer_query_state(std::move(buffer), parms); + void *buffer_state = Q::get_buffer_query_state(&buffer, parms); /* Get the shard query states */ std::vector> shards; @@ -502,7 +502,7 @@ private: shid = 
shards[i - 1].first; } - query_results[i] = std::move(filter_deletes(local_results, shid, vers)); + query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer)); if constexpr (Q::EARLY_ABORT) { if (query_results[i].size() > 0) break; @@ -563,8 +563,8 @@ private: return m_buffer->append(rec, ts); } - static std::vector> filter_deletes(std::vector> &records, ShardID shid, Structure *vers) { - if constexpr (!Q::SKIP_DELETE_FILTER) { + static std::vector> filter_deletes(std::vector> &records, ShardID shid, Structure *vers, BufView *bview) { + if constexpr (Q::SKIP_DELETE_FILTER) { return records; } @@ -602,6 +602,12 @@ private: //continue; //} + for (size_t i=0; iget_record_count(); i++) { + if (bview->get(i)->is_tombstone() && bview->get(i)->rec == rec.rec) { + continue; + } + } + if (shid != INVALID_SHID) { for (size_t lvl=0; lvl<=shid.level_idx; lvl++) { if (vers->get_levels()[lvl]->check_tombstone(0, rec.rec)) { -- cgit v1.2.3 From 10b4425e842d10b7fbfa85978969ed4591d6b98e Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 7 Feb 2024 10:56:52 -0500 Subject: Fully implemented Query concept and adjusted queries to use it --- include/framework/DynamicExtension.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 3e9d0fb..5c021f2 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -31,7 +31,7 @@ namespace de { -template Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler> class DynamicExtension { typedef S Shard; -- cgit v1.2.3 From 2c5d549b3618b9ea72e6eece4cb4f3da5a6811a8 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 7 Feb 2024 13:42:34 -0500 Subject: Fully realized shard concept interface --- include/framework/DynamicExtension.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 5c021f2..d88a945 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -31,7 +31,7 @@ namespace de { -template Q, LayoutPolicy L=LayoutPolicy::TEIRING, +template S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler> class DynamicExtension { typedef S Shard; -- cgit v1.2.3 From ded1f979d101a5df37a65370f6c18803212edb66 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 8 Feb 2024 12:40:13 -0500 Subject: Fixed a slight synchronization bug in Epoch retirement "properly" --- include/framework/DynamicExtension.h | 37 ++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index d88a945..e7dd774 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -293,6 +293,12 @@ private: epoch_ptr old, new_ptr; do { + /* + * during an epoch transition, a nullptr will installed in the + * current_epoch. At this moment, the "new" current epoch will + * soon be installed, but the "current" current epoch has been + * moved back to m_previous_epoch. 
+ */ if (m_current_epoch.load().epoch == nullptr) { old = m_previous_epoch; new_ptr = {old.epoch, old.refcnt+1}; @@ -308,6 +314,8 @@ private: } } while (true); + assert(new_ptr.refcnt > 0); + return new_ptr.epoch; } @@ -388,7 +396,6 @@ private: epoch_ptr old, new_ptr; new_ptr = {nullptr, 0}; - size_t i=0; do { old = m_previous_epoch.load(); @@ -397,9 +404,7 @@ private: break; } usleep(1); - i++; - if (i > 600) break; } while(true); delete epoch; @@ -656,9 +661,15 @@ private: do { if (m_previous_epoch.load().epoch == epoch) { old = m_previous_epoch; - if (old.refcnt <= 0) { - return; - } + /* + * This could happen if we get into the system during a + * transition. In this case, we can just back out and retry + */ + if (old.epoch == nullptr) { + continue; + } + + assert(old.refcnt > 0); new_ptr = {old.epoch, old.refcnt - 1}; if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) { @@ -666,10 +677,16 @@ private: } } else { old = m_current_epoch; - if (old.refcnt <= 0) { - return; - } - //assert(old.refcnt > 0); + /* + * This could happen if we get into the system during a + * transition. In this case, we can just back out and retry + */ + if (old.epoch == nullptr) { + continue; + } + + assert(old.refcnt > 0); + new_ptr = {old.epoch, old.refcnt - 1}; if (m_current_epoch.compare_exchange_strong(old, new_ptr)) { break; -- cgit v1.2.3 From 402fc269c0aaa671d84a6d15918735ad4b90e6b2 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 9 Feb 2024 12:30:21 -0500 Subject: Comment updates/fixes --- include/framework/DynamicExtension.h | 73 +++++++++++++++++++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index e7dd774..473592d 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -13,7 +13,6 @@ #include #include #include -#include #include #include "framework/interface/Scheduler.h" @@ -87,10 +86,34 @@ public: } } + /* + * Insert the record `rec` into the index. If the buffer is full and + * the framework is blocking on an epoch transition, this call may fail + * and return 0. In this case, retry the call again later. If + * successful, 1 will be returned. The record will be immediately + * visible in the buffer upon the successful return of this function. + */ int insert(const R &rec) { return internal_append(rec, false); } + /* + * Erase the record `rec` from the index. It is assumed that `rec` + * currently exists--no special checks are made for correctness here. + * The behavior if this function will differ depending on if tombstone + * or tagged deletes are used. + * + * Tombstone deletes - inserts a tombstone record for `rec`. This *may* + * return 0 and fail if the buffer is full and the framework is + * blocking on an epoch transition. In this case, repeat the call + * later. 1 will be returned when the tombstone is successfully + * inserted. + * + * Tagging deletes - Does a point lookup for the record across the + * entire structure, and sets its delete bit when found. Returns 1 if + * the record is found and marked, and 0 if it was not (i.e., if it + * isn't present in the index). + */ int erase(const R &rec) { // FIXME: delete tagging will require a lot of extra work to get // operating "correctly" in a concurrent environment. @@ -121,10 +144,23 @@ public: return internal_append(rec, true); } + /* + * Execute the query with parameters `parms` and return a future. 
This + * future can be used to access a vector containing the results of the + * query. + * + * The behavior of this function is undefined if `parms` is not a + * pointer to a valid query parameter object for the query type used as + * a template parameter to construct the framework. + */ std::future> query(void *parms) { return schedule_query(parms); } + /* + * Returns the number of records (included tagged records and + * tombstones) currently within the framework. + */ size_t get_record_count() { auto epoch = get_active_epoch(); auto t = epoch->get_buffer().get_record_count() + epoch->get_structure()->get_record_count(); @@ -133,6 +169,11 @@ public: return t; } + /* + * Returns the number of tombstone records currently within the + * framework. This function can be called when tagged deletes are used, + * but will always return 0 in that case. + */ size_t get_tombstone_count() { auto epoch = get_active_epoch(); auto t = epoch->get_buffer().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); @@ -141,6 +182,12 @@ public: return t; } + /* + * Get the number of levels within the framework. This count will + * include any empty levels, but will not include the buffer. Note that + * this is *not* the same as the number of shards when tiering is used, + * as each level can contain multiple shards in that case. + */ size_t get_height() { auto epoch = get_active_epoch(); auto t = epoch->get_structure()->get_height(); @@ -149,6 +196,13 @@ public: return t; } + /* + * Get the number of bytes of memory allocated across the framework for + * storing records and associated index information (i.e., internal + * ISAM tree nodes). This includes memory that is allocated but + * currently unused in the buffer, or in shards themselves + * (overallocation due to delete cancellation, etc.). + */ size_t get_memory_usage() { auto epoch = get_active_epoch(); auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage(); @@ -157,6 +211,11 @@ public: return t; } + /* + * Get the number of bytes of memory allocated across the framework for + * auxiliary structures. This can include bloom filters, aux + * hashtables, etc. + */ size_t get_aux_memory_usage() { auto epoch = get_active_epoch(); auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); @@ -165,10 +224,22 @@ public: return t; } + /* + * Returns the maximum physical capacity of the buffer, measured in + * records. + */ size_t get_buffer_capacity() { return m_buffer->get_capacity(); } + /* + * Create a new single Shard object containing all of the records + * within the framework (buffer and shards). The optional parameter can + * be used to specify whether the Shard should be constructed with the + * currently active state of the framework (false), or if shard + * construction should wait until any ongoing reconstructions have + * finished and use that new version (true). 
+ */ Shard *create_static_structure(bool await_reconstruction_completion=false) { if (await_reconstruction_completion) { await_next_epoch(); -- cgit v1.2.3 From aa1b40e9249afc03bf1a2f35de4cbf67c7f9b47e Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 9 Feb 2024 12:42:55 -0500 Subject: Framework: Fixed a bug where tagged deletes didn't release the epoch --- include/framework/DynamicExtension.h | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 473592d..238fc7f 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -124,12 +124,18 @@ public: * not *strictly* necessary. */ if constexpr (D == DeletePolicy::TAGGING) { - auto view = m_buffer->get_buffer_view(); static_assert(std::same_as, "Tagging is only supported in single-threaded operation"); - if (get_active_epoch()->get_structure()->tagged_delete(rec)) { + + auto view = m_buffer->get_buffer_view(); + + auto epoch = get_active_epoch(); + if (epoch->get_structure()->tagged_delete(rec)) { + end_job(epoch); return 1; } + end_job(epoch); + /* * the buffer will take the longest amount of time, and * probably has the lowest probability of having the record, @@ -470,6 +476,15 @@ private: do { old = m_previous_epoch.load(); + /* + * If running in single threaded mode, the failure to retire + * an Epoch will result in the thread of execution blocking + * indefinitely. + */ + if constexpr (std::same_as) { + if (old.epoch == epoch) assert(old.refcnt == 0); + } + if (old.epoch == epoch && old.refcnt == 0 && m_previous_epoch.compare_exchange_strong(old, new_ptr)) { break; -- cgit v1.2.3 From 3ddafd3b9ac089252814af87cb7d9fe534cf59a4 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 9 Feb 2024 13:09:05 -0500 Subject: Removed centralized version structure --- include/framework/DynamicExtension.h | 38 +++--------------------------------- 1 file changed, 3 insertions(+), 35 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 238fc7f..7ea5370 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -12,8 +12,6 @@ #include #include #include -#include -#include #include "framework/interface/Scheduler.h" #include "framework/scheduling/FIFOScheduler.h" @@ -26,12 +24,10 @@ #include "framework/util/Configuration.h" #include "framework/scheduling/Epoch.h" - - namespace de { template S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, - DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler> + DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=FIFOScheduler> class DynamicExtension { typedef S Shard; typedef MutableBuffer Buffer; @@ -62,8 +58,6 @@ public: m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0}); m_previous_epoch.store({nullptr, 0}); m_next_epoch.store({nullptr, 0}); - - m_versions.insert(vers); } ~DynamicExtension() { @@ -80,10 +74,6 @@ public: delete m_previous_epoch.load().epoch; delete m_buffer; - - for (auto e : m_versions) { - delete e; - } } /* @@ -320,8 +310,8 @@ private: Buffer *m_buffer; - std::mutex m_struct_lock; - std::set m_versions; + //std::mutex m_struct_lock; + //std::set m_versions; alignas(64) std::atomic m_reconstruction_scheduled; @@ -448,11 +438,6 @@ private: end_job(current_epoch); - std::unique_lock 
m_struct_lock;
-        m_versions.insert(m_next_epoch.load().epoch->get_structure());
-        m_struct_lock.release();
-
-
         return m_next_epoch.load().epoch;
     }
 
     void retire_epoch(_Epoch *epoch) {
@@ -494,23 +479,6 @@ private:
         } while(true);
 
         delete epoch;
-
-        /*
-         * Following the epoch's destruction, any buffers
-         * or structures with no remaining references can
-         * be safely freed.
-         */
-        std::unique_lock<std::mutex> lock(m_struct_lock);
-
-        for (auto itr = m_versions.begin(); itr != m_versions.end();) {
-            if ((*itr)->get_reference_count() == 0) {
-                auto tmp = *itr;
-                itr = m_versions.erase(itr);
-                delete tmp;
-            } else {
-                itr++;
-            }
-        }
     }
 
     static void reconstruction(void *arguments) {
-- cgit v1.2.3
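[Annotation] Taken together, the final commits in this series converge on a
refcounted epoch-pointer scheme: each epoch slot is a {pointer, refcount}
pair mutated only via compare-and-swap, readers bump the count to pin an
epoch and decrement it when finished, and the retiring thread swaps an epoch
out only after observing a zero count. The sketch below distills that scheme
to a single slot (the framework juggles previous/current/next slots) and
assumes the platform provides a lock-free 16-byte compare-exchange;
std::atomic silently falls back to a lock otherwise.

    #include <atomic>
    #include <cstddef>

    struct Epoch { /* structure and buffer references elided */ };

    struct epoch_ptr {
        Epoch  *epoch;
        size_t  refcnt;
    };

    static std::atomic<epoch_ptr> current{epoch_ptr{nullptr, 0}};

    /* Acquire: atomically bump the refcount of the installed epoch. A null
     * slot means a transition is in flight, so spin and retry. */
    Epoch *acquire() {
        for (;;) {
            epoch_ptr old_p = current.load();
            if (old_p.epoch == nullptr) {
                continue;
            }
            epoch_ptr new_p{old_p.epoch, old_p.refcnt + 1};
            if (current.compare_exchange_strong(old_p, new_p)) {
                return new_p.epoch;
            }
        }
    }

    /* Release: decrement the count so the epoch becomes retirable. */
    void release() {
        for (;;) {
            epoch_ptr old_p = current.load();
            if (old_p.epoch == nullptr) {
                continue;  /* transition in flight; the framework retries
                            * against its previous-epoch slot instead */
            }
            epoch_ptr new_p{old_p.epoch, old_p.refcnt - 1};
            if (current.compare_exchange_strong(old_p, new_p)) {
                return;
            }
        }
    }

    /* Retire: detach the epoch only once no reader holds it, mirroring the
     * retire_epoch() CAS loop above. The caller may then delete it. */
    Epoch *retire(Epoch *e) {
        for (;;) {
            epoch_ptr old_p = current.load();
            if (old_p.epoch == e && old_p.refcnt == 0) {
                if (current.compare_exchange_strong(old_p, epoch_ptr{nullptr, 0})) {
                    return e;
                }
            }
            /* a reader still pins the epoch, or the CAS lost; retry */
        }
    }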