diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-09-23 19:22:15 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-09-23 19:22:15 -0400 |
| commit | 826c1fff5accbaa6b415acc176a5acbeb5f691b6 (patch) | |
| tree | 460a1906aff5ad0d800b232c2c8613960535b101 /include/framework/DynamicExtension.h | |
| parent | 27b239cd36a68350f1bcb2c55b9e1632e6f26ee3 (diff) | |
| download | dynamic-extension-826c1fff5accbaa6b415acc176a5acbeb5f691b6.tar.gz | |
Insertion Stall Updates
Revised the insertion stalling mechanism so that it
works for structures requiring more than 1us of
stall time per record, and implemented dynamic
calculation of the stall time.
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 76 |
1 file changed, 52 insertions, 24 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index d8c160c..6cfe72b 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -420,6 +420,7 @@ private: alignas(64) std::atomic<bool> m_scheduling_reconstruction; alignas(64) std::atomic<double> m_insertion_rate; + alignas(64) std::atomic<long> m_stall_us; alignas(64) std::atomic<long> m_scheduled_reconstruction_time; @@ -430,24 +431,37 @@ private: RECON_ENDED = -1 }; - - void update_stall_probability(long runtime, size_t reccnt, reconstruction_phase phase) { + void update_insertion_delay(long runtime, size_t reccnt, reconstruction_phase phase) { + + if (!m_config.dynamic_ratelimiting) { + return; + } m_scheduled_reconstruction_time.fetch_add(runtime * phase); - m_scheduled_records.fetch_add(runtime * phase); + m_scheduled_records.fetch_add(reccnt * phase); - long time_per_record = m_scheduled_reconstruction_time.load() / m_scheduled_records.load(); - size_t records_to_1us = 1e6 / time_per_record; + long time_per_record = (m_scheduled_records.load()) + ? m_scheduled_reconstruction_time.load() / + m_scheduled_records.load() + : 0; - double rate = 1.0 / (double) records_to_1us; + long us = time_per_record / 1000; + long excess_ns = time_per_record - us*1000; - if (rate < 1) { - m_insertion_rate.store(1 - rate); - } else { - // FIXME: the code is *not* set up to handle this situation - m_insertion_rate.store(1); - fprintf(stderr, "Warning: greater than 1us per record stall time needed.\n"); + m_stall_us.store(us); + + size_t records_to_1us = (excess_ns) ? 
1000 / excess_ns : 0; + m_insertion_rate.store(1.0 - 1.0 / (double) records_to_1us ); + + fprintf(stderr, "[%d] Updated delay: %ld and %ld\n", phase, us, excess_ns); + + if (runtime == 0) { + fprintf(stderr, "\t[W] Predicted runtime change of 0\n"); + } + + if (reccnt == 0) { + fprintf(stderr, "\t[W] Predicted reccnt change of 0\n"); } } @@ -679,6 +693,10 @@ private: // recon_id); extension->m_lock_mngr.release_lock(level, args->version->get_id()); } + + extension->update_insertion_delay(args->predicted_runtime, + args->tasks.get_total_reccnt(), + RECON_ENDED); } if (args->priority == ReconstructionPriority::FLUSH) { @@ -690,9 +708,6 @@ private: // args->version->get_id(), recon_id); // - extension->update_stall_probability(args->predicted_runtime, - args->tasks.get_total_reccnt(), - RECON_ENDED); /* manually delete the argument object */ delete args; @@ -921,11 +936,22 @@ private: args->priority = ReconstructionPriority::MAINT; args->initial_version = active_version->get_id(); + size_t level_idx = 0; for (size_t i=0; i< recon.size(); i++) { - args->predicted_runtime += active_version->get_structure()->get_level_vector()[recon[i].target]->predict_reconstruction_time(recon[i].reccnt); + if (recon[i].target < (level_index) active_version->get_structure()->get_level_vector().size()) { + level_idx = recon[i].target; + args->predicted_runtime += active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt); + } else { + level_idx = recon[i].target - 1; + args->predicted_runtime += m_config.rt_level_scale * active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt); + } } - update_stall_probability(args->predicted_runtime, + if (args->predicted_runtime == 0) { + fprintf(stderr, "Runtime Prediction of 0 for Level %ld with %ld records\n", level_idx, args->tasks.get_total_reccnt()); + } + + update_insertion_delay(args->predicted_runtime, args->tasks.get_total_reccnt(), 
RECON_SCHEDULED); m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(), @@ -956,17 +982,19 @@ private: } /* - * Insertion rate limiting is handled by Bernoulli Sampling to determine - * whether a given insert succeeds or not. If an insert is allowed to - * happen, a buffer append will be attempted (this might still fail, - * though). Otherwise, the insert will not be attempted and should - * be retried. If the insertion rate is unity, then no probabilistic - * filtering will occur. + * + */ + if (m_stall_us.load() > 0) { + usleep(m_stall_us.load()); + } + + /* + * */ if (m_insertion_rate.load() < 1 && rng) { auto p = gsl_rng_uniform(rng); if (p > m_insertion_rate.load()) { - return false; + usleep(1); } } |