Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--   include/framework/DynamicExtension.h   73
1 file changed, 35 insertions, 38 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 6cfe72b..63264a0 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -70,7 +70,8 @@ public:
* for various configuration parameters in the system. See
* include/framework/util/Configuration.h for details.
*/
- DynamicExtension(ConfType &&config, double insertion_rate=1.0) : m_config(std::move(config)) {
+ DynamicExtension(ConfType &&config, double insertion_rate = 1.0)
+ : m_config(std::move(config)) {
m_buffer = std::make_unique<BufferType>(m_config.buffer_flush_trigger,
m_config.buffer_size);
@@ -426,13 +427,10 @@ private:
alignas(64) std::atomic<long> m_scheduled_records;
- enum reconstruction_phase {
- RECON_SCHEDULED = 1,
- RECON_ENDED = -1
- };
+ enum reconstruction_phase { RECON_SCHEDULED = 1, RECON_ENDED = -1 };
-
- void update_insertion_delay(long runtime, size_t reccnt, reconstruction_phase phase) {
+ void update_insertion_delay(long runtime, size_t reccnt,
+ reconstruction_phase phase) {
if (!m_config.dynamic_ratelimiting) {
return;
@@ -441,28 +439,19 @@ private:
m_scheduled_reconstruction_time.fetch_add(runtime * phase);
m_scheduled_records.fetch_add(reccnt * phase);
- long time_per_record = (m_scheduled_records.load())
- ? m_scheduled_reconstruction_time.load() /
- m_scheduled_records.load()
- : 0;
+ size_t total_reccnt = get_record_count();
+
+ long time_per_record =
+ (total_reccnt) ? m_scheduled_reconstruction_time.load() / total_reccnt
+ : 0;
long us = time_per_record / 1000;
- long excess_ns = time_per_record - us*1000;
+ long excess_ns = time_per_record - us * 1000;
m_stall_us.store(us);
size_t records_to_1us = (excess_ns) ? 1000 / excess_ns : 0;
- m_insertion_rate.store(1.0 - 1.0 / (double) records_to_1us );
-
- fprintf(stderr, "[%d] Updated delay: %ld and %ld\n", phase, us, excess_ns);
-
- if (runtime == 0) {
- fprintf(stderr, "\t[W] Predicted runtime change of 0\n");
- }
-
- if (reccnt == 0) {
- fprintf(stderr, "\t[W] Predicted reccnt change of 0\n");
- }
+ m_insertion_rate.store(1.0 - 1.0 / (double)records_to_1us);
}
bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args,
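
Note on the hunk above: the delay calculation now amortizes the scheduled
reconstruction time over the total record count (get_record_count()) rather
than over only the scheduled records. A minimal standalone sketch of that
arithmetic follows, assuming the predicted runtimes are tracked in
nanoseconds; the helper function and struct are illustrative only, not part
of the header, and the extra zero-checks are added here to keep the sketch
self-contained.

    // Illustrative sketch of the rate-limiting math in update_insertion_delay().
    // The real code stores these values in atomic members of DynamicExtension.
    #include <cstddef>

    struct RateLimit {
      long stall_us;         /* whole microseconds to stall per insert */
      double insertion_rate; /* fraction of inserts admitted without stalling */
    };

    // scheduled_ns: outstanding predicted reconstruction time (assumed ns)
    // total_reccnt: total records in the structure (get_record_count())
    inline RateLimit compute_rate_limit(long scheduled_ns, size_t total_reccnt) {
      long time_per_record = total_reccnt ? scheduled_ns / (long)total_reccnt : 0;

      long us = time_per_record / 1000;             /* whole us per record */
      long excess_ns = time_per_record - us * 1000; /* sub-us remainder */

      /* stall one extra microsecond every records_to_1us records; guard the
       * divisions for the purposes of this sketch */
      size_t records_to_1us = excess_ns ? 1000 / excess_ns : 0;
      double rate = records_to_1us ? 1.0 - 1.0 / (double)records_to_1us : 1.0;

      return {us, rate};
    }
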
@@ -537,9 +526,9 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
- #ifdef DE_PRINT_SHARD_COUNT
- fprintf(stdout, "S\t%ld\n", extension->get_shard_count());
- #endif
+#ifdef DE_PRINT_SHARD_COUNT
+ fprintf(stdout, "S\t%ld\n", extension->get_shard_count());
+#endif
// fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
// fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n",
@@ -694,9 +683,8 @@ private:
extension->m_lock_mngr.release_lock(level, args->version->get_id());
}
- extension->update_insertion_delay(args->predicted_runtime,
- args->tasks.get_total_reccnt(),
- RECON_ENDED);
+ extension->update_insertion_delay(
+ args->predicted_runtime, args->tasks.get_total_reccnt(), RECON_ENDED);
}
if (args->priority == ReconstructionPriority::FLUSH) {
@@ -708,7 +696,6 @@ private:
// args->version->get_id(), recon_id);
//
-
/* manually delete the argument object */
delete args;
}
@@ -937,23 +924,33 @@ private:
args->initial_version = active_version->get_id();
size_t level_idx = 0;
- for (size_t i=0; i< recon.size(); i++) {
- if (recon[i].target < (level_index) active_version->get_structure()->get_level_vector().size()) {
+ for (size_t i = 0; i < recon.size(); i++) {
+ if (recon[i].target < (level_index)active_version->get_structure()
+ ->get_level_vector()
+ .size()) {
level_idx = recon[i].target;
- args->predicted_runtime += active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt);
+ args->predicted_runtime +=
+ active_version->get_structure()
+ ->get_level_vector()[level_idx]
+ ->predict_reconstruction_time(recon[i].reccnt);
} else {
level_idx = recon[i].target - 1;
- args->predicted_runtime += m_config.rt_level_scale * active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt);
+ args->predicted_runtime +=
+ m_config.rt_level_scale *
+ active_version->get_structure()
+ ->get_level_vector()[level_idx]
+ ->predict_reconstruction_time(recon[i].reccnt);
}
}
if (args->predicted_runtime == 0) {
- fprintf(stderr, "Runtime Prediction of 0 for Level %ld with %ld records\n", level_idx, args->tasks.get_total_reccnt());
+ fprintf(stderr,
+ "Runtime Prediction of 0 for Level %ld with %ld records\n",
+ level_idx, args->tasks.get_total_reccnt());
}
update_insertion_delay(args->predicted_runtime,
- args->tasks.get_total_reccnt(),
- RECON_SCHEDULED);
+ args->tasks.get_total_reccnt(), RECON_SCHEDULED);
m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(),
args, RECONSTRUCTION);
}
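
Note on the hunk above: when a task targets an existing level, its runtime is
predicted from that level's own cost model; when it targets a level that does
not exist yet, the last existing level's prediction is scaled by
rt_level_scale. A compact sketch of that accumulation follows; the
ReconstructionTask and Level types and the per-level cost model below are
simplified stand-ins for the framework's types, not the actual interfaces.

    // Illustrative sketch of the predicted-runtime accumulation above.
    #include <cstddef>
    #include <vector>

    struct ReconstructionTask {
      long target;   /* destination level index */
      size_t reccnt; /* records involved in the reconstruction */
    };

    struct Level {
      double ns_per_record; /* hypothetical per-level cost model */
      long predict_reconstruction_time(size_t reccnt) const {
        return (long)(ns_per_record * reccnt);
      }
    };

    inline long predict_total_runtime(const std::vector<ReconstructionTask> &recon,
                                      const std::vector<Level> &levels,
                                      double rt_level_scale) {
      long total = 0;
      for (const auto &task : recon) {
        if (task.target < (long)levels.size()) {
          /* target level already exists: use its own cost model */
          total += levels[task.target].predict_reconstruction_time(task.reccnt);
        } else {
          /* target level does not exist yet: extrapolate from the last
           * existing level, scaled by rt_level_scale */
          total += (long)(rt_level_scale *
                          levels[task.target - 1].predict_reconstruction_time(
                              task.reccnt));
        }
      }
      return total;
    }
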
@@ -1002,7 +999,7 @@ private:
return m_buffer->append(rec, ts);
}
-//#ifdef _GNU_SOURCE
+ //#ifdef _GNU_SOURCE
void set_thread_affinity() {
if constexpr (std::same_as<SchedType, SerialScheduler>) {
return;