summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- include/framework/DynamicExtension.h 76
-rw-r--r-- include/framework/structure/InternalLevel.h 29
-rw-r--r-- include/framework/util/Configuration.h 2
3 files changed, 80 insertions, 27 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index d8c160c..6cfe72b 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -420,6 +420,7 @@ private:
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
alignas(64) std::atomic<double> m_insertion_rate;
+ alignas(64) std::atomic<long> m_stall_us;
alignas(64) std::atomic<long> m_scheduled_reconstruction_time;
@@ -430,24 +431,37 @@ private:
RECON_ENDED = -1
};
-
- void update_stall_probability(long runtime, size_t reccnt, reconstruction_phase phase) {
+ /*
+ * Recompute the insertion delay from the reconstruction work that is
+ * currently scheduled.
+ *
+ * runtime and reccnt are multiplied by phase before being added to the
+ * running totals, so a call with RECON_ENDED (-1) removes a contribution
+ * of the same magnitude. The resulting time-per-record is split into
+ * whole multiples of 1000, stored in m_stall_us, and a remainder that is
+ * folded into m_insertion_rate as 1 - remainder/1000.
+ * NOTE(review): the variable names suggest runtime is in nanoseconds;
+ * confirm against the code that produces predicted_runtime.
+ *
+ * Does nothing when m_config.dynamic_ratelimiting is false.
+ */
+ void update_insertion_delay(long runtime, size_t reccnt, reconstruction_phase phase) {
+
+ if (!m_config.dynamic_ratelimiting) {
+ return;
+ }
m_scheduled_reconstruction_time.fetch_add(runtime * phase);
- m_scheduled_records.fetch_add(runtime * phase);
+ m_scheduled_records.fetch_add(reccnt * phase);
- long time_per_record = m_scheduled_reconstruction_time.load() / m_scheduled_records.load();
- size_t records_to_1us = 1e6 / time_per_record;
+ /*
+ * Load each counter exactly once: re-loading the record count after the
+ * zero check would let a concurrent update slip a zero into the divisor.
+ */
+ const auto scheduled_records = m_scheduled_records.load();
+ const auto scheduled_time = m_scheduled_reconstruction_time.load();
+ long time_per_record = (scheduled_records)
+ ? scheduled_time / scheduled_records
+ : 0;
- double rate = 1.0 / (double) records_to_1us;
+ long us = time_per_record / 1000;
+ long excess_ns = time_per_record - us*1000;
- if (rate < 1) {
- m_insertion_rate.store(1 - rate);
- } else {
- // FIXME: the code is *not* set up to handle this situation
- m_insertion_rate.store(1);
- fprintf(stderr, "Warning: greater than 1us per record stall time needed.\n");
+ m_stall_us.store(us);
+
+ /*
+ * Store the rate directly as 1 - excess_ns/1000. A remainder of zero
+ * yields exactly 1.0 instead of the -inf produced by dividing by a zero
+ * record interval, and no precision is lost to integer truncation.
+ */
+ m_insertion_rate.store(1.0 - static_cast<double>(excess_ns) / 1000.0);
+
+ fprintf(stderr, "[%d] Updated delay: %ld and %ld\n", phase, us, excess_ns);
+
+ if (runtime == 0) {
+ fprintf(stderr, "\t[W] Predicted runtime change of 0\n");
+ }
+
+ if (reccnt == 0) {
+ fprintf(stderr, "\t[W] Predicted reccnt change of 0\n");
}
}
@@ -679,6 +693,10 @@ private:
// recon_id);
extension->m_lock_mngr.release_lock(level, args->version->get_id());
}
+
+ extension->update_insertion_delay(args->predicted_runtime,
+ args->tasks.get_total_reccnt(),
+ RECON_ENDED);
}
if (args->priority == ReconstructionPriority::FLUSH) {
@@ -690,9 +708,6 @@ private:
// args->version->get_id(), recon_id);
//
- extension->update_stall_probability(args->predicted_runtime,
- args->tasks.get_total_reccnt(),
- RECON_ENDED);
/* manually delete the argument object */
delete args;
@@ -921,11 +936,22 @@ private:
args->priority = ReconstructionPriority::MAINT;
args->initial_version = active_version->get_id();
+ size_t level_idx = 0;
for (size_t i=0; i< recon.size(); i++) {
- args->predicted_runtime += active_version->get_structure()->get_level_vector()[recon[i].target]->predict_reconstruction_time(recon[i].reccnt);
+ if (recon[i].target < (level_index) active_version->get_structure()->get_level_vector().size()) {
+ level_idx = recon[i].target;
+ args->predicted_runtime += active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt);
+ } else {
+ level_idx = recon[i].target - 1;
+ args->predicted_runtime += m_config.rt_level_scale * active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt);
+ }
}
- update_stall_probability(args->predicted_runtime,
+ if (args->predicted_runtime == 0) {
+ fprintf(stderr, "Runtime Prediction of 0 for Level %ld with %ld records\n", level_idx, args->tasks.get_total_reccnt());
+ }
+
+ update_insertion_delay(args->predicted_runtime,
args->tasks.get_total_reccnt(),
RECON_SCHEDULED);
m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(),
@@ -956,17 +982,19 @@ private:
}
/*
- * Insertion rate limiting is handled by Bernoulli Sampling to determine
- * whether a given insert succeeds or not. If an insert is allowed to
- * happen, a buffer append will be attempted (this might still fail,
- * though). Otherwise, the insert will not be attempted and should
- * be retried. If the insertion rate is unity, then no probabilistic
- * filtering will occur.
+ *
+ */
+ if (m_stall_us.load() > 0) {
+ usleep(m_stall_us.load());
+ }
+
+ /*
+ *
*/
if (m_insertion_rate.load() < 1 && rng) {
auto p = gsl_rng_uniform(rng);
if (p > m_insertion_rate.load()) {
- return false;
+ usleep(1);
}
}
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 1dee359..205dbf9 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -18,6 +18,8 @@
#include <future>
#include <memory>
#include <vector>
+#include <deque>
+#include <algorithm>
#include "framework/interface/Query.h"
#include "framework/interface/Shard.h"
@@ -32,7 +34,7 @@ class InternalLevel {
typedef std::pair<std::shared_ptr<ShardType>, size_t> shard_ptr;
public:
- InternalLevel(ssize_t level_no) : m_level_no(level_no) {}
+ InternalLevel(ssize_t level_no) : m_level_no(level_no) { }
~InternalLevel() = default;
@@ -197,6 +199,10 @@ public:
new_level->append(m_shards[i].first, m_shards[i].second);
}
+ for (auto itr=m_rt_window.begin(); itr < m_rt_window.end(); itr++) {
+ new_level->m_rt_window.push_front(*itr);
+ }
+
return new_level;
}
@@ -234,16 +240,33 @@ public:
}
long predict_reconstruction_time(size_t reccnt) {
- return 0;
+ /*
+ * The prediction is the integer mean of the runtimes held in
+ * m_rt_window; reccnt is not consulted. An empty window predicts 0.
+ */
+ const size_t sample_cnt = m_rt_window.size();
+ if (sample_cnt == 0) {
+ return 0;
+ }
+
+ size_t runtime_sum = 0;
+ for (auto itr = m_rt_window.begin(); itr != m_rt_window.end(); ++itr) {
+ runtime_sum += *itr;
+ }
+
+ return runtime_sum / sample_cnt;
}
void update_reconstruction_model(reconstruction_results<ShardType> &recon) {
-
+ /*
+ * Record recon.runtime at the front of m_rt_window, first dropping the
+ * entry at the back once the window already holds m_window_size samples.
+ */
+ const bool window_full = !(m_rt_window.size() < m_window_size);
+ if (window_full) {
+ m_rt_window.pop_back();
+ }
+
+ m_rt_window.push_front(recon.runtime);
}
private:
ssize_t m_level_no;
std::vector<shard_ptr> m_shards;
+
+ const size_t m_window_size = 15;
+
+ std::deque<size_t> m_rt_window;
};
} // namespace de
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index dde87fe..3ae3492 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -48,6 +48,8 @@ class DEConfiguration {
size_t buffer_flush_query_preemption_trigger = UINT64_MAX;
+ bool dynamic_ratelimiting = false;
+ size_t rt_level_scale = 1;
};
} // namespace de