From d129ac81281022107ebccc5197ee126dd1b8cf2a Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Thu, 10 Apr 2025 13:57:04 -0400
Subject: DynamicExtension.h: Added the insertion rate limiter

The exact determination of *how* to use this mechanism will be contingent
on some of the math I'm working on at the moment.
---
 include/framework/DynamicExtension.h | 150 +++++++++++++++++++++++------------
 1 file changed, 98 insertions(+), 52 deletions(-)
(limited to 'include/framework')

diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index a65498c..dd6917c 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -11,14 +11,15 @@
 #include
 #include
+#include
 #include
-#include
 #include
+#include

 #include "framework/interface/Scheduler.h"
 #include "framework/reconstruction/ReconstructionPolicy.h"
-#include "framework/scheduling/SerialScheduler.h"
 #include "framework/scheduling/LockManager.h"
+#include "framework/scheduling/SerialScheduler.h"
 #include "framework/scheduling/Task.h"
 #include "framework/structure/ExtensionStructure.h"
@@ -83,16 +84,17 @@ public:
    * performing compactions and flushes, etc.
    */
   DynamicExtension(ConfType &&config) : m_config(std::move(config)) {
-    m_buffer =
-        std::make_unique(m_config.buffer_flush_trigger, m_config.buffer_size);
+    m_buffer = std::make_unique(m_config.buffer_flush_trigger,
+                                m_config.buffer_size);
     m_sched = std::make_unique(m_config.maximum_memory_usage,
                                m_config.maximum_threads);

-    m_active_version.store(
-        std::make_shared(INITIAL_VERSION, std::make_unique(), m_buffer.get(), 0));
+    m_active_version.store(std::make_shared(
+        INITIAL_VERSION, std::make_unique(), m_buffer.get(), 0));

     m_version_counter = INITIAL_VERSION;
     m_preempt_version = INVALID_VERSION;
+    m_insertion_rate.store(1.0);

     assert(m_config.recon_policy);
   }
@@ -122,7 +124,9 @@ public:
    * @return 1 on success, 0 on failure (in which case the insert should
    * be retried)
    */
-  int insert(const RecordType &rec) { return internal_append(rec, false); }
+  int insert(const RecordType &rec, gsl_rng *rng = nullptr) {
+    return internal_append(rec, false, rng);
+  }

   /**
    * Erases a record from the index, according to the DeletePolicy
@@ -380,13 +384,13 @@ public:
    * Writes a schematic view of the currently active structure to
    * stdout. Each level is on its own line, and each shard is represented.
    */
-  void print_structure() {
-    auto ver = get_active_version();
-    auto bv = ver->get_buffer();
+  void print_structure() {
+    auto ver = get_active_version();
+    auto bv = ver->get_buffer();

-    fprintf(stdout, "[B]:\t(%ld)\n", bv.get_record_count());
-    ver->get_structure()->print_structure();
-  }
+    fprintf(stdout, "[B]:\t(%ld)\n", bv.get_record_count());
+    ver->get_structure()->print_structure();
+  }

 private:
   ConfType m_config;
@@ -410,9 +414,13 @@ private:

   alignas(64) std::atomic m_scheduling_reconstruction;

-  bool restart_query(QueryArgs *args, size_t version) {
+  alignas(64) std::atomic m_insertion_rate;
+
+  bool restart_query(QueryArgs *args,
+                     size_t version) {
     if (version <= m_preempt_version.load()) {
-      // fprintf(stderr, "[I] Preempted query on version %ld, restarting...\n", version);
+      // fprintf(stderr, "[I] Preempted query on version %ld, restarting...\n",
+      // version);
       m_sched->schedule_job(async_query, 0, (void *)args, QUERY);
       return true;
     }
@@ -422,9 +430,10 @@ private:
   void preempt_queries() {
     size_t vers = m_active_version.load()->get_id() - 1;

-    if (vers != m_preempt_version.load()){
+    if (vers != m_preempt_version.load()) {
       m_preempt_version.store(vers);
-      // fprintf(stderr, "[I] Initiating query preemption on version %ld\n", vers);
+      // fprintf(stderr, "[I] Initiating query preemption on version %ld\n",
+      // vers);
     }
   }
@@ -479,15 +488,17 @@ private:
      * this code will be bypassed in that case.
      */
     if (args->priority == ReconstructionPriority::FLUSH) {
-      // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
-      // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n", args->version->get_id(), recon_id);
+      // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
+      // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n",
+      // args->version->get_id(), recon_id);

       /* we first construct a shard from the buffer */
       auto buffview = args->version->get_buffer();
       assert(buffview.get_tail() != buffview.get_head());
       new_head = buffview.get_tail();
-      // fprintf(stderr, "\t[I] Current Buffer Head:\t%ld (%ld)\n", buffview.get_head(), recon_id);
+      // fprintf(stderr, "\t[I] Current Buffer Head:\t%ld (%ld)\n",
+      // buffview.get_head(), recon_id);

       reconstruction_results flush_recon;
       flush_recon.target_level = 0;
@@ -505,7 +516,9 @@ private:
        * shards in L0 to use), or use it (by using the "all_shards_idx"
        * shard id).
        */
-      args->version->get_mutable_structure()->append_shard(flush_recon.new_shard, args->version->get_id(), flush_recon.target_level);
+      args->version->get_mutable_structure()->append_shard(
+          flush_recon.new_shard, args->version->get_id(),
+          flush_recon.target_level);

       /* advance the buffer head for a flush */
       bool success = false;
@@ -515,28 +528,33 @@ private:
       if (!success) {
         failure_cnt++;
         usleep(1);
-        // fprintf(stderr, "\t[I] Buffer head advance blocked on %ld (%ld)\n", args->version->get_id(), recon_id);
+        // fprintf(stderr, "\t[I] Buffer head advance blocked on %ld (%ld)\n",
+        // args->version->get_id(), recon_id);

         if (failure_cnt >=
             extension->m_config.buffer_flush_query_preemption_trigger) {
           extension->preempt_queries();

           if (failure_cnt > 500000) {
-            // fprintf(stderr, "[C] Critical failure. Hung on version: %ld (%ld)\n", extension->m_buffer->debug_get_old_head(), recon_id);
+            // fprintf(stderr, "[C] Critical failure. Hung on version: %ld
+            // (%ld)\n", extension->m_buffer->debug_get_old_head(), recon_id);
           }
         }
       }
     }

-      // fprintf(stderr, "\t[I] Buffer head advanced to:\t%ld (%ld)\n", new_head, recon_id);
+      // fprintf(stderr, "\t[I] Buffer head advanced to:\t%ld (%ld)\n",
+      // new_head, recon_id);
     } else {
-      // fprintf(stderr, "[I] Running background reconstruction (%ld)\n", recon_id);
+      // fprintf(stderr, "[I] Running background reconstruction (%ld)\n",
+      // recon_id);
     }

     /* perform all of the reconstructions */
     auto structure = args->version->get_structure();
     assert(structure);

-    // fprintf(stderr, "\t[I] Pre-reconstruction L0 Size\t%ld (%ld)\n", structure->get_level_vector()[0]->get_shard_count(), recon_id);
+    // fprintf(stderr, "\t[I] Pre-reconstruction L0 Size\t%ld (%ld)\n",
+    // structure->get_level_vector()[0]->get_shard_count(), recon_id);

     for (size_t i = 0; i < args->tasks.size(); i++) {
       reconstructions.emplace_back(
@@ -579,7 +597,8 @@ private:
     for (auto recon : reconstructions) {
       auto grow = args->version->get_mutable_structure()->append_shard(
           recon.new_shard, args->version->get_id(), recon.target_level);
-      args->version->get_mutable_structure()->delete_shards(recon.source_shards);
+      args->version->get_mutable_structure()->delete_shards(
+          recon.source_shards);
       if (grow) {
         extension->m_lock_mngr.add_lock();
       }
@@ -587,20 +606,24 @@ private:

     size_t new_reccnt = args->version->get_structure()->get_record_count();

-    // fprintf(stderr, "\t[I] Post-reconstruction L0 Size\t%ld (%ld)\n", args->version->get_structure()->get_level_vector()[0]->get_shard_count(), recon_id);
+    // fprintf(stderr, "\t[I] Post-reconstruction L0 Size\t%ld (%ld)\n",
+    // args->version->get_structure()->get_level_vector()[0]->get_shard_count(),
+    // recon_id);

     /* for maintenance reconstructions, advance the buffer head to match the
      * currently active version */
     if (args->priority == ReconstructionPriority::MAINT) {
       args->version->set_buffer(extension->m_buffer.get(),
                                 active_version->get_head());
-      // fprintf(stderr, "\t[I] Buffer head set to %ld (%ld)\n", active_version->get_head(), recon_id);
+      // fprintf(stderr, "\t[I] Buffer head set to %ld (%ld)\n",
+      // active_version->get_head(), recon_id);
       if (new_reccnt != cur_reccnt) {
         fprintf(stderr, "ERROR: invalid reccnt (%ld)\n", recon_id);
       }
     }

-    // fprintf(stderr, "\t[I] Record Counts: %ld %ld %ld (%ld)\n", old_reccnt, cur_reccnt, new_reccnt, recon_id);
+    // fprintf(stderr, "\t[I] Record Counts: %ld %ld %ld (%ld)\n", old_reccnt,
+    // cur_reccnt, new_reccnt, recon_id);

     /* advance the index to the newly finished version */
     extension->install_new_version(args->version, args->initial_version);
@@ -615,7 +638,8 @@ private:
       }

       for (auto level : locked_levels) {
-        // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, recon_id);
+        // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level,
+        // recon_id);
         extension->m_lock_mngr.release_lock(level, args->version->get_id());
       }
     }
@@ -625,7 +649,8 @@ private:
       extension->m_lock_mngr.release_buffer_lock();
     }

-    // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n", args->version->get_id(), recon_id);
+    // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n",
+    // args->version->get_id(), recon_id);

     /* manually delete the argument object */
     delete args;
@@ -717,8 +742,9 @@ private:
     size_t version_id = m_version_counter.fetch_add(1) + 1;
     // fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id);
     auto active_version = get_active_version();
-    std::shared_ptr new_version =
-        std::make_shared(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
+    std::shared_ptr new_version = std::make_shared(
+        version_id, std::move(structure), m_buffer.get(),
+        active_version->get_buffer().get_head());

     return new_version;
   }
@@ -738,7 +764,8 @@ private:
     return new_version;
   }

-  void install_new_version(version_ptr new_version, size_t old_active_version_id) {
+  void install_new_version(version_ptr new_version,
+                           size_t old_active_version_id) {
     assert(new_version->valid());
     // fprintf(stderr, "\t[I] Installing version %ld\n", new_version->get_id());
@@ -754,7 +781,7 @@ private:
      */
     m_active_version.store(new_version);
     m_version_advance_cv.notify_all();
-    
+
     // fprintf(stderr, "\t[I] Installed version %ld\n", new_version->get_id());
   }
@@ -781,13 +808,13 @@ private:
     }

     /*
-    * Double check that we actually need to flush. I was seeing some
-    * flushes running on empty buffers--somehow it seems like a new
-    * flush was getting scheduled immediately after another one finished,
-    * when it wasn't necessary. This should prevent that from happening.
-    *
-    * A bit of a kludge, but it *should* work
-    */
+     * Double check that we actually need to flush. I was seeing some
+     * flushes running on empty buffers--somehow it seems like a new
+     * flush was getting scheduled immediately after another one finished,
+     * when it wasn't necessary. This should prevent that from happening.
+     *
+     * A bit of a kludge, but it *should* work
+     */
     if (!m_buffer->is_at_low_watermark()) {
       m_lock_mngr.release_buffer_lock();
       end_reconstruction_scheduling();
@@ -797,7 +824,8 @@ private:
     auto active_version = m_active_version.load();

     auto *args = new ReconstructionArgs();
-    args->version = create_version_flush(std::unique_ptr(active_version->get_structure()->copy()));
+    args->version = create_version_flush(std::unique_ptr(
+        active_version->get_structure()->copy()));
     args->tasks = m_config.recon_policy->get_flush_tasks(active_version.get());
     args->extension = this;
     args->priority = ReconstructionPriority::FLUSH;
@@ -810,8 +838,9 @@ private:
      * freed here
      */
     m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
-                           FLUSH);
-    // fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", args->version->get_id());
+                          FLUSH);
+    // fprintf(stderr, "[I] Finished scheduling flush (%ld)\n",
+    // args->version->get_id());

     if (m_config.recon_enable_maint_on_flush) {
       schedule_maint_reconstruction(false);
     }
@@ -833,7 +862,8 @@ private:
     // fprintf(stderr, "[I] Scheduling maintenance\n");
     auto active_version = m_active_version.load();

-    auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr);
+    auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(
+        active_version.get(), m_lock_mngr);

     for (auto &recon : reconstructions) {
       /*
@@ -841,13 +871,14 @@ private:
        * freed here
        */
       auto *args = new ReconstructionArgs();
-      args->version = create_version_maint(std::unique_ptr(active_version->get_structure()->copy()));
-      args->tasks = std::move(recon);
+      args->version = create_version_maint(std::unique_ptr(
+          active_version->get_structure()->copy()));
+      args->tasks = std::move(recon);
       args->extension = this;
       args->priority = ReconstructionPriority::MAINT;
       args->initial_version = active_version->get_id();
-      m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(), args,
-                            RECONSTRUCTION);
+      m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(),
+                            args, RECONSTRUCTION);
     }

     if (take_reconstruction_lock) {
@@ -868,11 +899,26 @@ private:
     return result;
   }

-  int internal_append(const RecordType &rec, bool ts) {
+  int internal_append(const RecordType &rec, bool ts, gsl_rng *rng = nullptr) {
     if (m_buffer->is_at_low_watermark() && !m_lock_mngr.is_buffer_locked()) {
       schedule_flush();
     }

+    /*
+     * Insertion rate limiting is handled by Bernoulli Sampling to determine
+     * whether a given insert succeeds or not. If an insert is allowed to
+     * happen, a buffer append will be attempted (this might still fail,
+     * though). Otherwise, the insert will not be attempted and should
+     * be retried. If the insertion rate is unity, then no probabilistic
+     * filtering will occur.
+     */
+    if (m_insertion_rate.load() < 1 && rng) {
+      auto p = gsl_rng_uniform(rng);
+      if (p > m_insertion_rate.load()) {
+        return false;
+      }
+    }
+
     /* this will fail if the HWM is reached and return 0 */
     return m_buffer->append(rec, ts);
   }
--
cgit v1.2.3
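Note (not part of the patch): with the new signature, a caller that wants rate
limiting passes its own GSL generator to insert() and treats a 0 return as
retriable, whether it came from the Bernoulli filter or from a full buffer.
With an insertion rate r < 1, each attempt passes the filter with probability
r, so an accepted insert takes 1/r attempts in expectation. The sketch below
shows one plausible caller-side retry loop; the Extension/Record template
parameters, the attempt bound, and the back-off are illustrative assumptions,
while the gsl_rng_* calls and the insert(rec, rng) signature come from the
change itself.

// Hypothetical caller-side helper: retry a rate-limited insert until it
// is accepted or an attempt bound is reached.
#include <gsl/gsl_rng.h>
#include <unistd.h>

#include <cstddef>

template <typename Extension, typename Record>
bool insert_with_retry(Extension &ext, const Record &rec, gsl_rng *rng,
                       size_t max_attempts = 1000) {
  for (size_t i = 0; i < max_attempts; i++) {
    /* insert() returns 0 both when the Bernoulli filter rejects the
     * attempt and when the buffer is at its high watermark; both are
     * retriable conditions */
    if (ext.insert(rec, rng)) {
      return true;
    }
    usleep(1); /* brief, illustrative back-off between attempts */
  }
  return false;
}

/*
 * Typical setup of the caller-owned generator:
 *
 *   gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937);
 *   insert_with_retry(extension, rec, rng);
 *   gsl_rng_free(rng);
 *
 * Passing rng == nullptr (the default) bypasses the Bernoulli filter
 * entirely, matching the pre-patch insert behavior.
 */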