diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-01-27 18:17:21 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-01-27 18:17:21 -0500 |
| commit | 30da48151f58803968ca3ef5d42e66a9223d80a4 (patch) | |
| tree | 23cad3718bca116016caf5aba375a3eb3a490328 /include/framework/DynamicExtension.h | |
| parent | f149a2459cfc2007f755d792b3c4e567d30c132f (diff) | |
| download | dynamic-extension-30da48151f58803968ca3ef5d42e66a9223d80a4.tar.gz | |
progress
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 229 |
1 file changed, 123 insertions, 106 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 0331353..da2945a 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -36,10 +36,10 @@ class DynamicExtension { private: /* convenience typedefs for commonly used types within the class */ typedef typename ShardType::RECORD RecordType; - typedef MutableBuffer<RecordType> Buffer; - typedef ExtensionStructure<ShardType, QueryType> Structure; - typedef Version<ShardType, QueryType> Version; - typedef BufferView<RecordType> BufView; + typedef MutableBuffer<RecordType> BufferType; + typedef ExtensionStructure<ShardType, QueryType> StructureType; + typedef Version<ShardType, QueryType> VersionType; + typedef BufferView<RecordType> BufferViewType; typedef ReconstructionPolicy<ShardType, QueryType> ReconPolicyType; typedef DEConfiguration<ShardType, QueryType, D, SchedType> ConfType; @@ -52,6 +52,11 @@ private: static constexpr size_t QUERY = 1; static constexpr size_t RECONSTRUCTION = 2; + typedef std::shared_ptr<VersionType> version_ptr; + typedef size_t version_id; + static constexpr size_t INVALID_VERSION = 0; + static constexpr size_t INITIAL_VERSION = 1; + public: /** * Create a new Dynamized version of a data structure, supporting @@ -74,12 +79,14 @@ public: * framework's scheduler for use in answering queries and * performing compactions and flushes, etc. 
*/ - DynamicExtension(ConfType &&config) : m_config(std::move(config)) { - m_buffer = std::make_unique(m_config.buffer_flush_trigger, - m_config.buffer_size); - - m_sched = SchedType(m_config.maximum_memory_usage, m_config.maximum_threads); - m_active_version.load(std::make_shared(INITIAL_VERSION, new Structure(), m_buffer.get(), 0)); + DynamicExtension(ConfType &&config) : m_config(std::move(config)) { + m_buffer = + std::make_unique<BufferType>(m_config.buffer_flush_trigger, m_config.buffer_size); + + m_sched = std::make_unique<SchedType>(m_config.maximum_memory_usage, + m_config.maximum_threads); + m_active_version.store( + std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0)); } /** @@ -90,10 +97,10 @@ public: */ ~DynamicExtension() { /* let any in-flight version transitions finish */ - await_newest_version(); + await_version(); /* shutdown the scheduler */ - m_sched.shutdown(); + m_sched->shutdown(); } /** @@ -274,10 +281,11 @@ public: * the index. Ownership of this object is transfered to the * caller. */ + // FIXME: switch this over to std::unique_ptr ShardType * create_static_structure(bool await_reconstruction_completion = false) { if (await_reconstruction_completion) { - await_newest_version(); + await_version(); } auto version = get_active_version(); @@ -317,19 +325,27 @@ public: } /* - * Determines the newest pending version at the time of call, and - * blocks until that version becomes active. + * Blocks until the specified version id becomes active. If no version + * id is provided, wait for the newest pending version to be installed. + * + * NOTE: this method will return once the specified version has been + * installed, but does not guarantee that the specified version + * is the currently active one when it returns. It is possible + * that the active version upon the return of this method is newer + * than the one requested. 
*/ - void await_newest_version() { - /* + void await_version(version_id vid = INVALID_VERSION) { + /* * versions are assigned by fetch and add on the counter, so the * newest assigned version number will be one less than the value * of the counter */ - auto newest_pending_version = m_version_counter.load() - 1; + if (vid == INVALID_VERSION) { + vid = m_version_counter.load() - 1; + } /* versions signal on m_version_advance_cv when they activate */ - while (m_active_version.load() < newest_pending_version) { + while (m_active_version.load()->get_id() < vid) { std::unique_lock lk(m_version_advance_mtx); m_version_advance_cv.wait(lk); } @@ -354,14 +370,13 @@ public: * Calls SchedType::print_statistics, which should write a report of * scheduler performance statistics to stdout. */ - void print_scheduler_statistics() const { m_sched.print_statistics(); } + void print_scheduler_statistics() const { m_sched->print_statistics(); } private: ConfType m_config; - - SchedType m_sched; - std::unique_ptr<Buffer> m_buffer; + std::unique_ptr<SchedType> m_sched; + std::unique_ptr<BufferType> m_buffer; size_t m_core_cnt; std::atomic<int> m_next_core; @@ -370,20 +385,14 @@ private: /* versioning + concurrency variables */ std::atomic<size_t> m_version_counter; - typedef std::atomic<std::shared_ptr<Version>> version_ptr; - - version_ptr m_active_version; - - typedef size_t version_id; - const size_t INVALID_VERSION = 0; - const size_t INITIAL_VERSION = 1; + std::atomic<std::shared_ptr<VersionType>> m_active_version; std::condition_variable m_version_advance_cv; std::mutex m_version_advance_mtx; alignas(64) std::atomic<bool> m_scheduling_reconstruction; - void enforce_delete_invariant(Version *version) { + void enforce_delete_invariant(VersionType *version) { auto structure = version->get_structure(); auto compactions = structure->get_compaction_tasks(); @@ -401,7 +410,7 @@ private: auto wait = args->result.get_future(); - m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); 
+ m_sched->schedule_job(reconstruction, 0, args, RECONSTRUCTION); /* wait for compaction completion */ wait.get(); @@ -413,60 +422,61 @@ private: static void reconstruction(void *arguments) { auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments; - auto extension = (DynamicExtension *) args->extension; + auto extension = (DynamicExtension *)args->extension; extension->SetThreadAffinity(); + /* + * For "normal" flushes, the task vector should be empty, so this is + * all that will happen. Piggybacking internal reconstructions off + * the flush WILL bottleneck the system, but is left in place to + * allow the traditional policies (LEVELING, TIERING, etc.) to be + * emulated within the new system. + * + * Background reconstructions will not have a priority level of + * FLUSH, and will already have a structure present. As a result, + * this code will be bypassed in that case. + */ if (args->priority == ReconstructionPriority::FLUSH) { /* we first construct a shard from the buffer */ auto buffview = args->version->get_buffer(); - auto new_head = buffview.get_tail(); - auto new_shard = Shard(std::move(buffview)); + args->version->set_next_buffer_head(buffview.get_tail()); + auto new_shard = std::make_shared<ShardType>(std::move(buffview)); - /* copy the currently active version's structure */ - auto structure = extension->get_active_version()->get_structure()->clone(); - - - } + /* + * Flushes already know their version id. To avoid needing to + * do any update reconciliation between structures, they wait + * until the version directly preceeding them has been installed, + * and only then take a copy of the structure. 
+ */ + extension->await_version(args->version->get_id() - 1); + StructureType *structure = + extension->get_active_version()->get_structure()->copy(); + /* add the newly created shard to the structure copy */ + structure->append_l0(new_shard); - else { - + /* set this version's structure to the newly created one */ + args->version->set_structure(std::unique_ptr<StructureType>(structure)); } - - Structure *vers = args->version->get_mutable_structure(); - - ReconstructionTask flush_task; - flush_task.type = ReconstructionType::Invalid; - + /* perform all of the reconstructions */ + StructureType *structure = args->version->get_mutable_structure(); for (size_t i = 0; i < args->tasks.size(); i++) { - if (args->tasks[i].sources.size() > 0 && - args->tasks[i].sources[0] == buffer_shid) { - flush_task = args->tasks[i]; - continue; - } - - vers->perform_reconstruction(args->tasks[i]); + structure->perform_reconstruction(args->tasks[i]); } - if (flush_task.type != ReconstructionType::Invalid) { - /* - * we'll grab the buffer AFTER doing the internal reconstruction, so we - * can flush as many records as possible in one go. The reconstruction - * was done so as to make room for the full buffer anyway, so there's - * no real benefit to doing this first. - */ - auto buffer_view = args->version->get_buffer(); - size_t new_head = buffer_view.get_tail(); - - vers->perform_flush(flush_task, std::move(buffer_view)); - args->result.set_value(true); - ((DynamicExtension *)args->extension)->advance_version(new_head); - } else { - args->result.set_value(true); + /* + * if there isn't already a version id on the new version (i.e., the + * reconstruction isn't a flush), generate one. 
+ */ + if (args->version->get_id() == INVALID_VERSION) { + args->version->set_id(extension->m_version_counter.fetch_add(1)); } + /* advance the index to the newly finished version */ + extension->install_new_version(args->version); + /* manually delete the argument object */ delete args; } @@ -532,7 +542,7 @@ private: version_ptr get_active_version() { return m_active_version.load(); } - /* + /* * Create a new version with an assigned version number, but without * an assigned copy of the structure. Intended for use in flushing, * where the structure will be copied from the currently active version @@ -542,7 +552,8 @@ private: version_ptr create_version() { size_t version_id = m_version_counter.fetch_add(1); auto active_version = get_active_version(); - version_ptr new_version = std::make_shared<Version>(version_id, nullptr, &m_buffer, 0); + std::shared_ptr<VersionType> new_version = + std::make_shared<VersionType>(version_id, nullptr, m_buffer.get(), active_version->get_buffer().get_head()); return new_version; } @@ -551,51 +562,57 @@ private: * Create a new version without an assigned version number, but with * a copy of the extension structure. This is for use with background * reconstructions, where the underlying structure is manipulated, but - * no version number is claimed until the version is activated, to + * no version number is claimed until the version is activated, to * prevent blocking buffer flushes. 
*/ - version_ptr create_version(Structure *structure) { - version_ptr new_version = std::make_shared<Version>(INVALID_VERSION, structure, &m_buffer, 0); + version_ptr create_version(std::unique_ptr<StructureType> structure) { + auto active_version = get_active_version(); + version_ptr new_version = + std::make_shared<VersionType>(INVALID_VERSION, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head()); return new_version; } void install_new_version(version_ptr new_version) { assert(new_version->get_structure()); - assert(new_version->get_version_number() != INVALID_VERSION); - - auto old = get_active_version(); - assert(new_version->get_version_number() > old->get_version_number()); + assert(new_version->get_id() != INVALID_VERSION); /* wait until our turn to install the new version */ - auto lk = std::unique_lock(m_version_advance_mtx); - while (new_version.load()->get_verison_number() != old.load()->get_version_number() + 1) { - m_version_advance_cv.wait(lk); - old = get_active_version(); - } + await_version(new_version->get_id() - 1); + + auto old = get_active_version(); // FIXME: implement this interface // new_version->merge_changes_from(old.load().get()); - /* + /* * Only one version can have a given number, so we are safe to * directly assign here--nobody else is going to change it out from - * under us. We're also protected under the mutex. + * under us. + */ + m_active_version.store(new_version); + + /* + * My understanding is that you don't *really* need this mutex for + * safety in modern C++ when sending the signal. But I'll grab it + * anyway out of an abundance of caution. I doubt this will be a + * major bottleneck. 
*/ - m_active_version.store(new_version); - m_version_advance_cv.notify_all(); + auto lk = std::unique_lock(m_version_advance_mtx); + m_version_advance_cv.notify_all(); } - Structure *create_scratch_structure() { + StructureType *create_scratch_structure() { return get_active_version()->get_structure()->copy(); } - void begin_reconstruction_scheduling() { bool cur_val; do { cur_val = m_scheduling_reconstruction.load(); - } while(cur_val == true && !m_scheduling_reconstruction.compare_exchange_strong(cur_val, true)); + } while ( + cur_val == true && + !m_scheduling_reconstruction.compare_exchange_strong(cur_val, true)); } void end_reconstruction_scheduling() { @@ -608,16 +625,17 @@ private: auto new_version = create_version(); auto *args = new ReconstructionArgs<ShardType, QueryType>(); - args->version.load(new_version); + args->version = new_version; args->tasks = m_recon_policy->get_flush_tasks(args->version.get()); args->extension = this; args->priority = ReconstructionPriority::FLUSH; - /* - * NOTE: args is deleted by the reconstruction job, so shouldn't be - * freed here + /* + * NOTE: args is deleted by the reconstruction job, so shouldn't be + * freed here */ - m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION); + m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, + RECONSTRUCTION); if (m_config.recon_enable_maint_on_flush) { schedule_maint_reconstruction(false); @@ -626,28 +644,27 @@ private: end_reconstruction_scheduling(); } - - void schedule_maint_reconstruction(bool take_reconstruction_lock=true) { + void schedule_maint_reconstruction(bool take_reconstruction_lock = true) { if (take_reconstruction_lock) { begin_reconstruction_scheduling(); } // FIXME: memory management issue here? 
- auto new_version = create_version(m_active_version.load()->get_structure()); + auto new_version = create_version(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy())); auto *args = new ReconstructionArgs<ShardType, QueryType>(); - args->version.load(new_version); + args->version = new_version; args->tasks = m_recon_policy->get_reconstruction_tasks(args->version.get()); args->extension = this; args->priority = ReconstructionPriority::MAINT; - /* - * NOTE: args is deleted by the reconstruction job, so shouldn't be - * freed here + /* + * NOTE: args is deleted by the reconstruction job, so shouldn't be + * freed here */ - m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION); - + m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, + RECONSTRUCTION); if (take_reconstruction_lock) { end_reconstruction_scheduling(); @@ -662,7 +679,7 @@ private: args->query_parms = std::move(query_parms); auto result = args->result_set.get_future(); - m_sched.schedule_job(async_query, 0, (void *)args, QUERY); + m_sched->schedule_job(async_query, 0, (void *)args, QUERY); return result; } |