diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-04-25 18:36:54 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-04-25 18:36:54 -0400 |
| commit | 81d3ef3cb4a00d566978ebca511fd947f3ef9b1b (patch) | |
| tree | f3ca946790e2fa6a94359c0053845ae4d94c2578 /include/framework | |
| parent | 1957b2dd33b244754cd47db05f831a7627b8031e (diff) | |
| download | dynamic-extension-81d3ef3cb4a00d566978ebca511fd947f3ef9b1b.tar.gz | |
Improved statistics calculation
Diffstat (limited to 'include/framework')
| -rw-r--r-- | include/framework/DynamicExtension.h | 2 | ||||
| -rw-r--r-- | include/framework/scheduling/FIFOScheduler.h | 1 | ||||
| -rw-r--r-- | include/framework/scheduling/statistics.h | 196 |
3 files changed, 116 insertions, 83 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 0bb1524..59b784b 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -383,6 +383,8 @@ public: */ void print_scheduler_statistics() const { m_sched->print_statistics(); } + void print_scheduler_query_data() const { m_sched->print_query_time_data(); } + /** * Writes a schematic view of the currently active structure to * stdout. Each level is on its own line, and each shard is represented. diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 703f13e..2f49b5f 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -72,6 +72,7 @@ public: } void print_statistics() { m_stats.print_statistics(); } + void print_query_time_data() { m_stats.print_query_time_data(); } private: psudb::LockedPriorityQueue<Task> m_task_queue; diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h index 706ba84..304a0c4 100644 --- a/include/framework/scheduling/statistics.h +++ b/include/framework/scheduling/statistics.h @@ -20,6 +20,7 @@ #include <mutex> #include <unordered_map> #include <vector> +#include <cmath> namespace de { @@ -41,7 +42,32 @@ private: size_t size; size_t type; - JobInfo(size_t id, size_t size, size_t type) : id(id), size(size), type(type) {} + std::chrono::system_clock::time_point queue_time; + std::chrono::system_clock::time_point scheduled_time; + std::chrono::system_clock::time_point start_time; + std::chrono::system_clock::time_point stop_time; + + JobInfo(size_t id, size_t size, size_t type) + : id(id), size(size), type(type) {} + + int64_t time_in_queue() { + return std::chrono::duration_cast<std::chrono::nanoseconds>( + scheduled_time - queue_time) + .count(); + } + + int64_t runtime() { + return std::chrono::duration_cast<std::chrono::nanoseconds>(stop_time - + start_time) + .count(); + } + + int64_t end_to_end_time() { + return std::chrono::duration_cast<std::chrono::nanoseconds>(stop_time - + queue_time) + .count(); + } + }; public: @@ -52,45 +78,31 @@ public: std::unique_lock<std::mutex> lk(m_mutex); m_jobs.insert({id, {id, size, type}}); m_event_log.emplace_back(id, EventType::QUEUED); + m_job_times_set = false; } void job_scheduled(size_t id) { std::unique_lock<std::mutex> lk(m_mutex); m_event_log.emplace_back(id, EventType::SCHEDULED); + m_job_times_set = false; } void job_begin(size_t id) { std::unique_lock<std::mutex> lk(m_mutex); m_event_log.emplace_back(id, EventType::STARTED); + m_job_times_set = false; } void job_complete(size_t id) { std::unique_lock<std::mutex> lk(m_mutex); m_event_log.emplace_back(id, EventType::FINISHED); - } - - /* FIXME: This is just a temporary approach */ - void log_time_data(size_t length, size_t type) { - assert(type == 1 || type == 2 || type == 3); - - if (type == 1) { - m_type_1_cnt.fetch_add(1); - m_type_1_total_time.fetch_add(length); - - if (length > m_type_1_largest_time) { - m_type_1_largest_time.store(length); - } - } else if (type == 2) { - m_type_2_cnt.fetch_add(1); - m_type_2_total_time.fetch_add(length); - - if (length > m_type_2_largest_time) { - m_type_2_largest_time.store(length); - } - } + m_job_times_set = false; } void print_statistics() { + std::unique_lock<std::mutex> lk(m_mutex); + update_job_data_from_log(); + int64_t total_queue_time = 0; int64_t max_queue_time = 0; int64_t min_queue_time = INT64_MAX; @@ -101,94 +113,112 @@ public: int64_t query_cnt = 0; - /* dumb brute force approach; there are a million better ways to do this */ - size_t i = 0; - for (auto &job : m_jobs) { - std::chrono::system_clock::time_point queue_time; - std::chrono::system_clock::time_point schedule_time; - std::chrono::system_clock::time_point start_time; - std::chrono::system_clock::time_point stop_time; - /* just look at queries for now */ - if (job.second.type == 1) { - for (auto &event : m_event_log) { - if (event.id == job.first) { - switch (event.type) { - case EventType::QUEUED: - queue_time = event.time; - i++; - break; - case EventType::FINISHED: - stop_time = event.time; - i++; - break; - case EventType::SCHEDULED: - schedule_time = event.time; - i++; - break; - case EventType::STARTED: - start_time = event.time; - i++; - break; - } - } - } - } - /* event wasn't fully logged, so we'll skip it */ - if (i != 4) { - i=0; + /* hard-coded for the moment to only consider queries */ + for (auto &job : m_jobs) { + if (job.second.type != 1) { continue; } - i=0; - - auto time_in_queue = std::chrono::duration_cast<std::chrono::nanoseconds>(schedule_time - queue_time).count(); - auto runtime = std::chrono::duration_cast<std::chrono::nanoseconds>(stop_time - start_time).count(); - total_queue_time += time_in_queue; - total_runtime += runtime; + total_queue_time += job.second.time_in_queue(); + total_runtime += job.second.runtime(); - if (time_in_queue > max_queue_time) { - max_queue_time = time_in_queue; + if (job.second.time_in_queue() > max_queue_time) { + max_queue_time = job.second.time_in_queue(); } - if (time_in_queue < min_queue_time) { - min_queue_time = time_in_queue; + if (job.second.time_in_queue() < min_queue_time) { + min_queue_time = job.second.time_in_queue(); } - if (runtime > max_runtime) { - max_runtime = runtime; + if (job.second.runtime() > max_runtime) { + max_runtime = job.second.runtime(); } - if (runtime < min_runtime) { - min_runtime = runtime; + if (job.second.runtime() < min_runtime) { + min_runtime = job.second.runtime(); } query_cnt++; } - - if (query_cnt == 0) { - return; - } + int64_t average_queue_time = total_queue_time / query_cnt; int64_t average_runtime = total_runtime / query_cnt; - fprintf(stdout, "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t Max Scheduling Delay: %ld\n", average_queue_time, min_queue_time, max_queue_time); - fprintf(stdout, "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query Latency: %ld\n", average_runtime, min_runtime, max_runtime); + /* calculate standard deviations */ + int64_t queue_deviation_sum = 0; + int64_t runtime_deviation_sum = 0; + for (auto &job : m_jobs) { + if (job.second.type != 1) { + continue; + } + + queue_deviation_sum += std::pow(job.second.time_in_queue() - average_queue_time, 2); + runtime_deviation_sum += std::pow(job.second.runtime() - average_runtime, 2); + + } + + int64_t queue_stddev = std::sqrt(queue_deviation_sum / query_cnt); + int64_t runtime_stddev = std::sqrt(runtime_deviation_sum / query_cnt); + + + fprintf(stdout, "Query Count: %ld\n", query_cnt); + fprintf(stdout, "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t Max Scheduling Delay: %ld\tStandard Deviation: %ld\n", average_queue_time, min_queue_time, max_queue_time, queue_stddev); + fprintf(stdout, "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query Latency: %ld\tStandard Deviation: %ld\n", average_runtime, min_runtime, max_runtime, runtime_stddev); + } + + + void print_query_time_data() { + std::unique_lock<std::mutex> lk(m_mutex); + update_job_data_from_log(); + + for (auto &job : m_jobs) { + if (job.second.type == 1) { + fprintf(stdout, "%ld\t%ld\t%ld\n", job.second.time_in_queue(), job.second.runtime(), job.second.end_to_end_time()); + } + } } private: std::mutex m_mutex; std::unordered_map<size_t, JobInfo> m_jobs; std::vector<Event> m_event_log; + bool m_job_times_set; + + void update_job_data_from_log() { + if (m_job_times_set) { + return; + } - std::atomic<size_t> m_type_1_cnt; - std::atomic<size_t> m_type_1_total_time; + /* + * these updates could be made when the time point is recorded, but I + * want to keep as much of this off the critical path as possible. Any + * work done here won't affect performance. + */ + for (auto &event : m_event_log) { + if (!m_jobs.contains(event.id)) { + continue; + } - std::atomic<size_t> m_type_2_cnt; - std::atomic<size_t> m_type_2_total_time; + auto &job = m_jobs.at(event.id); + switch (event.type) { + case EventType::QUEUED: + job.queue_time = event.time; + break; + case EventType::FINISHED: + job.stop_time = event.time; + break; + case EventType::SCHEDULED: + job.scheduled_time = event.time; + break; + case EventType::STARTED: + job.start_time = event.time; + break; + } + } - std::atomic<size_t> m_type_1_largest_time; - std::atomic<size_t> m_type_2_largest_time; + m_job_times_set = true; + } }; } // namespace de |