diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-09-23 19:22:15 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-09-23 19:22:15 -0400 |
| commit | 826c1fff5accbaa6b415acc176a5acbeb5f691b6 (patch) | |
| tree | 460a1906aff5ad0d800b232c2c8613960535b101 /include/framework | |
| parent | 27b239cd36a68350f1bcb2c55b9e1632e6f26ee3 (diff) | |
| download | dynamic-extension-826c1fff5accbaa6b415acc176a5acbeb5f691b6.tar.gz | |
Insertion Stall Updates
Revised the insertion stalling mechanism so that it
works for structures requiring more than 1us of
stall time per record, and got the dynamic
calculation of the stall time working.
Diffstat (limited to 'include/framework')
| -rw-r--r-- | include/framework/DynamicExtension.h | 76 | ||||
| -rw-r--r-- | include/framework/structure/InternalLevel.h | 29 | ||||
| -rw-r--r-- | include/framework/util/Configuration.h | 2 |
3 files changed, 80 insertions, 27 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index d8c160c..6cfe72b 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -420,6 +420,7 @@ private: alignas(64) std::atomic<bool> m_scheduling_reconstruction; alignas(64) std::atomic<double> m_insertion_rate; + alignas(64) std::atomic<long> m_stall_us; alignas(64) std::atomic<long> m_scheduled_reconstruction_time; @@ -430,24 +431,37 @@ private: RECON_ENDED = -1 }; - - void update_stall_probability(long runtime, size_t reccnt, reconstruction_phase phase) { + void update_insertion_delay(long runtime, size_t reccnt, reconstruction_phase phase) { + + if (!m_config.dynamic_ratelimiting) { + return; + } m_scheduled_reconstruction_time.fetch_add(runtime * phase); - m_scheduled_records.fetch_add(runtime * phase); + m_scheduled_records.fetch_add(reccnt * phase); - long time_per_record = m_scheduled_reconstruction_time.load() / m_scheduled_records.load(); - size_t records_to_1us = 1e6 / time_per_record; + long time_per_record = (m_scheduled_records.load()) + ? m_scheduled_reconstruction_time.load() / + m_scheduled_records.load() + : 0; - double rate = 1.0 / (double) records_to_1us; + long us = time_per_record / 1000; + long excess_ns = time_per_record - us*1000; - if (rate < 1) { - m_insertion_rate.store(1 - rate); - } else { - // FIXME: the code is *not* set up to handle this situation - m_insertion_rate.store(1); - fprintf(stderr, "Warning: greater than 1us per record stall time needed.\n"); + m_stall_us.store(us); + + size_t records_to_1us = (excess_ns) ? 
1000 / excess_ns : 0; + m_insertion_rate.store(1.0 - 1.0 / (double) records_to_1us ); + + fprintf(stderr, "[%d] Updated delay: %ld and %ld\n", phase, us, excess_ns); + + if (runtime == 0) { + fprintf(stderr, "\t[W] Predicted runtime change of 0\n"); + } + + if (reccnt == 0) { + fprintf(stderr, "\t[W] Predicted reccnt change of 0\n"); } } @@ -679,6 +693,10 @@ private: // recon_id); extension->m_lock_mngr.release_lock(level, args->version->get_id()); } + + extension->update_insertion_delay(args->predicted_runtime, + args->tasks.get_total_reccnt(), + RECON_ENDED); } if (args->priority == ReconstructionPriority::FLUSH) { @@ -690,9 +708,6 @@ private: // args->version->get_id(), recon_id); // - extension->update_stall_probability(args->predicted_runtime, - args->tasks.get_total_reccnt(), - RECON_ENDED); /* manually delete the argument object */ delete args; @@ -921,11 +936,22 @@ private: args->priority = ReconstructionPriority::MAINT; args->initial_version = active_version->get_id(); + size_t level_idx = 0; for (size_t i=0; i< recon.size(); i++) { - args->predicted_runtime += active_version->get_structure()->get_level_vector()[recon[i].target]->predict_reconstruction_time(recon[i].reccnt); + if (recon[i].target < (level_index) active_version->get_structure()->get_level_vector().size()) { + level_idx = recon[i].target; + args->predicted_runtime += active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt); + } else { + level_idx = recon[i].target - 1; + args->predicted_runtime += m_config.rt_level_scale * active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt); + } } - update_stall_probability(args->predicted_runtime, + if (args->predicted_runtime == 0) { + fprintf(stderr, "Runtime Prediction of 0 for Level %ld with %ld records\n", level_idx, args->tasks.get_total_reccnt()); + } + + update_insertion_delay(args->predicted_runtime, args->tasks.get_total_reccnt(), 
RECON_SCHEDULED); m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(), @@ -956,17 +982,19 @@ private: } /* - * Insertion rate limiting is handled by Bernoulli Sampling to determine - * whether a given insert succeeds or not. If an insert is allowed to - * happen, a buffer append will be attempted (this might still fail, - * though). Otherwise, the insert will not be attempted and should - * be retried. If the insertion rate is unity, then no probabilistic - * filtering will occur. + * + */ + if (m_stall_us.load() > 0) { + usleep(m_stall_us.load()); + } + + /* + * */ if (m_insertion_rate.load() < 1 && rng) { auto p = gsl_rng_uniform(rng); if (p > m_insertion_rate.load()) { - return false; + usleep(1); } } diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index 1dee359..205dbf9 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -18,6 +18,8 @@ #include <future> #include <memory> #include <vector> +#include <deque> +#include <algorithm> #include "framework/interface/Query.h" #include "framework/interface/Shard.h" @@ -32,7 +34,7 @@ class InternalLevel { typedef std::pair<std::shared_ptr<ShardType>, size_t> shard_ptr; public: - InternalLevel(ssize_t level_no) : m_level_no(level_no) {} + InternalLevel(ssize_t level_no) : m_level_no(level_no) { } ~InternalLevel() = default; @@ -197,6 +199,10 @@ public: new_level->append(m_shards[i].first, m_shards[i].second); } + for (auto itr=m_rt_window.begin(); itr < m_rt_window.end(); itr++) { + new_level->m_rt_window.push_front(*itr); + } + return new_level; } @@ -234,16 +240,33 @@ public: } long predict_reconstruction_time(size_t reccnt) { - return 0; + if (m_rt_window.size() == 0) { + return 0; + } + + size_t total = 0; + for (auto rt : m_rt_window) { + total += rt; + } + + return total / m_rt_window.size(); } void update_reconstruction_model(reconstruction_results<ShardType> &recon) { - + if 
(m_rt_window.size() >= m_window_size) { + m_rt_window.pop_back(); + } + + m_rt_window.push_front(recon.runtime); } private: ssize_t m_level_no; std::vector<shard_ptr> m_shards; + + const size_t m_window_size = 15; + + std::deque<size_t> m_rt_window; }; } // namespace de diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index dde87fe..3ae3492 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -48,6 +48,8 @@ class DEConfiguration { size_t buffer_flush_query_preemption_trigger = UINT64_MAX; + bool dynamic_ratelimiting = false; + size_t rt_level_scale = 1; }; } // namespace de |