author     Douglas Rumbaugh <dbr4@psu.edu>    2025-09-17 19:18:44 -0400
committer  Douglas Rumbaugh <dbr4@psu.edu>    2025-09-17 19:18:44 -0400
commit     27b239cd36a68350f1bcb2c55b9e1632e6f26ee3 (patch)
tree       1d951c315d5a1eaa437f97b866405c14b3cd00b7
parent     42cb6e2b446a2879cf9bf2f4642f926c15584cb3 (diff)
download   dynamic-extension-27b239cd36a68350f1bcb2c55b9e1632e6f26ee3.tar.gz
Implemented dynamic insertion rate adjustment
The insertion rate is now adjusted on the fly, stalling inserts based on the total predicted runtime and record count of the currently active reconstructions. The scheduling system now also calls into the runtime prediction system and handles this data appropriately. All that remains is to implement the prediction models themselves--then we should be good to test.
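
For reference, here is a minimal sketch of how the stored rate might be consumed on the insert path. The InsertThrottle class, its admit() method, and the random gating are assumptions for illustration only; this commit adds the m_insertion_rate bookkeeping, but the admission-side code is not part of this diff.

#include <atomic>
#include <random>

/* Hypothetical sketch: an insert proceeds with probability equal to the
 * current insertion rate and is stalled otherwise. This is an assumption
 * about how the framework consumes the value computed in
 * update_stall_probability(); the real insert path is not shown here. */
class InsertThrottle {
public:
  /* called from the scheduling side whenever the rate is recomputed */
  void set_rate(double rate) { m_insertion_rate.store(rate); }

  /* true -> admit the insert; false -> stall (caller waits or retries) */
  bool admit() {
    thread_local std::mt19937_64 rng{std::random_device{}()};
    thread_local std::uniform_real_distribution<double> dist(0.0, 1.0);
    return dist(rng) < m_insertion_rate.load();
  }

private:
  alignas(64) std::atomic<double> m_insertion_rate{1.0};
};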
-rw-r--r--   include/framework/DynamicExtension.h   42
-rw-r--r--   include/framework/scheduling/Task.h      1
2 files changed, 43 insertions, 0 deletions
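
The prediction models mentioned in the message above are still to be written. Below is a minimal sketch of one possible shape, assuming a per-level running average of observed cost per record; the class name, member names, and the averaging scheme are illustrative assumptions, and only the predict_reconstruction_time(reccnt) interface is referenced by this commit.

#include <atomic>
#include <cstddef>

/* Hypothetical per-level cost model: predicted runtime is a learned
 * per-record unit cost times the record count, refined from observed
 * reconstructions via a simple weighted average. */
class ReconstructionTimeModel {
public:
  /* predicted runtime (in the framework's time units) for reconstructing
   * a level holding reccnt records */
  long predict_reconstruction_time(size_t reccnt) const {
    return static_cast<long>(reccnt) * m_unit_cost.load();
  }

  /* fold in a measured (runtime, reccnt) pair once a reconstruction ends */
  void record_observation(long runtime, size_t reccnt) {
    if (reccnt == 0) {
      return;
    }
    long observed = runtime / static_cast<long>(reccnt);
    long current = m_unit_cost.load();
    /* weighted average; the 3:1 weighting is arbitrary */
    m_unit_cost.store((3 * current + observed) / 4);
  }

private:
  std::atomic<long> m_unit_cost{100}; /* initial per-record cost guess */
};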
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 5917475..d8c160c 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -421,6 +421,36 @@ private:
alignas(64) std::atomic<double> m_insertion_rate;
+ alignas(64) std::atomic<long> m_scheduled_reconstruction_time;
+
+ alignas(64) std::atomic<long> m_scheduled_records;
+
+ enum reconstruction_phase {
+ RECON_SCHEDULED = 1,
+ RECON_ENDED = -1
+ };
+
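+  /*
+   * Adjust the insert stall probability from the running totals of predicted
+   * reconstruction runtime and record count. `phase` either adds a newly
+   * scheduled reconstruction's contribution to the totals (RECON_SCHEDULED)
+   * or removes it once the reconstruction finishes (RECON_ENDED).
+   */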
+ void update_stall_probability(long runtime, size_t reccnt, reconstruction_phase phase) {
+
+    m_scheduled_reconstruction_time.fetch_add(runtime * phase);
+    m_scheduled_records.fetch_add(reccnt * phase);
+
+    long scheduled_records = m_scheduled_records.load();
+    long time_per_record =
+        (scheduled_records > 0)
+            ? m_scheduled_reconstruction_time.load() / scheduled_records
+            : 0;
+
+    if (time_per_record <= 0) {
+      /* no outstanding reconstruction work (or negligible predicted cost
+         per record), so don't stall inserts at all */
+      m_insertion_rate.store(1);
+      return;
+    }
+
+    size_t records_to_1us = 1e6 / time_per_record;
+
+    double rate = 1.0 / (double) records_to_1us;
+
+    if (rate < 1) {
+      m_insertion_rate.store(1 - rate);
+    } else {
+      // FIXME: the code is *not* set up to handle this situation
+      m_insertion_rate.store(1);
+      fprintf(stderr, "Warning: greater than 1us per record stall time needed.\n");
+    }
+ }
+
bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args,
size_t version) {
if (version <= m_preempt_version.load()) {
@@ -658,7 +688,11 @@ private:
// fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n",
// args->version->get_id(), recon_id);
+ //
+ extension->update_stall_probability(args->predicted_runtime,
+ args->tasks.get_total_reccnt(),
+ RECON_ENDED);
/* manually delete the argument object */
delete args;
@@ -886,6 +920,14 @@ private:
args->extension = this;
args->priority = ReconstructionPriority::MAINT;
args->initial_version = active_version->get_id();
+
+    for (size_t i = 0; i < recon.size(); i++) {
+      args->predicted_runtime +=
+          active_version->get_structure()
+              ->get_level_vector()[recon[i].target]
+              ->predict_reconstruction_time(recon[i].reccnt);
+    }
+
+ update_stall_probability(args->predicted_runtime,
+ args->tasks.get_total_reccnt(),
+ RECON_SCHEDULED);
m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(),
args, RECONSTRUCTION);
}
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index b2884c7..4529b2e 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -38,6 +38,7 @@ struct ReconstructionArgs {
void *extension;
ReconstructionPriority priority;
size_t initial_version;
+  long predicted_runtime = 0; /* summed predicted runtimes of the scheduled reconstruction tasks */
};
template <ShardInterface S, QueryInterface<S> Q, typename DE> struct QueryArgs {