diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-09-17 19:18:44 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-09-17 19:18:44 -0400 |
| commit | 27b239cd36a68350f1bcb2c55b9e1632e6f26ee3 (patch) | |
| tree | 1d951c315d5a1eaa437f97b866405c14b3cd00b7 /include/framework/DynamicExtension.h | |
| parent | 42cb6e2b446a2879cf9bf2f4642f926c15584cb3 (diff) | |
| download | dynamic-extension-27b239cd36a68350f1bcb2c55b9e1632e6f26ee3.tar.gz | |
Implemented dynamic insertion rate adjustment
The insertion rate will now be updated on the fly
to block inserts based on the sum of the predicted
runtime and record counts for currently active
reconstructions.
The scheduling system now also calls into the
runtime prediction system and handles this data
appropriately.
All that remains is to implement the prediction
models themselves--then we should be good to test.
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 42 |
1 files changed, 42 insertions, 0 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 5917475..d8c160c 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -421,6 +421,36 @@ private: alignas(64) std::atomic<double> m_insertion_rate; + alignas(64) std::atomic<long> m_scheduled_reconstruction_time; + + alignas(64) std::atomic<long> m_scheduled_records; + + enum reconstruction_phase { + RECON_SCHEDULED = 1, + RECON_ENDED = -1 + }; + + + + void update_stall_probability(long runtime, size_t reccnt, reconstruction_phase phase) { + + m_scheduled_reconstruction_time.fetch_add(runtime * phase); + m_scheduled_records.fetch_add(runtime * phase); + + long time_per_record = m_scheduled_reconstruction_time.load() / m_scheduled_records.load(); + size_t records_to_1us = 1e6 / time_per_record; + + double rate = 1.0 / (double) records_to_1us; + + if (rate < 1) { + m_insertion_rate.store(1 - rate); + } else { + // FIXME: the code is *not* set up to handle this situation + m_insertion_rate.store(1); + fprintf(stderr, "Warning: greater than 1us per record stall time needed.\n"); + } + } + bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args, size_t version) { if (version <= m_preempt_version.load()) { @@ -658,7 +688,11 @@ private: // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n", // args->version->get_id(), recon_id); + // + extension->update_stall_probability(args->predicted_runtime, + args->tasks.get_total_reccnt(), + RECON_ENDED); /* manually delete the argument object */ delete args; @@ -886,6 +920,14 @@ private: args->extension = this; args->priority = ReconstructionPriority::MAINT; args->initial_version = active_version->get_id(); + + for (size_t i=0; i< recon.size(); i++) { + args->predicted_runtime += active_version->get_structure()->get_level_vector()[recon[i].target]->predict_reconstruction_time(recon[i].reccnt); + } + + update_stall_probability(args->predicted_runtime, + args->tasks.get_total_reccnt(), + RECON_SCHEDULED); m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(), args, RECONSTRUCTION); } |