From cbb19322557671a23b6643ce9079594c7eec716b Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 21 Jan 2025 16:07:02 -0500 Subject: Finished rename of Epoch to Version + imported changes to psudb-common --- external/psudb-common | 2 +- include/framework/DynamicExtension.h | 49 +++++++- .../reconstruction/ReconstructionPolicy.h | 2 +- include/framework/scheduling/Task.h | 2 +- include/framework/scheduling/Version.h | 130 +++++++++++++++++++++ 5 files changed, 177 insertions(+), 8 deletions(-) create mode 100644 include/framework/scheduling/Version.h diff --git a/external/psudb-common b/external/psudb-common index 3be9caf..ce3b373 160000 --- a/external/psudb-common +++ b/external/psudb-common @@ -1 +1 @@ -Subproject commit 3be9caf90a12b6ac3afd4437ddd62167ba6d28b0 +Subproject commit ce3b373b75c28098df83ec95234a90cb4f4d364f diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index ea3ef4d..62aaf88 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -24,7 +24,7 @@ #include "framework/util/Configuration.h" -#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/Version.h" #include "util/types.h" namespace de { @@ -447,8 +447,6 @@ private: args->result.set_value(true); } - ((DynamicExtension *)args->extension) - ->m_scheduling_reconstruction.store(false); delete args; } @@ -573,7 +571,21 @@ private: return get_active_version()->get_structure()->copy(); } + + void begin_reconstruction_scheduling() { + bool cur_val; + do { + cur_val = m_scheduling_reconstruction.load(); + } while(cur_val == true && !m_scheduling_reconstruction.compare_exchange_strong(cur_val, true)); + } + + void end_reconstruction_scheduling() { + /* no need for any other sync here, this thread has an implicit lock */ + m_scheduling_reconstruction.store(false); + } + void schedule_flush() { + begin_reconstruction_scheduling(); auto new_version = create_version(); auto *args = new ReconstructionArgs(); @@ -589,12 +601,39 @@ private: m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION); if (m_config.recon_enable_maint_on_flush) { - schedule_maint_reconstruction(); + schedule_maint_reconstruction(false); } + + end_reconstruction_scheduling(); } - void schedule_maint_reconstruction() { + void schedule_maint_reconstruction(bool take_reconstruction_lock=true) { + + if (take_reconstruction_lock) { + begin_reconstruction_scheduling(); + } + + // FIXME: memory management issue here? + auto new_version = create_version(m_active_version.load()->get_structure()); + + auto *args = new ReconstructionArgs(); + args->version.load(new_version); + args->tasks = m_recon_policy->get_reconstruction_tasks(args->version.get()); + args->extension = this; + args->priority = ReconstructionPriority::MAINT; + + /* + * NOTE: args is deleted by the reconstruction job, so shouldn't be + * freed here + */ + m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION); + + + if (take_reconstruction_lock) { + end_reconstruction_scheduling(); + } + return; } diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h index 2c737de..48bddcf 100644 --- a/include/framework/reconstruction/ReconstructionPolicy.h +++ b/include/framework/reconstruction/ReconstructionPolicy.h @@ -14,7 +14,7 @@ #include "util/types.h" #include "framework/structure/ExtensionStructure.h" -#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/Version.h" namespace de { template QueryType> diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 1ab35d2..197e8bf 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -17,7 +17,7 @@ #include #include -#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/Version.h" #include "framework/scheduling/statistics.h" #include "util/types.h" diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h new file mode 100644 index 0000000..e961777 --- /dev/null +++ b/include/framework/scheduling/Version.h @@ -0,0 +1,130 @@ +/* + * include/framework/scheduling/Version.h + * + * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * + * Distributed under the Modified BSD License. + * + */ +#pragma once + +#include + +#include "framework/structure/BufferView.h" +#include "framework/structure/ExtensionStructure.h" +#include "framework/structure/MutableBuffer.h" + +namespace de { + +template QueryType> +class Version { +private: + typedef typename ShardType::RECORD RecordType; + typedef MutableBuffer Buffer; + typedef ExtensionStructure Structure; + typedef BufferView BufView; + +public: + Version(size_t number = 0) + : m_buffer(nullptr), m_structure(nullptr), m_active_merge(false), + m_epoch_number(number), m_buffer_head(0) {} + + Version(size_t number, Structure *structure, Buffer *buff, size_t head) + : m_buffer(buff), m_structure(structure), m_active_merge(false), + m_epoch_number(number), m_buffer_head(head) { + structure->take_reference(); + } + + ~Version() { + if (m_structure) { + m_structure->release_reference(); + } + + if (m_structure->get_reference_count() == 0) { + delete m_structure; + } + } + + /* + * Versions are *not* copyable or movable. Only one can exist, and all users + * of it work with pointers + */ + Version(const Version &) = delete; + Version(Version &&) = delete; + Version &operator=(const Version &) = delete; + Version &operator=(Version &&) = delete; + + size_t get_epoch_number() const { return m_epoch_number; } + + const Structure *get_structure() const { return m_structure; } + + Structure *get_mutable_structure() { return m_structure; } + + BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); } + + /* + * Returns a new Version object that is a copy of this one. The new object + * will also contain a copy of the m_structure, rather than a reference to + * the same one. The epoch number of the new epoch will be set to the + * provided argument. + */ + Version *clone(size_t number) { + std::unique_lock m_buffer_lock; + auto epoch = new Version(number); + epoch->m_buffer = m_buffer; + epoch->m_buffer_head = m_buffer_head; + + if (m_structure) { + epoch->m_structure = m_structure->copy(); + /* the copy routine returns a structure with 0 references */ + epoch->m_structure->take_reference(); + } + + return epoch; + } + + /* + * Check if a merge can be started from this Version. At present, without + * concurrent merging, this simply checks if there is currently a scheduled + * merge based on this Version. If there is, returns false. If there isn't, + * return true and set a flag indicating that there is an active merge. + */ + bool prepare_reconstruction() { + auto old = m_active_merge.load(); + if (old) { + return false; + } + + // FIXME: this needs cleaned up + while (!m_active_merge.compare_exchange_strong(old, true)) { + old = m_active_merge.load(); + if (old) { + return false; + } + } + + return true; + } + + bool advance_buffer_head(size_t head) { + m_buffer_head = head; + return m_buffer->advance_head(m_buffer_head); + } + +private: + Buffer *m_buffer; + Structure *m_structure; + + std::mutex m_buffer_lock; + std::atomic m_active_merge; + + /* + * The number of currently active jobs + * (queries/merges) operating on this + * epoch. An epoch can only be retired + * when this number is 0. + */ + size_t m_epoch_number; + size_t m_buffer_head; +}; +} // namespace de -- cgit v1.2.3