From 6c906c94e1eea6d4356b8c99b93da39029e8d95d Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Fri, 17 Jan 2025 17:28:50 -0500
Subject: Progress

---
 include/framework/DynamicExtension.h | 446 +++++++++++++----------------------
 1 file changed, 160 insertions(+), 286 deletions(-)

diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 5a64243..ea3ef4d 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -11,15 +11,19 @@
 #include
 #include
+#include
 #include
 
 #include "framework/interface/Scheduler.h"
 #include "framework/reconstruction/ReconstructionPolicy.h"
 #include "framework/scheduling/SerialScheduler.h"
+#include "framework/scheduling/Task.h"
 #include "framework/structure/ExtensionStructure.h"
 #include "framework/structure/MutableBuffer.h"
+#include "framework/util/Configuration.h"
+
 #include "framework/scheduling/Epoch.h"
 
 #include "util/types.h"
@@ -34,9 +38,10 @@ private:
   typedef typename ShardType::RECORD RecordType;
   typedef MutableBuffer Buffer;
   typedef ExtensionStructure Structure;
-  typedef Epoch _Epoch;
+  typedef Version Version;
   typedef BufferView BufView;
   typedef ReconstructionPolicy ReconPolicyType;
+  typedef DEConfiguration ConfType;
 
   typedef typename QueryType::Parameters Parameters;
   typedef typename QueryType::LocalQuery LocalQuery;
@@ -47,11 +52,6 @@ private:
   static constexpr size_t QUERY = 1;
   static constexpr size_t RECONSTRUCTION = 2;
 
-  struct epoch_ptr {
-    _Epoch *epoch;
-    size_t refcnt;
-  };
-
 public:
   /**
    * Create a new Dynamized version of a data structure, supporting
   * framework's scheduler for use in answering queries and
   * performing compactions and flushes, etc.
   */
-  DynamicExtension(ReconPolicyType *recon_policy, size_t buffer_low_watermark,
-                   size_t buffer_high_watermark = 0, size_t memory_budget = 0,
-                   size_t thread_cnt = 16)
-      : m_max_delete_prop(1), m_sched(memory_budget, thread_cnt),
-        m_buffer(new Buffer(buffer_low_watermark, (buffer_high_watermark == 0)
-                                                      ? buffer_low_watermark
-                                                      : buffer_high_watermark)),
-        m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0),
-        m_recon_policy(recon_policy) {
-
-    auto vers = new Structure();
-    m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
-    m_previous_epoch.store({nullptr, 0});
-    m_next_epoch.store({nullptr, 0});
+  DynamicExtension(ConfType &&config) : m_config(std::move(config)) {
+    m_buffer = std::make_unique<Buffer>(m_config.buffer_flush_trigger,
+                                        m_config.buffer_size);
+
+    m_sched = SchedType(m_config.maximum_memory_usage, m_config.maximum_threads);
+    m_active_version.store(std::make_shared<Version>(INITIAL_VERSION, new Structure(), m_buffer.get(), 0));
   }
 
   /**
    * Destructor for DynamicExtension. Will block until the completion of
-   * any outstanding epoch transition, shut down the scheduler, and free
+   * any outstanding version transition, shut down the scheduler, and free
    * all currently allocated shards, buffers, etc., by calling their
    * destructors.
   */
  ~DynamicExtension() {
-
-    /* let any in-flight epoch transition finish */
-    await_next_epoch();
+    /* let any in-flight version transitions finish */
+    await_newest_version();
 
    /* shutdown the scheduler */
    m_sched.shutdown();
-
-    /* delete all held resources */
-    delete m_next_epoch.load().epoch;
-    delete m_current_epoch.load().epoch;
-    delete m_previous_epoch.load().epoch;
-
-    delete m_buffer;
  }
 
  /**
@@ -164,15 +148,11 @@ public:
           "Tagging is only supported in single-threaded operation");
 
    auto view = m_buffer->get_buffer_view();
-
-    auto epoch = get_active_epoch();
-    if (epoch->get_mutable_structure()->tagged_delete(rec)) {
-      end_job(epoch);
+    auto version = get_active_version();
+    if (version->get_mutable_structure()->tagged_delete(rec)) {
      return 1;
    }
 
-    end_job(epoch);
-
    /*
     * the buffer will take the longest amount of time, and
     * probably has the lowest probability of having the record,
@@ -208,10 +188,9 @@ public:
   * @return The number of records within the index
   */
  size_t get_record_count() {
-    auto epoch = get_active_epoch();
-    auto t = epoch->get_buffer().get_record_count() +
-             epoch->get_structure()->get_record_count();
-    end_job(epoch);
+    auto version = get_active_version();
+    auto t = version->get_buffer().get_record_count() +
+             version->get_structure()->get_record_count();
 
    return t;
  }
@@ -224,10 +203,9 @@ public:
   * @return The number of tombstone records within the index
   */
  size_t get_tombstone_count() {
-    auto epoch = get_active_epoch();
-    auto t = epoch->get_buffer().get_tombstone_count() +
-             epoch->get_structure()->get_tombstone_count();
-    end_job(epoch);
+    auto version = get_active_version();
+    auto t = version->get_buffer().get_tombstone_count() +
+             version->get_structure()->get_tombstone_count();
 
    return t;
  }
@@ -241,11 +219,7 @@ public:
   * @return The number of levels within the index
   */
  size_t get_height() {
-    auto epoch = get_active_epoch();
-    auto t = epoch->get_structure()->get_height();
-    end_job(epoch);
-
-    return t;
+    return get_active_version()->get_structure()->get_height();
  }
 
  /**
@@ -254,11 +228,7 @@ public:
   * @return The number of non-empty shards within the index
   */
  size_t get_shard_count() {
-    auto epoch = get_active_epoch();
-    auto s = epoch->get_structure()->get_shard_count();
-    end_job(epoch);
-
-    return s;
+    return get_active_version()->get_structure()->get_shard_count();
  }
 
  /**
@@ -272,10 +242,9 @@ public:
   * ShardType::get_memory_usage) and the buffer by the index.
   */
  size_t get_memory_usage() {
-    auto epoch = get_active_epoch();
+    auto version = get_active_version();
    auto t = m_buffer->get_memory_usage() +
-             epoch->get_structure()->get_memory_usage();
-    end_job(epoch);
+             version->get_structure()->get_memory_usage();
 
    return t;
  }
@@ -289,11 +258,7 @@ public:
   * (as reported by ShardType::get_aux_memory_usage) by the index.
   */
  size_t get_aux_memory_usage() {
-    auto epoch = get_active_epoch();
-    auto t = epoch->get_structure()->get_aux_memory_usage();
-    end_job(epoch);
-
-    return t;
+    return get_active_version()->get_structure()->get_aux_memory_usage();
  }
 
  /**
@@ -312,19 +277,19 @@ public:
  ShardType *
  create_static_structure(bool await_reconstruction_completion = false) {
    if (await_reconstruction_completion) {
-      await_next_epoch();
+      await_newest_version();
    }
 
-    auto epoch = get_active_epoch();
-    auto vers = epoch->get_structure();
+    auto version = get_active_version();
+    auto structure = version->get_structure();
    std::vector shards;
 
-    if (vers->get_level_vector().size() > 0) {
-      for (int i = vers->get_level_vector().size() - 1; i >= 0; i--) {
-        if (vers->get_level_vector()[i] &&
-            vers->get_level_vector()[i]->get_record_count() > 0) {
+    if (structure->get_level_vector().size() > 0) {
+      for (int i = structure->get_level_vector().size() - 1; i >= 0; i--) {
+        if (structure->get_level_vector()[i] &&
+            structure->get_level_vector()[i]->get_record_count() > 0) {
          shards.emplace_back(
-              vers->get_level_vector()[i]->get_combined_shard());
+              structure->get_level_vector()[i]->get_combined_shard());
        }
      }
    }
@@ -336,7 +301,7 @@ public:
     * head pointer any longer
     */
    {
-      auto bv = epoch->get_buffer();
+      auto bv = version->get_buffer();
      if (bv.get_record_count() > 0) {
        shards.emplace_back(new ShardType(std::move(bv)));
      }
@@ -348,19 +313,28 @@ public:
      delete shard;
    }
 
-    end_job(epoch);
    return flattened;
  }
 
  /*
-   * If the current epoch is *not* the newest one, then wait for
-   * the newest one to become available. Otherwise, returns immediately.
+   * Determines the newest pending version at the time of call, and
+   * blocks until that version becomes active.
   */
-  void await_next_epoch() {
-    while (m_next_epoch.load().epoch != nullptr) {
-      std::unique_lock lk(m_epoch_cv_lk);
-      m_epoch_cv.wait(lk);
+  void await_newest_version() {
+    /*
+     * versions are assigned by fetch-and-add on the counter, so the
+     * newest assigned version number will be one less than the value
+     * of the counter
+     */
+    auto newest_pending_version = m_version_counter.load() - 1;
+
+    /* versions signal on m_version_advance_cv when they activate */
+    while (m_active_version.load()->get_version_number() < newest_pending_version) {
+      std::unique_lock lk(m_version_advance_mtx);
+      m_version_advance_cv.wait(lk);
    }
+
+    return;
  }
 
  /**
@@ -372,11 +346,8 @@ public:
   * satisfied, and false if it is not.
   */
  bool validate_tombstone_proportion() {
-    auto epoch = get_active_epoch();
-    auto t = epoch->get_structure()->validate_tombstone_proportion(
-        m_max_delete_prop);
-    end_job(epoch);
-    return t;
+    return get_active_version()->get_structure()->validate_tombstone_proportion(
+        m_config.maximum_delete_proportion);
  }
 
  /**
@@ -386,28 +357,34 @@ public:
  void print_scheduler_statistics() const { m_sched.print_statistics(); }
 
private:
-  double m_max_delete_prop;
+  ConfType m_config;
+
  SchedType m_sched;
-  Buffer *m_buffer;
+  std::unique_ptr<Buffer> m_buffer;
 
  size_t m_core_cnt;
  std::atomic m_next_core;
-  std::atomic m_epoch_cnt;
 
  ReconPolicyType const *m_recon_policy;
 
-  alignas(64) std::atomic m_reconstruction_scheduled;
+  /* versioning + concurrency variables */
+  std::atomic<size_t> m_version_counter;
+  typedef std::shared_ptr<Version> version_ptr;
+
+  std::atomic<version_ptr> m_active_version;
 
-  std::atomic m_next_epoch;
-  std::atomic m_current_epoch;
-  std::atomic m_previous_epoch;
+  typedef size_t version_id;
+  const size_t INVALID_VERSION = 0;
+  const size_t INITIAL_VERSION = 1;
 
-  std::condition_variable m_epoch_cv;
-  std::mutex m_epoch_cv_lk;
+  std::condition_variable m_version_advance_cv;
+  std::mutex m_version_advance_mtx;
 
-  void enforce_delete_invariant(_Epoch *epoch) {
-    auto structure = epoch->get_structure();
+  alignas(64) std::atomic<bool> m_scheduling_reconstruction;
+
+  void enforce_delete_invariant(Version *version) {
+    auto structure = version->get_structure();
    auto compactions = structure->get_compaction_tasks();
 
    while (compactions.size() > 0) {
@@ -415,7 +392,7 @@ private:
 
      /* schedule a compaction */
      ReconstructionArgs *args = new ReconstructionArgs();
-      args->epoch = epoch;
+      args->version = version;
      args->merges = compactions;
      args->extension = this;
      args->compaction = true;
@@ -434,137 +411,11 @@ private:
    }
  }
 
-  _Epoch *get_active_epoch() {
-    epoch_ptr old, new_ptr;
-
-    do {
-      /*
-       * during an epoch transition, a nullptr will installed in the
-       * current_epoch. At this moment, the "new" current epoch will
-       * soon be installed, but the "current" current epoch has been
-       * moved back to m_previous_epoch.
-       */
-      if (m_current_epoch.load().epoch == nullptr) {
-        old = m_previous_epoch;
-        new_ptr = {old.epoch, old.refcnt + 1};
-        if (old.epoch != nullptr &&
-            m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
-          break;
-        }
-      } else {
-        old = m_current_epoch;
-        new_ptr = {old.epoch, old.refcnt + 1};
-        if (old.epoch != nullptr &&
-            m_current_epoch.compare_exchange_strong(old, new_ptr)) {
-          break;
-        }
-      }
-    } while (true);
-
-    assert(new_ptr.refcnt > 0);
-
-    return new_ptr.epoch;
-  }
-
-  void advance_epoch(size_t buffer_head) {
-
-    retire_epoch(m_previous_epoch.load().epoch);
-
-    epoch_ptr tmp = {nullptr, 0};
-    epoch_ptr cur;
-    do {
-      cur = m_current_epoch;
-    } while (!m_current_epoch.compare_exchange_strong(cur, tmp));
-
-    m_previous_epoch.store(cur);
-
-    // FIXME: this may currently block because there isn't any
-    // query preemption yet. At this point, we'd need to either
-    // 1) wait for all queries on the old_head to finish
-    // 2) kill all queries on the old_head
-    // 3) somehow migrate all queries on the old_head to the new
-    //    version
-    while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) {
-      _mm_pause();
-    }
-
-    m_current_epoch.store(m_next_epoch);
-    m_next_epoch.store({nullptr, 0});
-
-    /* notify any blocking threads that the new epoch is available */
-    m_epoch_cv_lk.lock();
-    m_epoch_cv.notify_all();
-    m_epoch_cv_lk.unlock();
-  }
-
-  /*
-   * Creates a new epoch by copying the currently active one. The new epoch's
-   * structure will be a shallow copy of the old one's.
-   */
-  _Epoch *create_new_epoch() {
-    /*
-     * This epoch access is _not_ protected under the assumption that
-     * only one reconstruction will be able to trigger at a time. If that
-     * condition is violated, it is possible that this code will clone a retired
-     * epoch.
-     */
-    assert(m_next_epoch.load().epoch == nullptr);
-    auto current_epoch = get_active_epoch();
-
-    m_epoch_cnt.fetch_add(1);
-    m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0});
-
-    end_job(current_epoch);
-
-    return m_next_epoch.load().epoch;
-  }
-
-  void retire_epoch(_Epoch *epoch) {
-    /*
-     * Epochs with currently active jobs cannot
-     * be retired. By the time retire_epoch is called,
-     * it is assumed that a new epoch is active, meaning
-     * that the epoch to be retired should no longer
-     * accumulate new active jobs. Eventually, this
-     * number will hit zero and the function will
-     * proceed.
-     */
-
-    if (epoch == nullptr) {
-      return;
-    }
-
-    epoch_ptr old, new_ptr;
-    new_ptr = {nullptr, 0};
-    do {
-      old = m_previous_epoch.load();
-
-      /*
-       * If running in single threaded mode, the failure to retire
-       * an Epoch will result in the thread of execution blocking
-       * indefinitely.
-       */
-      if constexpr (std::same_as) {
-        if (old.epoch == epoch)
-          assert(old.refcnt == 0);
-      }
-
-      if (old.epoch == epoch && old.refcnt == 0 &&
-          m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
-        break;
-      }
-      usleep(1);
-
-    } while (true);
-
-    delete epoch;
-  }
-
  static void reconstruction(void *arguments) {
    auto args = (ReconstructionArgs *)arguments;
    ((DynamicExtension *)args->extension)->SetThreadAffinity();
-    Structure *vers = args->epoch->get_mutable_structure();
+    Structure *vers = args->version->get_mutable_structure();
 
    ReconstructionTask flush_task;
    flush_task.type = ReconstructionType::Invalid;
@@ -586,18 +437,18 @@ private:
     * was done so as to make room for the full buffer anyway, so there's
     * no real benefit to doing this first.
     */
-      auto buffer_view = args->epoch->get_buffer();
+      auto buffer_view = args->version->get_buffer();
      size_t new_head = buffer_view.get_tail();
 
      vers->perform_flush(flush_task, std::move(buffer_view));
      args->result.set_value(true);
-      ((DynamicExtension *)args->extension)->advance_epoch(new_head);
+      ((DynamicExtension *)args->extension)->advance_version(new_head);
    } else {
      args->result.set_value(true);
    }
 
    ((DynamicExtension *)args->extension)
-        ->m_reconstruction_scheduled.store(false);
+        ->m_scheduling_reconstruction.store(false);
 
    delete args;
  }
@@ -605,10 +456,10 @@ private:
  static void async_query(void *arguments) {
    auto *args = (QueryArgs *)arguments;
 
-    auto epoch = args->extension->get_active_epoch();
+    auto version = args->extension->get_active_version();
 
-    auto buffer = epoch->get_buffer();
-    auto vers = epoch->get_structure();
+    auto buffer = version->get_buffer();
+    auto vers = version->get_structure();
    auto *parms = &(args->query_parms);
 
    /* create initial buffer query */
@@ -653,9 +504,6 @@ private:
    /* return the output vector to caller via the future */
    args->result_set.set_value(std::move(output));
 
-    /* officially end the query job, releasing the pin on the epoch */
-    args->extension->end_job(epoch);
-
    /* clean up memory allocated for temporary query objects */
    delete buffer_query;
    for (size_t i = 0; i < local_queries.size(); i++) {
@@ -665,22 +513,89 @@ private:
 
    delete args;
  }
 
-  void schedule_reconstruction() {
-    auto epoch = create_new_epoch();
+  version_ptr get_active_version() { return m_active_version.load(); }
+
+  /*
+   * Create a new version with an assigned version number, but without
+   * an assigned copy of the structure. Intended for use in flushing,
+   * where the structure will be copied from the currently active version
+   * at the time it is activated, but the version number must be claimed
+   * early to minimize activation blocking.
+   */
+  version_ptr create_version() {
+    size_t version_id = m_version_counter.fetch_add(1);
+    auto active_version = get_active_version();
+    version_ptr new_version =
+        std::make_shared<Version>(version_id, nullptr, m_buffer.get(), 0);
+
+    return new_version;
+  }
+
+  /*
+   * Create a new version without an assigned version number, but with
+   * a copy of the extension structure. This is for use with background
+   * reconstructions, where the underlying structure is manipulated, but
+   * no version number is claimed until the version is activated, to
+   * prevent blocking buffer flushes.
+   */
+  version_ptr create_version(Structure *structure) {
+    version_ptr new_version =
+        std::make_shared<Version>(INVALID_VERSION, structure, m_buffer.get(), 0);
+
+    return new_version;
+  }
+
+  void install_new_version(version_ptr new_version) {
+    assert(new_version->get_structure());
+    assert(new_version->get_version_number() != INVALID_VERSION);
+
+    auto old = get_active_version();
+    assert(new_version->get_version_number() > old->get_version_number());
+
+    /* wait until our turn to install the new version */
+    auto lk = std::unique_lock(m_version_advance_mtx);
+    while (new_version->get_version_number() != old->get_version_number() + 1) {
+      m_version_advance_cv.wait(lk);
+      old = get_active_version();
+    }
+
+    // FIXME: implement this interface
+    // new_version->merge_changes_from(old.get());
+
+    /*
+     * Only one version can have a given number, so we are safe to
+     * directly assign here--nobody else is going to change it out from
+     * under us. We're also protected under the mutex.
+     */
+    m_active_version.store(new_version);
+    m_version_advance_cv.notify_all();
+  }
+
+  Structure *create_scratch_structure() {
+    return get_active_version()->get_structure()->copy();
+  }
+
+  void schedule_flush() {
+    auto new_version = create_version();
 
-    ReconstructionArgs *args =
-        new ReconstructionArgs();
-    args->epoch = epoch;
-    args->tasks = m_recon_policy->get_reconstruction_tasks(
-        epoch, m_buffer->get_high_watermark());
+    auto *args = new ReconstructionArgs();
+    args->version = new_version;
+    args->tasks = m_recon_policy->get_flush_tasks(args->version.get());
    args->extension = this;
-    args->tasks.add_reconstruction(m_recon_policy->get_flush_task(epoch));
+    args->priority = ReconstructionPriority::FLUSH;
 
    /*
-     * NOTE: args is deleted by the reconstruction job, so shouldn't be freed
-     * here
+     * NOTE: args is deleted by the reconstruction job, so shouldn't be
+     * freed here
     */
-    m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
+    m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION);
+
+    if (m_config.recon_enable_maint_on_flush) {
+      schedule_maint_reconstruction();
+    }
+  }
+
+  void schedule_maint_reconstruction() {
+    return;
  }
 
  std::future schedule_query(Parameters &&query_parms) {
@@ -697,9 +612,8 @@ private:
  int internal_append(const RecordType &rec, bool ts) {
    if (m_buffer->is_at_low_watermark()) {
      auto old = false;
-
-      if (m_reconstruction_scheduled.compare_exchange_strong(old, true)) {
-        schedule_reconstruction();
+      if (m_scheduling_reconstruction.compare_exchange_strong(old, true)) {
+        schedule_flush();
      }
    }
 
@@ -738,45 +652,5 @@ private:
#else
  void SetThreadAffinity() {}
#endif
-
-  void end_job(_Epoch *epoch) {
-    epoch_ptr old, new_ptr;
-
-    do {
-      if (m_previous_epoch.load().epoch == epoch) {
-        old = m_previous_epoch;
-        /*
-         * This could happen if we get into the system during a
-         * transition. In this case, we can just back out and retry
-         */
-        if (old.epoch == nullptr) {
-          continue;
-        }
-
-        assert(old.refcnt > 0);
-
-        new_ptr = {old.epoch, old.refcnt - 1};
-        if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
-          break;
-        }
-      } else {
-        old = m_current_epoch;
-        /*
-         * This could happen if we get into the system during a
-         * transition. In this case, we can just back out and retry
-         */
-        if (old.epoch == nullptr) {
-          continue;
-        }
-
-        assert(old.refcnt > 0);
-
-        new_ptr = {old.epoch, old.refcnt - 1};
-        if (m_current_epoch.compare_exchange_strong(old, new_ptr)) {
-          break;
-        }
-      }
-    } while (true);
-  }
};
} // namespace de
-- 
cgit v1.2.3
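
Editor's note: the central change in this patch is the replacement of the hand-rolled,
reference-counted epoch_ptr machinery (get_active_epoch / end_job / retire_epoch) with
C++20 std::atomic<std::shared_ptr<...>> versions. Readers pin a version simply by copying
the shared_ptr out of the atomic, and writers activate versions in strictly increasing
number order under a mutex and condition variable. The sketch below shows that pattern
in isolation; the Version and VersionChain types here are hypothetical stand-ins for
illustration, not the framework's actual interfaces.

#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <memory>
#include <mutex>
#include <thread>

/* hypothetical stand-in for the framework's Version class */
struct Version {
  size_t number;
  explicit Version(size_t n) : number(n) {}
};

class VersionChain {
public:
  VersionChain() : m_counter(2), m_active(std::make_shared<Version>(1)) {}

  /* readers pin a version by copying the shared_ptr; no refcnt struct or
   * end_job() call is needed, since the copy keeps the version alive */
  std::shared_ptr<Version> active() const { return m_active.load(); }

  /* claim the next version number with a fetch-and-add, as the patch's
   * create_version() does */
  std::shared_ptr<Version> create() {
    return std::make_shared<Version>(m_counter.fetch_add(1));
  }

  /* activate versions in strictly increasing order, mirroring
   * install_new_version(): wait until this version is next in line,
   * then publish it and wake any other waiting installers */
  void install(std::shared_ptr<Version> v) {
    std::unique_lock lk(m_mtx);
    m_cv.wait(lk, [&] { return v->number == m_active.load()->number + 1; });
    m_active.store(std::move(v));
    m_cv.notify_all();
  }

private:
  std::atomic<size_t> m_counter;
  std::atomic<std::shared_ptr<Version>> m_active; /* requires C++20 */
  std::mutex m_mtx;
  std::condition_variable m_cv;
};

int main() {
  VersionChain chain;

  auto v2 = chain.create(); /* claims number 2 */
  auto v3 = chain.create(); /* claims number 3; must activate after v2 */

  /* install out of order: the v3 installer blocks until v2 is active */
  std::thread t([&] { chain.install(std::move(v3)); });
  chain.install(std::move(v2));
  t.join();

  std::printf("active version: %zu\n", chain.active()->number); /* prints 3 */
  return 0;
}

Under this scheme a retiring version needs no explicit reference count or retire loop:
when the last reader or installer drops its shared_ptr copy, the version is destroyed
automatically, which is what allows the patch to delete end_job() and retire_epoch()
outright.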