From 27b239cd36a68350f1bcb2c55b9e1632e6f26ee3 Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Wed, 17 Sep 2025 19:18:44 -0400
Subject: Implemented dynamic insertion rate adjustment

The insertion rate will now be updated on the fly to block inserts based
on the sum of the predicted runtime and record counts for currently
active reconstructions.

The scheduling system now also calls into the runtime prediction system
and handles this data appropriately. All that remains is to implement
the prediction models themselves--then we should be good to test.
---
 include/framework/DynamicExtension.h | 93 ++++++++++++++++++++++++++++++++++++
 include/framework/scheduling/Task.h  |  1 +
 2 files changed, 94 insertions(+)

(limited to 'include/framework')

diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 5917475..d8c160c 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -421,6 +421,87 @@ private:
 
   alignas(64) std::atomic m_insertion_rate;
 
+  /*
+   * Sum of the predicted runtimes of the reconstructions that have been
+   * scheduled but have not yet ended.
+   * NOTE(review): the template argument was stripped from this copy of the
+   * patch; <long> is inferred from the type of 'runtime' below -- confirm.
+   */
+  alignas(64) std::atomic<long> m_scheduled_reconstruction_time;
+
+  /*
+   * Sum of the record counts of those same reconstructions.
+   * NOTE(review): <size_t> is inferred from the type of 'reccnt' -- confirm.
+   */
+  alignas(64) std::atomic<size_t> m_scheduled_records;
+
+  /* Whether a reconstruction is entering or leaving the scheduled totals. */
+  enum reconstruction_phase {
+    RECON_SCHEDULED = 1,
+    RECON_ENDED = -1
+  };
+
+  /*
+   * Add a reconstruction's predicted cost to the scheduled totals
+   * (RECON_SCHEDULED) or remove it again (RECON_ENDED), then recompute
+   * m_insertion_rate from the aggregate predicted time per record.
+   *
+   * runtime -- predicted runtime of the reconstruction
+   * reccnt  -- number of records involved in the reconstruction
+   * phase   -- direction of the adjustment
+   *
+   * NOTE(review): the 1e6 constant and the "1us" in the warning imply a
+   * particular unit for 'runtime'; that unit is not visible here -- confirm.
+   */
+  void update_stall_probability(long runtime, size_t reccnt,
+                                reconstruction_phase phase) {
+    /*
+     * Use the values returned by the atomic updates so the rate is computed
+     * from the totals this call produced. The record total moves by reccnt,
+     * and removal is an explicit subtraction so the unsigned count is never
+     * multiplied by -1.
+     */
+    long total_time;
+    size_t total_records;
+    if (phase == RECON_SCHEDULED) {
+      total_time = m_scheduled_reconstruction_time.fetch_add(runtime) + runtime;
+      total_records = m_scheduled_records.fetch_add(reccnt) + reccnt;
+    } else {
+      total_time = m_scheduled_reconstruction_time.fetch_sub(runtime) - runtime;
+      total_records = m_scheduled_records.fetch_sub(reccnt) - reccnt;
+    }
+
+    /*
+     * Nothing outstanding: there is no per-record cost to derive a rate
+     * from, and dividing by the record total would divide by zero.
+     */
+    if (total_records == 0 || total_time <= 0) {
+      m_insertion_rate.store(1);
+      return;
+    }
+
+    /* under one time unit per record truncates to 0: rate 0, so store 1 - 0 */
+    const long time_per_record = total_time / static_cast<long>(total_records);
+    if (time_per_record == 0) {
+      m_insertion_rate.store(1);
+      return;
+    }
+
+    const size_t records_to_1us = static_cast<size_t>(1e6 / time_per_record);
+
+    /* a zero quotient is treated like rate >= 1 rather than dividing by it */
+    const double rate =
+        (records_to_1us == 0) ? 1.0 : 1.0 / static_cast<double>(records_to_1us);
+
+    if (rate < 1) {
+      m_insertion_rate.store(1 - rate);
+    } else {
+      // FIXME: the code is *not* set up to handle this situation
+      m_insertion_rate.store(1);
+      fprintf(stderr, "Warning: greater than 1us per record stall time needed.\n");
+    }
+  }
+
   bool restart_query(QueryArgs *args,
                      size_t version) {
     if (version <= m_preempt_version.load()) {
@@ -658,7 +739,11 @@ private:
 
     // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n",
     // args->version->get_id(), recon_id);
+    //
 
+    extension->update_stall_probability(args->predicted_runtime,
+                                        args->tasks.get_total_reccnt(),
+                                        RECON_ENDED);
 
     /* manually delete the argument object */
     delete args;
@@ -886,6 +971,14 @@ private:
       args->extension = this;
       args->priority = ReconstructionPriority::MAINT;
       args->initial_version = active_version->get_id();
+
+      for (size_t i=0; i< recon.size(); i++) {
+        args->predicted_runtime += active_version->get_structure()->get_level_vector()[recon[i].target]->predict_reconstruction_time(recon[i].reccnt);
+      }
+
+      update_stall_probability(args->predicted_runtime,
+                               args->tasks.get_total_reccnt(),
+                               RECON_SCHEDULED);
       m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(), args,
                             RECONSTRUCTION);
     }
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index b2884c7..4529b2e 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -38,6 +38,7 @@ struct ReconstructionArgs {
   void *extension;
   ReconstructionPriority priority;
   size_t initial_version;
+  long predicted_runtime = 0; /* accumulated with += when a job is scheduled */
 };
 template Q, typename DE>
 struct QueryArgs {
-- 
cgit v1.2.3