From d28f2cfcd4249fc7d984762a326e3f2d6dcba7dc Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 13 Feb 2025 18:13:33 -0500 Subject: progress towards resolving asynch structure merges --- include/framework/DynamicExtension.h | 96 +++++++---------- include/framework/scheduling/LockManager.h | 14 +++ include/framework/structure/ExtensionStructure.h | 132 +++++++++-------------- include/framework/structure/InternalLevel.h | 6 +- 4 files changed, 110 insertions(+), 138 deletions(-) (limited to 'include/framework') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index c2a59ea..31eb138 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -351,8 +351,8 @@ public: } /* versions signal on m_version_advance_cv when they activate */ + std::unique_lock lk(m_version_advance_mtx); while (m_active_version.load()->get_id() < vid) { - std::unique_lock lk(m_version_advance_mtx); m_version_advance_cv.wait(lk); } @@ -402,8 +402,6 @@ private: std::condition_variable m_version_advance_cv; std::mutex m_version_advance_mtx; - std::atomic m_flush_in_progress = false; - LockManager m_lock_mngr; alignas(64) std::atomic m_scheduling_reconstruction; @@ -436,13 +434,15 @@ private: } } - size_t m_flush_cnt = 0; - static void reconstruction(void *arguments) { auto args = (ReconstructionArgs *)arguments; auto extension = (DynamicExtension *)args->extension; extension->SetThreadAffinity(); + + std::vector> reconstructions; + size_t new_head = 0; + /* * For "normal" flushes, the task vector should be empty, so this is * all that will happen. Piggybacking internal reconstructions off @@ -456,48 +456,26 @@ private: */ if (args->priority == ReconstructionPriority::FLUSH) { fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id()); - assert(extension->m_flush_in_progress.load()); + /* we first construct a shard from the buffer */ auto buffview = args->version->get_buffer(); assert(buffview.get_tail() != buffview.get_head()); - size_t new_head = buffview.get_tail(); - - auto new_shard = std::make_shared(std::move(buffview)); - - - /* - * Flushes already know their version id. To avoid needing to - * do any update reconciliation between structures, they wait - * until the version directly preceeding them has been installed, - * and only then take a copy of the structure. - */ - extension->await_version(args->version->get_id() - 1); + new_head = buffview.get_tail(); - if (extension->m_config.recon_maint_disabled) { - assert(args->version->get_mutable_structure()); - args->version->get_mutable_structure()->append_l0(std::move(new_shard), args->version->get_id()); - } else { - assert(!args->version->get_mutable_structure()); - auto structure = std::unique_ptr(extension->get_active_version()->get_structure()->copy()); + reconstruction_results flush_recon; + flush_recon.target_level = 0; + flush_recon.new_shard = std::make_shared(std::move(buffview)); - /* add the newly created shard to the structure copy */ - structure->append_l0(std::move(new_shard), args->version->get_id()); - - /* set this version's structure to the newly created one */ - args->version->set_structure(std::move(structure)); - } - - args->version->advance_buffer_head(new_head); + reconstructions.push_back(flush_recon); } else { fprintf(stderr, "[I] Running background reconstruction\n"); } /* perform all of the reconstructions */ - StructureType *structure = args->version->get_mutable_structure(); + auto structure = args->version->get_structure(); + for (size_t i = 0; i < args->tasks.size(); i++) { - if (structure->perform_reconstruction(args->tasks[i])) { - extension->m_lock_mngr.add_lock(); - } + reconstructions.emplace_back(structure->perform_reconstruction(args->tasks[i])); } /* @@ -508,6 +486,20 @@ private: args->version->set_id(extension->m_version_counter.fetch_add(1)); fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id()); } + + /* wait for our opportunity to install the updates */ + extension->await_version(args->version->get_id() - 1); + + /* get a fresh copy of the structure and apply our updates */ + args->version->set_structure(std::move(std::unique_ptr(extension->get_active_version()->get_structure()->copy()))); + + for (auto recon : reconstructions) { + auto grow = args->version->get_mutable_structure()->append_shard(recon.new_shard, args->version->get_id(), recon.target_level); + args->version->get_mutable_structure()->delete_shards(recon.source_shards); + if (grow) { + extension->m_lock_mngr.add_lock(); + } + } /* advance the index to the newly finished version */ extension->install_new_version(args->version, args->initial_version); @@ -526,10 +518,16 @@ private: extension->m_lock_mngr.release_lock(level); } } - + if (args->priority == ReconstructionPriority::FLUSH) { - extension->m_flush_in_progress.store(false); + /* advance the buffer head for a flush */ + args->version->advance_buffer_head(new_head); + + extension->m_lock_mngr.release_buffer_lock(); + fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id()); + extension->print_structure(); + fflush(stdout); } else { fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id()); extension->print_structure(); @@ -638,29 +636,18 @@ private: assert(new_version->get_structure()); assert(new_version->get_id() != INVALID_VERSION); - fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id()); + fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id()); + /* wait until our turn to install the new version */ await_version(new_version->get_id() - 1); - auto old = get_active_version(); - - new_version->merge_changes_from(old.get(), old_active_version_id); - new_version->update_shard_version(new_version->get_id()); - + auto lk = std::unique_lock(m_version_advance_mtx); /* * Only one version can have a given number, so we are safe to * directly assign here--nobody else is going to change it out from * under us. */ m_active_version.store(new_version); - - /* - * My understanding is that you don't *really* need this mutex for - * safety in modern C++ when sending the signal. But I'll grab it - * anyway out of an abundance of caution. I doubt this will be a - * major bottleneck. - */ - auto lk = std::unique_lock(m_version_advance_mtx); m_version_advance_cv.notify_all(); fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id()); @@ -687,14 +674,11 @@ private: void schedule_flush() { begin_reconstruction_scheduling(); - bool old = m_flush_in_progress.load(); - if (old || !m_flush_in_progress.compare_exchange_strong(old, true)) { + if (!m_lock_mngr.take_buffer_lock()) { end_reconstruction_scheduling(); - m_version_advance_cv.notify_all(); return; } - /* * for "legacy" policies, without background reconstruction, we need * a valid structure object as part of the version prior to determining diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h index 91ed778..fcc79d1 100644 --- a/include/framework/scheduling/LockManager.h +++ b/include/framework/scheduling/LockManager.h @@ -46,7 +46,21 @@ public: return false; } + bool take_buffer_lock() { + bool old = m_buffer_lk.load(); + if (!old) { + return m_buffer_lk.compare_exchange_strong(old, true); + } + + return false; + } + + void release_buffer_lock() { + m_buffer_lk.store(false); + } + private: std::deque> m_lks; + std::atomic m_buffer_lk; }; } diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index fa713af..521e68b 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -55,6 +55,8 @@ public: new_struct->m_levels.push_back(m_levels[i]->clone()); } + new_struct->m_deleted_shards = m_deleted_shards; + return new_struct; } @@ -159,65 +161,29 @@ public: return cnt; } + /* * Perform the reconstruction described by task. If the resulting * reconstruction grows the structure (i.e., adds a level), returns * true. Otherwise, returns false. */ - inline bool perform_reconstruction(ReconstructionTask task, size_t version=0) { - /* perform the reconstruction itself */ + inline reconstruction_results perform_reconstruction(ReconstructionTask task) const { + reconstruction_results result; + result.target_level = task.target; + std::vector shards; for (ShardID shid : task.sources) { - assert(shid.level_idx <= (level_index) m_levels.size()); + assert(shid.level_idx < (level_index) m_levels.size()); assert(shid.shard_idx >= -1); - if (shid.level_idx == (level_index) m_levels.size()) { - continue; - } - - /* if unspecified, push all shards into the vector */ - if (shid.shard_idx == all_shards_idx) { - for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); - i++) { - if (m_levels[shid.level_idx]->get_shard(i)) { - shards.push_back(m_levels[shid.level_idx]->get_shard(i)); - } - } - } else { - shards.push_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx)); - } + auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx); + shards.push_back(raw_shard_ptr); + result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr); } - auto new_shard = new ShardType(shards); + result.new_shard = std::make_shared(shards); - /* - * Remove all of the shards processed by the operation - */ - for (ShardID shid : task.sources) { - if (shid.level_idx == (level_index) m_levels.size()) { - continue; - } else if (shid.shard_idx == all_shards_idx) { - m_levels[shid.level_idx]->truncate(); - } else if (shid != buffer_shid) { - m_levels[shid.level_idx]->delete_shard(shid.shard_idx); - } - } - - // fprintf(stderr, "Target: %ld\tLevels:%ld\n", task.target, m_levels.size()); - - /* - * Append the new shard to the target level - */ - if (task.target < (level_index)m_levels.size()) { - m_levels[task.target]->append(std::shared_ptr(new_shard), version); - return false; - // fprintf(stderr, "append (no growth)\n"); - } else { /* grow the structure if needed */ - m_levels.push_back(std::make_shared>(task.target)); - m_levels[task.target]->append(std::shared_ptr(new_shard), version); - return true; - // fprintf(stderr, "grow and append\n"); - } + return result; } std::vector @@ -237,8 +203,36 @@ public: return m_levels[0]->get_shard_count(); } - void append_l0(std::shared_ptr shard, size_t version) { - m_levels[0]->append(shard, version); + bool append_shard(std::shared_ptr shard, size_t version, size_t level) { + assert(level <= m_levels.size()); + auto rc = false; + + if (level == m_levels.size()) { + /* grow the structure */ + m_levels.push_back(std::make_shared>(level)); + rc = true; + } + + m_levels[level]->append(shard, version); + + return rc; + } + + void delete_shards(std::vector> shards) { + for (size_t i=0; iget_shard_count(); j++) { + if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() == shards[i].second) { + shard_idx = j; + break; + } + } + + if (shard_idx != -1) { + m_levels[shards[i].first]->delete_shard(shard_idx); + } + } } LevelVector const &get_level_vector() const { return m_levels; } @@ -269,13 +263,17 @@ public: return ts_prop <= (long double) max_delete_prop; } - void print_structure() const { + void print_structure(bool debug=false) const { for (size_t i=0; iget_shard_count(); j++) { - fprintf(stdout, "(%ld: %ld) ", j, m_levels[i]->get_shard(j)->get_record_count()); + fprintf(stdout, "(%ld, %ld, %p: %ld) ", j, m_levels[i]->get_shard_ptr(j).second, m_levels[i]->get_shard_ptr(j).first.get(), m_levels[i]->get_shard(j)->get_record_count()); } } else { fprintf(stdout, "[Empty]"); @@ -285,37 +283,9 @@ public: } } - - void merge_structure(const ExtensionStructure* old_structure, size_t version_id = 0) { - assert(version_id > 0); - - for (size_t i=0; im_levels.size(); i++) { - if (m_levels.size() <= i) { - m_levels.push_back(old_structure->m_levels[i]); - } else { - for (size_t j=0; jm_levels[i]->get_shard_count(); j++) { - if (old_structure->m_levels[i]->get_shard_version(j) > version_id) { - m_levels[i]->append(old_structure->m_levels[i]->get_shard_ptr(j)); - } - } - } - } - } - - void update_shard_version(size_t version) { - assert(version != 0); - - for (size_t i=0; iget_shard_count(); j++) { - if (m_levels[i]->get_shard_version(j) == 0) { - m_levels[i]->set_shard_version(j, version); - } - } - } - } - private: LevelVector m_levels; + std::vector m_deleted_shards; }; } // namespace de diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index 37b2b40..7e8e87d 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -198,8 +198,12 @@ public: void truncate() { m_shards.erase(m_shards.begin(), m_shards.end()); } - void delete_shard(shard_index shard) { + void delete_shard(shard_index shard, bool log_delete=true) { + size_t before = m_shards.size(); + fprintf(stderr, "[D]\tReconstruction deleting shard %ld %p\n", shard, m_shards[shard].first.get()); m_shards.erase(m_shards.begin() + shard); + size_t after = m_shards.size(); + assert( before > after); } void append(std::shared_ptr shard, size_t version=0) { -- cgit v1.2.3