diff options
| -rw-r--r-- | include/framework/DynamicExtension.h | 42 | ||||
| -rw-r--r-- | include/framework/scheduling/Task.h | 1 |
2 files changed, 43 insertions, 0 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 5917475..d8c160c 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -421,6 +421,36 @@ private: alignas(64) std::atomic<double> m_insertion_rate; + alignas(64) std::atomic<long> m_scheduled_reconstruction_time; + + alignas(64) std::atomic<long> m_scheduled_records; + + enum reconstruction_phase { + RECON_SCHEDULED = 1, + RECON_ENDED = -1 + }; + + + + void update_stall_probability(long runtime, size_t reccnt, reconstruction_phase phase) { + + m_scheduled_reconstruction_time.fetch_add(runtime * phase); + m_scheduled_records.fetch_add(runtime * phase); + + long time_per_record = m_scheduled_reconstruction_time.load() / m_scheduled_records.load(); + size_t records_to_1us = 1e6 / time_per_record; + + double rate = 1.0 / (double) records_to_1us; + + if (rate < 1) { + m_insertion_rate.store(1 - rate); + } else { + // FIXME: the code is *not* set up to handle this situation + m_insertion_rate.store(1); + fprintf(stderr, "Warning: greater than 1us per record stall time needed.\n"); + } + } + bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args, size_t version) { if (version <= m_preempt_version.load()) { @@ -658,7 +688,11 @@ private: // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n", // args->version->get_id(), recon_id); + // + extension->update_stall_probability(args->predicted_runtime, + args->tasks.get_total_reccnt(), + RECON_ENDED); /* manually delete the argument object */ delete args; @@ -886,6 +920,14 @@ private: args->extension = this; args->priority = ReconstructionPriority::MAINT; args->initial_version = active_version->get_id(); + + for (size_t i=0; i< recon.size(); i++) { + args->predicted_runtime += active_version->get_structure()->get_level_vector()[recon[i].target]->predict_reconstruction_time(recon[i].reccnt); + } + + update_stall_probability(args->predicted_runtime, + args->tasks.get_total_reccnt(), + RECON_SCHEDULED); m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(), args, RECONSTRUCTION); } diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index b2884c7..4529b2e 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -38,6 +38,7 @@ struct ReconstructionArgs { void *extension; ReconstructionPriority priority; size_t initial_version; + long predicted_runtime; }; template <ShardInterface S, QueryInterface<S> Q, typename DE> struct QueryArgs { |