diff options
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 73 |
1 files changed, 35 insertions, 38 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 6cfe72b..63264a0 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -70,7 +70,8 @@ public: * for various configuration parameters in the system. See * include/framework/util/Configuration.h for details. */ - DynamicExtension(ConfType &&config, double insertion_rate=1.0) : m_config(std::move(config)) { + DynamicExtension(ConfType &&config, double insertion_rate = 1.0) + : m_config(std::move(config)) { m_buffer = std::make_unique<BufferType>(m_config.buffer_flush_trigger, m_config.buffer_size); @@ -426,13 +427,10 @@ private: alignas(64) std::atomic<long> m_scheduled_records; - enum reconstruction_phase { - RECON_SCHEDULED = 1, - RECON_ENDED = -1 - }; + enum reconstruction_phase { RECON_SCHEDULED = 1, RECON_ENDED = -1 }; - - void update_insertion_delay(long runtime, size_t reccnt, reconstruction_phase phase) { + void update_insertion_delay(long runtime, size_t reccnt, + reconstruction_phase phase) { if (!m_config.dynamic_ratelimiting) { return; @@ -441,28 +439,19 @@ private: m_scheduled_reconstruction_time.fetch_add(runtime * phase); m_scheduled_records.fetch_add(reccnt * phase); - long time_per_record = (m_scheduled_records.load()) - ? m_scheduled_reconstruction_time.load() / - m_scheduled_records.load() - : 0; + size_t total_reccnt = get_record_count(); + + long time_per_record = + (total_reccnt) ? m_scheduled_reconstruction_time.load() / total_reccnt + : 0; long us = time_per_record / 1000; - long excess_ns = time_per_record - us*1000; + long excess_ns = time_per_record - us * 1000; m_stall_us.store(us); size_t records_to_1us = (excess_ns) ? 1000 / excess_ns : 0; - m_insertion_rate.store(1.0 - 1.0 / (double) records_to_1us ); - - fprintf(stderr, "[%d] Updated delay: %ld and %ld\n", phase, us, excess_ns); - - if (runtime == 0) { - fprintf(stderr, "\t[W] Predicted runtime change of 0\n"); - } - - if (reccnt == 0) { - fprintf(stderr, "\t[W] Predicted reccnt change of 0\n"); - } + m_insertion_rate.store(1.0 - 1.0 / (double)records_to_1us); } bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args, @@ -537,9 +526,9 @@ private: * this code will be bypassed in that case. */ if (args->priority == ReconstructionPriority::FLUSH) { - #ifdef DE_PRINT_SHARD_COUNT - fprintf(stdout, "S\t%ld\n", extension->get_shard_count()); - #endif +#ifdef DE_PRINT_SHARD_COUNT + fprintf(stdout, "S\t%ld\n", extension->get_shard_count()); +#endif // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id); // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n", @@ -694,9 +683,8 @@ private: extension->m_lock_mngr.release_lock(level, args->version->get_id()); } - extension->update_insertion_delay(args->predicted_runtime, - args->tasks.get_total_reccnt(), - RECON_ENDED); + extension->update_insertion_delay( + args->predicted_runtime, args->tasks.get_total_reccnt(), RECON_ENDED); } if (args->priority == ReconstructionPriority::FLUSH) { @@ -708,7 +696,6 @@ private: // args->version->get_id(), recon_id); // - /* manually delete the argument object */ delete args; } @@ -937,23 +924,33 @@ private: args->initial_version = active_version->get_id(); size_t level_idx = 0; - for (size_t i=0; i< recon.size(); i++) { - if (recon[i].target < (level_index) active_version->get_structure()->get_level_vector().size()) { + for (size_t i = 0; i < recon.size(); i++) { + if (recon[i].target < (level_index)active_version->get_structure() + ->get_level_vector() + .size()) { level_idx = recon[i].target; - args->predicted_runtime += active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt); + args->predicted_runtime += + active_version->get_structure() + ->get_level_vector()[level_idx] + ->predict_reconstruction_time(recon[i].reccnt); } else { level_idx = recon[i].target - 1; - args->predicted_runtime += m_config.rt_level_scale * active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt); + args->predicted_runtime += + m_config.rt_level_scale * + active_version->get_structure() + ->get_level_vector()[level_idx] + ->predict_reconstruction_time(recon[i].reccnt); } } if (args->predicted_runtime == 0) { - fprintf(stderr, "Runtime Prediction of 0 for Level %ld with %ld records\n", level_idx, args->tasks.get_total_reccnt()); + fprintf(stderr, + "Runtime Prediction of 0 for Level %ld with %ld records\n", + level_idx, args->tasks.get_total_reccnt()); } update_insertion_delay(args->predicted_runtime, - args->tasks.get_total_reccnt(), - RECON_SCHEDULED); + args->tasks.get_total_reccnt(), RECON_SCHEDULED); m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(), args, RECONSTRUCTION); } @@ -1002,7 +999,7 @@ private: return m_buffer->append(rec, ts); } -//#ifdef _GNU_SOURCE + //#ifdef _GNU_SOURCE void set_thread_affinity() { if constexpr (std::same_as<SchedType, SerialScheduler>) { return; |