diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-04-25 12:28:31 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-04-25 12:28:31 -0400 |
| commit | 1957b2dd33b244754cd47db05f831a7627b8031e (patch) | |
| tree | da438aabd8ff188e2a3c94c69352a36ec243f730 | |
| parent | 5a3d36fecabc8f220b19dcaea28a78f99b5244af (diff) | |
| download | dynamic-extension-1957b2dd33b244754cd47db05f831a7627b8031e.tar.gz | |
Scheduler statistics tracking update
The current scheme is really inefficient in terms
of retreival of the results, but keeps the critical
path mostly clear. It's probably worth it to do
a more organized tracking of the data as it comes
in, to avoid an n^2 statistics generation step
at the end.
| -rw-r--r-- | benchmarks/tail-latency/query_parm_sweep.cpp | 21 | ||||
| -rw-r--r-- | include/framework/DynamicExtension.h | 12 | ||||
| -rw-r--r-- | include/framework/scheduling/FIFOScheduler.h | 2 | ||||
| -rw-r--r-- | include/framework/scheduling/Task.h | 9 | ||||
| -rw-r--r-- | include/framework/scheduling/statistics.h | 122 | ||||
| -rw-r--r-- | include/framework/util/Configuration.h | 2 |
6 files changed, 130 insertions, 38 deletions
diff --git a/benchmarks/tail-latency/query_parm_sweep.cpp b/benchmarks/tail-latency/query_parm_sweep.cpp index 4f11ce0..a91db8c 100644 --- a/benchmarks/tail-latency/query_parm_sweep.cpp +++ b/benchmarks/tail-latency/query_parm_sweep.cpp @@ -29,11 +29,11 @@ typedef de::Record<uint64_t, uint64_t> Rec; typedef de::ISAMTree<Rec> Shard; typedef de::pl::Query<Shard> Q; typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, - de::SerialScheduler> + de::FIFOScheduler> Ext; typedef Q::Parameters QP; typedef de::DEConfiguration<Shard, Q, de::DeletePolicy::TOMBSTONE, - de::SerialScheduler> + de::FIFOScheduler> Conf; std::atomic<size_t> idx; @@ -103,11 +103,11 @@ int main(int argc, char **argv) { auto queries =read_sosd_point_lookups<QP>(q_fname, 100); size_t buffer_size = 8000; - std::vector<size_t> policies = {0, 1, 2}; + std::vector<size_t> policies = {0}; std::vector<size_t> thread_counts = {8}; - std::vector<double> modifiers = {0, .3, .5, .8}; - std::vector<size_t> scale_factors = {2, 4, 8}; + std::vector<double> modifiers = {0}; + std::vector<size_t> scale_factors = {8, 8, 8, 8, 8}; size_t insert_threads = 1; size_t query_threads = 1; @@ -165,11 +165,19 @@ int main(int argc, char **argv) { extension->await_version(); + /* run some queries to "warm up" the cache */ + for (size_t i=0; i<queries.size()*2; i++) { + auto q_idx = i % queries.size(); + auto q = queries[q_idx]; + auto res = extension->query(std::move(q)).get(); + total_res.fetch_add(res.size()); + } + total_query_count.store(50000); TIMER_INIT(); TIMER_START(); for (size_t i=0; i<total_query_count; i++) { - auto q_idx = rand() % queries.size(); + auto q_idx = i % queries.size(); auto q = queries[q_idx]; auto res = extension->query(std::move(q)).get(); total_res.fetch_add(res.size()); @@ -187,6 +195,7 @@ int main(int argc, char **argv) { fprintf(stdout, "%ld\t%ld\t%ld\t%lf\t%ld\t%ld\t%ld\t%ld\n", internal_thread_cnt, pol, sf, mod, extension->get_height(), extension->get_shard_count(), insert_tput, query_lat); + extension->print_scheduler_statistics(); fflush(stdout); total_res.store(0); diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index fb82638..0bb1524 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -471,7 +471,7 @@ private: static void reconstruction(void *arguments) { auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments; auto extension = (DynamicExtension *)args->extension; - extension->SetThreadAffinity(); + extension->set_thread_affinity(); static std::atomic<size_t> cnt = 0; size_t recon_id = cnt.fetch_add(1); @@ -661,6 +661,7 @@ private: static void async_query(void *arguments) { auto *args = (QueryArgs<ShardType, QueryType, DynamicExtension> *)arguments; + args->extension->set_thread_affinity(); auto version = args->extension->get_active_version(); @@ -927,13 +928,12 @@ private: } //#ifdef _GNU_SOURCE -#if 0 - void SetThreadAffinity() { + void set_thread_affinity() { if constexpr (std::same_as<SchedType, SerialScheduler>) { return; } - int core = m_next_core.fetch_add(1) % m_core_cnt; + int core = m_next_core.fetch_add(1) % m_config.physical_core_count; cpu_set_t mask; CPU_ZERO(&mask); @@ -954,8 +954,10 @@ private: CPU_SET(core, &mask); ::sched_setaffinity(0, sizeof(mask), &mask); } + /* #else - void SetThreadAffinity() {} + void set_thread_affinity() {} #endif +*/ }; } // namespace de diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 6875aca..703f13e 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -14,6 +14,7 @@ #include "framework/scheduling/Task.h" #include "framework/scheduling/statistics.h" +#include <bits/chrono.h> #include <chrono> #include <condition_variable> #include <thread> @@ -61,7 +62,6 @@ public: m_stats.job_queued(ts, type, size); m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv)); - m_cv.notify_all(); } diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 3dbc9f4..b2884c7 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -72,7 +72,6 @@ struct Task { } void operator()(size_t thrd_id) { - auto start = std::chrono::high_resolution_clock::now(); if (m_stats) { m_stats->job_begin(m_timestamp); } @@ -82,14 +81,6 @@ struct Task { if (m_stats) { m_stats->job_complete(m_timestamp); } - auto stop = std::chrono::high_resolution_clock::now(); - - if (m_stats) { - auto time = - std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start) - .count(); - m_stats->log_time_data(time, m_type); - } if (m_lk) { m_lk->unlock(); diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h index a6d66ab..706ba84 100644 --- a/include/framework/scheduling/statistics.h +++ b/include/framework/scheduling/statistics.h @@ -30,25 +30,44 @@ private: struct Event { size_t id; EventType type; + std::chrono::system_clock::time_point time; + + Event(size_t id, EventType type) + : id(id), type(type), time(std::chrono::high_resolution_clock::now()) {} }; struct JobInfo { size_t id; size_t size; size_t type; + + JobInfo(size_t id, size_t size, size_t type) : id(id), size(size), type(type) {} }; public: SchedulerStatistics() = default; ~SchedulerStatistics() = default; - void job_queued(size_t id, size_t type, size_t size) { } + void job_queued(size_t id, size_t type, size_t size) { + std::unique_lock<std::mutex> lk(m_mutex); + m_jobs.insert({id, {id, size, type}}); + m_event_log.emplace_back(id, EventType::QUEUED); + } - void job_scheduled(size_t id) { std::unique_lock<std::mutex> lk(m_mutex); } + void job_scheduled(size_t id) { + std::unique_lock<std::mutex> lk(m_mutex); + m_event_log.emplace_back(id, EventType::SCHEDULED); + } - void job_begin(size_t id) {} + void job_begin(size_t id) { + std::unique_lock<std::mutex> lk(m_mutex); + m_event_log.emplace_back(id, EventType::STARTED); + } - void job_complete(size_t id) {} + void job_complete(size_t id) { + std::unique_lock<std::mutex> lk(m_mutex); + m_event_log.emplace_back(id, EventType::FINISHED); + } /* FIXME: This is just a temporary approach */ void log_time_data(size_t length, size_t type) { @@ -72,21 +91,90 @@ public: } void print_statistics() { - if (m_type_1_cnt > 0) { - fprintf( - stdout, - "Query Count: %ld\tQuery Avg. Latency: %ld\tMax Query Latency: %ld\n", - m_type_1_cnt.load(), m_type_1_total_time.load() / m_type_1_cnt.load(), - m_type_1_largest_time.load()); + int64_t total_queue_time = 0; + int64_t max_queue_time = 0; + int64_t min_queue_time = INT64_MAX; + + int64_t total_runtime = 0; + int64_t max_runtime = 0; + int64_t min_runtime = INT64_MAX; + + int64_t query_cnt = 0; + + /* dumb brute force approach; there are a million better ways to do this */ + size_t i = 0; + for (auto &job : m_jobs) { + std::chrono::system_clock::time_point queue_time; + std::chrono::system_clock::time_point schedule_time; + std::chrono::system_clock::time_point start_time; + std::chrono::system_clock::time_point stop_time; + + /* just look at queries for now */ + if (job.second.type == 1) { + for (auto &event : m_event_log) { + if (event.id == job.first) { + switch (event.type) { + case EventType::QUEUED: + queue_time = event.time; + i++; + break; + case EventType::FINISHED: + stop_time = event.time; + i++; + break; + case EventType::SCHEDULED: + schedule_time = event.time; + i++; + break; + case EventType::STARTED: + start_time = event.time; + i++; + break; + } + } + } + } + /* event wasn't fully logged, so we'll skip it */ + if (i != 4) { + i=0; + continue; + } + i=0; + + auto time_in_queue = std::chrono::duration_cast<std::chrono::nanoseconds>(schedule_time - queue_time).count(); + auto runtime = std::chrono::duration_cast<std::chrono::nanoseconds>(stop_time - start_time).count(); + + total_queue_time += time_in_queue; + total_runtime += runtime; + + if (time_in_queue > max_queue_time) { + max_queue_time = time_in_queue; + } + + if (time_in_queue < min_queue_time) { + min_queue_time = time_in_queue; + } + + if (runtime > max_runtime) { + max_runtime = runtime; + } + + if (runtime < min_runtime) { + min_runtime = runtime; + } + + query_cnt++; } - if (m_type_2_cnt > 0) { - fprintf(stdout, - "Reconstruction Count: %ld\tReconstruction Avg. Latency: " - "%ld\tMax Recon. Latency:%ld\n", - m_type_2_cnt.load(), - m_type_2_total_time.load() / m_type_2_cnt.load(), - m_type_2_largest_time.load()); + + if (query_cnt == 0) { + return; } + + int64_t average_queue_time = total_queue_time / query_cnt; + int64_t average_runtime = total_runtime / query_cnt; + + fprintf(stdout, "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t Max Scheduling Delay: %ld\n", average_queue_time, min_queue_time, max_queue_time); + fprintf(stdout, "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query Latency: %ld\n", average_runtime, min_runtime, max_runtime); } private: diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index 2b8a7fc..dde87fe 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -44,6 +44,8 @@ class DEConfiguration { size_t minimum_query_threads = 4; size_t maximum_memory_usage = 0; /* o for unbounded */ + size_t physical_core_count = 6; + size_t buffer_flush_query_preemption_trigger = UINT64_MAX; }; |