From 2ded45f5a20f38fdfd9f348c446c38dc713a5591 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 3 Mar 2025 13:41:19 -0500 Subject: Fixed a few concurrency bugs --- include/framework/DynamicExtension.h | 204 +++++++++++++++++++++++------------ 1 file changed, 133 insertions(+), 71 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 8cef4a1..762029e 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -1,7 +1,7 @@ /* * include/framework/DynamicExtension.h * - * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * Copyright (C) 2023-2025 Douglas B. Rumbaugh * Dong Xie * * Distributed under the Modified BSD License. @@ -161,7 +161,6 @@ public: static_assert(std::same_as, "Tagging is only supported in single-threaded operation"); - auto view = m_buffer->get_buffer_view(); auto version = get_active_version(); if (version->get_mutable_structure()->tagged_delete(rec)) { return 1; @@ -172,7 +171,7 @@ public: * probably has the lowest probability of having the record, * so we'll check it last. */ - return view.delete_record(rec); + return version->get_buffer().delete_record(rec); } /* @@ -256,9 +255,8 @@ public: * ShardType::get_memory_usage) and the buffer by the index. */ size_t get_memory_usage() { - auto version = get_active_version(); auto t = m_buffer->get_memory_usage() + - version->get_structure()->get_memory_usage(); + get_active_version()->get_structure()->get_memory_usage(); return t; } @@ -288,8 +286,7 @@ public: * the index. Ownership of this object is transfered to the * caller. */ - // FIXME: switch this over to std::unique_ptr - ShardType * + std::unique_ptr create_static_structure(bool await_reconstruction_completion = false) { if (await_reconstruction_completion) { await_version(); @@ -322,7 +319,7 @@ public: } } - ShardType *flattened = new ShardType(shards); + auto flattened = std::make_unique(shards); for (auto shard : shards) { delete shard; @@ -348,7 +345,7 @@ public: * of the counter */ if (vid == INVALID_VERSION) { - vid = m_version_counter.load() - 1; + vid = m_version_counter.load(); } /* versions signal on m_version_advance_cv when they activate */ @@ -384,7 +381,11 @@ public: * stdout. Each level is on its own line, and each shard is represented. */ void print_structure() { - get_active_version()->get_structure()->print_structure(); + auto ver = get_active_version(); + auto bv = ver->get_buffer(); + + fprintf(stdout, "[B]:\t(%ld)\n", bv.get_record_count()); + ver->get_structure()->print_structure(); } private: @@ -397,15 +398,15 @@ private: std::atomic m_next_core; /* versioning + concurrency variables */ - std::atomic m_version_counter; - std::atomic> m_active_version; + alignas(64) std::atomic m_version_counter; + alignas(64) std::atomic> m_active_version; - std::condition_variable m_version_advance_cv; - std::mutex m_version_advance_mtx; + alignas(64) std::condition_variable m_version_advance_cv; + alignas(64) std::mutex m_version_advance_mtx; - LockManager m_lock_mngr; + alignas(64) LockManager m_lock_mngr; - std::atomic m_preempt_version; + alignas(64) std::atomic m_preempt_version; alignas(64) std::atomic m_scheduling_reconstruction; @@ -460,7 +461,9 @@ private: auto extension = (DynamicExtension *)args->extension; extension->SetThreadAffinity(); - + // static std::atomic cnt = 0; + // size_t recon_id = cnt.fetch_add(1); + size_t new_head = 0; std::vector> reconstructions; @@ -476,21 +479,24 @@ private: * this code will be bypassed in that case. */ if (args->priority == ReconstructionPriority::FLUSH) { - // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id()); + // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id); + // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n", + // args->version->get_id(), recon_id); /* we first construct a shard from the buffer */ auto buffview = args->version->get_buffer(); assert(buffview.get_tail() != buffview.get_head()); new_head = buffview.get_tail(); + // fprintf(stderr, "\t[I] Current Buffer Head:\t%ld (%ld)\n", + // buffview.get_head(), recon_id); + reconstruction_results flush_recon; flush_recon.target_level = 0; flush_recon.new_shard = std::make_shared(std::move(buffview)); reconstructions.push_back(flush_recon); - // fprintf(stderr, "[I] Buffer Head:\t%ld\n", new_head); - /* advance the buffer head for a flush */ bool success = false; size_t failure_cnt = 0; @@ -499,24 +505,39 @@ private: if (!success) { failure_cnt++; usleep(1); - // fprintf(stderr, "[I] Buffer head advance blocked on %ld\n", args->version->get_id()); + // fprintf(stderr, "\t[I] Buffer head advance blocked on %ld (%ld)\n", + // args->version->get_id(), recon_id); - if (failure_cnt >= extension->m_config.buffer_flush_query_preemption_trigger) { + if (failure_cnt >= + extension->m_config.buffer_flush_query_preemption_trigger) { extension->preempt_queries(); + + if (failure_cnt > 500000) { + // fprintf(stderr, + // "[C] Critical failure. Hung on version: %ld (%ld)\n", + // extension->m_buffer->debug_get_old_head(), recon_id); + } } } } + // fprintf(stderr, "\t[I] Buffer head advanced to:\t%ld (%ld)\n", + // new_head, recon_id); } else { - // fprintf(stderr, "[I] Running background reconstruction\n"); + // fprintf(stderr, "[I] Running background reconstruction (%ld)\n", + // recon_id); } /* perform all of the reconstructions */ auto structure = args->version->get_structure(); assert(structure); - + + // fprintf(stderr, "\t[I] Pre-reconstruction L0 Size\t%ld (%ld)\n", + // structure->get_level_vector()[0]->get_shard_count(), recon_id); + for (size_t i = 0; i < args->tasks.size(); i++) { - reconstructions.emplace_back(structure->perform_reconstruction(args->tasks[i])); + reconstructions.emplace_back( + structure->perform_reconstruction(args->tasks[i])); } /* @@ -524,51 +545,90 @@ private: * reconstruction isn't a flush), generate one. */ if (args->version->get_id() == INVALID_VERSION) { - args->version->set_id(extension->m_version_counter.fetch_add(1)); - // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id()); + assert(args->priority == ReconstructionPriority::MAINT); + args->version->set_id(extension->m_version_counter.fetch_add(1) + 1); + // fprintf(stderr, "\t[I] Reconstruction version assigned %ld (%ld)\n", + // args->version->get_id(), recon_id); + } else { + assert(args->priority == ReconstructionPriority::FLUSH); } - + /* wait for our opportunity to install the updates */ extension->await_version(args->version->get_id() - 1); - /* get a fresh copy of the structure and apply our updates */ - args->version->set_structure(std::move(std::unique_ptr(extension->get_active_version()->get_structure()->copy()))); + // size_t old_reccnt = args->version->get_structure()->get_record_count(); + + /* + * this version *should* have an ID one less than the version we are + * currently constructing, and should be fully stable (i.e., only the + * buffer tail can change) + */ + auto active_version = extension->get_active_version(); + assert(active_version->get_id() == args->version->get_id() - 1); + + /* get a fresh copy of the structure from the current version */ + args->version->set_structure(std::move(std::unique_ptr( + active_version->get_structure()->copy()))); + + // size_t cur_reccnt = args->version->get_structure()->get_record_count(); + /* apply our updates to the copied structure (adding/removing shards) */ for (auto recon : reconstructions) { - auto grow = args->version->get_mutable_structure()->append_shard(recon.new_shard, args->version->get_id(), recon.target_level); + auto grow = args->version->get_mutable_structure()->append_shard( + recon.new_shard, args->version->get_id(), recon.target_level); args->version->get_mutable_structure()->delete_shards(recon.source_shards); if (grow) { extension->m_lock_mngr.add_lock(); } } - /* grab the latest buffer head */ + // size_t new_reccnt = args->version->get_structure()->get_record_count(); + + // fprintf(stderr, "\t[I] Post-reconstruction L0 Size\t%ld (%ld)\n", + // args->version->get_structure()->get_level_vector()[0]->get_shard_count(), + // recon_id); + + /* for maintenance reconstructions, advance the buffer head to match the + * currently active version */ if (args->priority == ReconstructionPriority::MAINT) { - args->version->set_head(extension->get_active_version()->get_head()); + args->version->set_buffer(extension->m_buffer.get(), + active_version->get_head()); + // fprintf(stderr, "\t[I] Buffer head set to %ld (%ld)\n", + // active_version->get_head(), recon_id); + // if (new_reccnt != cur_reccnt) { + // fprintf(stderr, "ERROR: invalid reccnt (%ld)\n", recon_id); + // } } + // fprintf(stderr, "\t[I] Record Counts: %ld %ld %ld (%ld)\n", old_reccnt, + // cur_reccnt, new_reccnt, recon_id); + /* advance the index to the newly finished version */ extension->install_new_version(args->version, args->initial_version); /* maint reconstructions can now safely release their locks */ if (args->priority == ReconstructionPriority::MAINT) { std::set locked_levels; - for (size_t i=0; itasks.size(); i++) { + for (size_t i = 0; i < args->tasks.size(); i++) { for (auto source : args->tasks[i].sources) { locked_levels.insert(source.level_idx); } } for (auto level : locked_levels) { - // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, args->version->get_id()); - extension->m_lock_mngr.release_lock(level); + // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, recon_id); + extension->m_lock_mngr.release_lock(level, args->version->get_id()); } } - + if (args->priority == ReconstructionPriority::FLUSH) { + // fprintf(stderr, "\t[I] releasing lock on buffer (%ld)\n", recon_id); extension->m_lock_mngr.release_buffer_lock(); } + // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n", + // args->version->get_id(), recon_id); + /* manually delete the argument object */ delete args; } @@ -596,6 +656,19 @@ private: /* execute the local/buffer queries and combine the results into output */ QueryResult output; do { + /* + * for query preemption--check if the query should be restarted + * to prevent blocking buffer flushes for too long + */ + if (args->extension->restart_query(args, version->get_id())) { + /* clean up memory allocated for temporary query objects */ + delete buffer_query; + for (size_t i = 0; i < local_queries.size(); i++) { + delete local_queries[i]; + } + return; + } + std::vector query_results(shards.size() + 1); for (size_t i = 0; i < query_results.size(); i++) { if (i == 0) { /* execute buffer query */ @@ -610,19 +683,6 @@ private: if (query_results[i].size() > 0) break; } - - /* - * for query preemption--check if the query should be restarted - * to prevent blocking buffer flushes for too long - */ - if (args->extension->restart_query(args, version->get_id())) { - /* clean up memory allocated for temporary query objects */ - delete buffer_query; - for (size_t i = 0; i < local_queries.size(); i++) { - delete local_queries[i]; - } - return; - } } /* @@ -656,7 +716,7 @@ private: * early to minimize activation blocking. */ version_ptr create_version_flush(std::unique_ptr structure) { - size_t version_id = m_version_counter.fetch_add(1); + size_t version_id = m_version_counter.fetch_add(1) + 1; // fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id); auto active_version = get_active_version(); std::shared_ptr new_version = @@ -674,17 +734,16 @@ private: */ version_ptr create_version_maint(std::unique_ptr structure) { auto active_version = get_active_version(); - version_ptr new_version = - std::make_shared(INVALID_VERSION, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head()); + version_ptr new_version = std::make_shared( + INVALID_VERSION, std::move(structure), nullptr, 0); return new_version; } void install_new_version(version_ptr new_version, size_t old_active_version_id) { - assert(new_version->get_structure()); - assert(new_version->get_id() != INVALID_VERSION); + assert(new_version->valid()); - // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id()); + // fprintf(stderr, "\t[I] Installing version %ld\n", new_version->get_id()); /* wait until our turn to install the new version */ await_version(new_version->get_id() - 1); @@ -698,11 +757,7 @@ private: m_active_version.store(new_version); m_version_advance_cv.notify_all(); - // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id()); - } - - StructureType *create_scratch_structure() { - return get_active_version()->get_structure()->copy(); + // fprintf(stderr, "\t[I] Installed version %ld\n", new_version->get_id()); } void begin_reconstruction_scheduling() { @@ -727,6 +782,20 @@ private: return; } + /* + * Double check that we actually need to flush. I was seeing some + * flushes running on empty buffers--somehow it seems like a new + * flush was getting scheduled immediately after another one finished, + * when it wasn't necessary. This should prevent that from happening. + * + * A bit of a kludge, but it *should* work + */ + if (!m_buffer->is_at_low_watermark()) { + m_lock_mngr.release_buffer_lock(); + end_reconstruction_scheduling(); + return; + } + auto active_version = m_active_version.load(); auto *args = new ReconstructionArgs(); @@ -768,10 +837,6 @@ private: auto active_version = m_active_version.load(); auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr); - if (reconstructions.size() == 0) { - // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n"); - } - for (auto &recon : reconstructions) { /* * NOTE: args is deleted by the reconstruction job, so shouldn't be @@ -783,7 +848,7 @@ private: args->extension = this; args->priority = ReconstructionPriority::MAINT; args->initial_version = active_version->get_id(); - m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, + m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(), args, RECONSTRUCTION); } @@ -806,11 +871,8 @@ private: } int internal_append(const RecordType &rec, bool ts) { - if (m_buffer->is_at_low_watermark()) { - auto old = false; - if (m_scheduling_reconstruction.compare_exchange_strong(old, true)) { - schedule_flush(); - } + if (m_buffer->is_at_low_watermark() && !m_lock_mngr.is_buffer_locked()) { + schedule_flush(); } /* this will fail if the HWM is reached and return 0 */ -- cgit v1.2.3