summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/framework/DynamicExtension.h42
-rw-r--r--include/framework/scheduling/Task.h1
2 files changed, 43 insertions, 0 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 5917475..d8c160c 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -421,6 +421,36 @@ private:
alignas(64) std::atomic<double> m_insertion_rate;
+ alignas(64) std::atomic<long> m_scheduled_reconstruction_time;
+
+ alignas(64) std::atomic<long> m_scheduled_records;
+
+ enum reconstruction_phase {
+ RECON_SCHEDULED = 1,
+ RECON_ENDED = -1
+ };
+
+
+
+ void update_stall_probability(long runtime, size_t reccnt, reconstruction_phase phase) {
+
+ m_scheduled_reconstruction_time.fetch_add(runtime * phase);
+ m_scheduled_records.fetch_add(runtime * phase);
+
+ long time_per_record = m_scheduled_reconstruction_time.load() / m_scheduled_records.load();
+ size_t records_to_1us = 1e6 / time_per_record;
+
+ double rate = 1.0 / (double) records_to_1us;
+
+ if (rate < 1) {
+ m_insertion_rate.store(1 - rate);
+ } else {
+ // FIXME: the code is *not* set up to handle this situation
+ m_insertion_rate.store(1);
+ fprintf(stderr, "Warning: greater than 1us per record stall time needed.\n");
+ }
+ }
+
bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args,
size_t version) {
if (version <= m_preempt_version.load()) {
@@ -658,7 +688,11 @@ private:
// fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n",
// args->version->get_id(), recon_id);
+ //
+ extension->update_stall_probability(args->predicted_runtime,
+ args->tasks.get_total_reccnt(),
+ RECON_ENDED);
/* manually delete the argument object */
delete args;
@@ -886,6 +920,14 @@ private:
args->extension = this;
args->priority = ReconstructionPriority::MAINT;
args->initial_version = active_version->get_id();
+
+ for (size_t i=0; i< recon.size(); i++) {
+ args->predicted_runtime += active_version->get_structure()->get_level_vector()[recon[i].target]->predict_reconstruction_time(recon[i].reccnt);
+ }
+
+ update_stall_probability(args->predicted_runtime,
+ args->tasks.get_total_reccnt(),
+ RECON_SCHEDULED);
m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(),
args, RECONSTRUCTION);
}
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index b2884c7..4529b2e 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -38,6 +38,7 @@ struct ReconstructionArgs {
void *extension;
ReconstructionPriority priority;
size_t initial_version;
+ long predicted_runtime;
};
template <ShardInterface S, QueryInterface<S> Q, typename DE> struct QueryArgs {