diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-01-31 16:16:28 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-01-31 16:16:28 -0500 |
| commit | 3061bfbf1c4b5092fa4234de3105a615fcef18ea (patch) | |
| tree | 4c41cdf3b8e3af6e38ba1c8f283d2e35086cc2af /include/framework/DynamicExtension.h | |
| parent | 30da48151f58803968ca3ef5d42e66a9223d80a4 (diff) | |
| download | dynamic-extension-3061bfbf1c4b5092fa4234de3105a615fcef18ea.tar.gz | |
More updates
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 80 |
1 files changed, 63 insertions, 17 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index da2945a..91bed98 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -87,6 +87,8 @@ public: m_config.maximum_threads); m_active_version.store( std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0)); + + m_version_counter = INITIAL_VERSION; } /** @@ -226,7 +228,7 @@ public: * @return The number of levels within the index */ size_t get_height() { - return get_active_version()->get_structure()->get_height(); + return get_active_version()->get_structure()->get_height() - 1; } /** @@ -381,8 +383,6 @@ private: size_t m_core_cnt; std::atomic<int> m_next_core; - ReconPolicyType const *m_recon_policy; - /* versioning + concurrency variables */ std::atomic<size_t> m_version_counter; std::atomic<std::shared_ptr<VersionType>> m_active_version; @@ -390,6 +390,8 @@ private: std::condition_variable m_version_advance_cv; std::mutex m_version_advance_mtx; + std::atomic<bool> m_flush_in_progress = false; + alignas(64) std::atomic<bool> m_scheduling_reconstruction; void enforce_delete_invariant(VersionType *version) { @@ -420,6 +422,8 @@ private: } } + size_t m_flush_cnt = 0; + static void reconstruction(void *arguments) { auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments; auto extension = (DynamicExtension *)args->extension; @@ -437,11 +441,14 @@ private: * this code will be bypassed in that case. */ if (args->priority == ReconstructionPriority::FLUSH) { + assert(extension->m_flush_in_progress.load()); /* we first construct a shard from the buffer */ auto buffview = args->version->get_buffer(); - args->version->set_next_buffer_head(buffview.get_tail()); + assert(buffview.get_tail() != buffview.get_head()); + auto new_shard = std::make_shared<ShardType>(std::move(buffview)); + /* * Flushes already know their version id. To avoid needing to * do any update reconciliation between structures, they wait @@ -449,14 +456,22 @@ private: * and only then take a copy of the structure. */ extension->await_version(args->version->get_id() - 1); - StructureType *structure = - extension->get_active_version()->get_structure()->copy(); - /* add the newly created shard to the structure copy */ - structure->append_l0(new_shard); + if (extension->m_config.recon_maint_disabled) { + assert(args->version->get_mutable_structure()); + args->version->get_mutable_structure()->append_l0(std::move(new_shard)); + } else { + assert(!args->version->get_mutable_structure()); + auto structure = std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy()); - /* set this version's structure to the newly created one */ - args->version->set_structure(std::unique_ptr<StructureType>(structure)); + /* add the newly created shard to the structure copy */ + structure->append_l0(std::move(new_shard)); + + /* set this version's structure to the newly created one */ + args->version->set_structure(std::move(structure)); + } + + args->version->advance_buffer_head(); } /* perform all of the reconstructions */ @@ -476,6 +491,10 @@ private: /* advance the index to the newly finished version */ extension->install_new_version(args->version); + if (args->priority == ReconstructionPriority::FLUSH) { + extension->m_flush_in_progress.store(false); + } + /* manually delete the argument object */ delete args; } @@ -549,11 +568,11 @@ private: * at the time it is activated, but the version number must be claimed * early to minimize activation blocking. */ - version_ptr create_version() { + version_ptr create_version_flush(std::unique_ptr<StructureType> structure) { size_t version_id = m_version_counter.fetch_add(1); auto active_version = get_active_version(); std::shared_ptr<VersionType> new_version = - std::make_shared<VersionType>(version_id, nullptr, m_buffer.get(), active_version->get_buffer().get_head()); + std::make_shared<VersionType>(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head()); return new_version; } @@ -565,7 +584,7 @@ private: * no version number is claimed until the version is activated, to * prevent blocking buffer flushes. */ - version_ptr create_version(std::unique_ptr<StructureType> structure) { + version_ptr create_version_maint(std::unique_ptr<StructureType> structure) { auto active_version = get_active_version(); version_ptr new_version = std::make_shared<VersionType>(INVALID_VERSION, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head()); @@ -622,11 +641,34 @@ private: void schedule_flush() { begin_reconstruction_scheduling(); - auto new_version = create_version(); + + bool old = m_flush_in_progress.load(); + if (old || !m_flush_in_progress.compare_exchange_strong(old, true)) { + end_reconstruction_scheduling(); + return; + } + + /* + * for "legacy" policies, without background reconstruction, we need + * a valid structure object as part of the version prior to determining + * the flush operations. Otherwise, the flush operation should only ever + * do the flush itself, and so no structure is needed at this point + * + * delaying obtaining the structure until later, when maintenance + * reconstructions are enabled, removes the need for flushes to reconcile + * differences in structure between when they are scheduled and when they + * finish + */ + std::unique_ptr<StructureType> structure = nullptr; + if (m_config.recon_maint_disabled) { + structure = std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy()); + } + + auto new_version = create_version_flush(std::move(structure)); auto *args = new ReconstructionArgs<ShardType, QueryType>(); args->version = new_version; - args->tasks = m_recon_policy->get_flush_tasks(args->version.get()); + args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get()); args->extension = this; args->priority = ReconstructionPriority::FLUSH; @@ -646,16 +688,20 @@ private: void schedule_maint_reconstruction(bool take_reconstruction_lock = true) { + if (m_config.recon_maint_disabled) { + return; + } + if (take_reconstruction_lock) { begin_reconstruction_scheduling(); } // FIXME: memory management issue here? - auto new_version = create_version(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy())); + auto new_version = create_version_maint(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy())); auto *args = new ReconstructionArgs<ShardType, QueryType>(); args->version = new_version; - args->tasks = m_recon_policy->get_reconstruction_tasks(args->version.get()); + args->tasks = m_config.recon_policy->get_reconstruction_tasks(args->version.get()); args->extension = this; args->priority = ReconstructionPriority::MAINT; |