diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-19 15:58:04 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-19 15:58:04 -0500 |
| commit | 38693c342558628c75e0ab0d23c32a95a499ed8b (patch) | |
| tree | f193ff1990ea7976a8ceb5d3bf69d677d3e8ee7d | |
| parent | 138c793b0a58577713d98c98bb140cf1d9c79bee (diff) | |
| download | dynamic-extension-38693c342558628c75e0ab0d23c32a95a499ed8b.tar.gz | |
Initial rough-out of internal statistics tracker
Need to figure out the best way to do the detailed tracking in
a concurrent manner. I was thinking just an event log, with parsing
routines for extracting statistics. But that'll be pretty slow.
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | CMakeLists.txt | 2 |
| -rw-r--r-- | benchmarks/reconstruction_interference.cpp | 2 |
| -rw-r--r-- | include/framework/DynamicExtension.h | 15 |
| -rw-r--r-- | include/framework/interface/Scheduler.h | 3 |
| -rw-r--r-- | include/framework/scheduling/FIFOScheduler.h | 28 |
| -rw-r--r-- | include/framework/scheduling/SerialScheduler.h | 27 |
| -rw-r--r-- | include/framework/scheduling/Task.h | 23 |
| -rw-r--r-- | include/framework/scheduling/statistics.h | 96 |
8 files changed, 160 insertions, 36 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index e47ca4e..6e286d7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(namespace "de") project("Practical Dynamic Extension" VERSION 0.1.0) -set(debug true) +set(debug false) set(tests True) set(bench true) set(old_bench False) diff --git a/benchmarks/reconstruction_interference.cpp b/benchmarks/reconstruction_interference.cpp index a843c71..2fb1591 100644 --- a/benchmarks/reconstruction_interference.cpp +++ b/benchmarks/reconstruction_interference.cpp @@ -101,6 +101,8 @@ int main(int argc, char **argv) { auto s = extension->create_static_structure(); fprintf(stderr, "Construction complete\n"); q_thrd.join(); + + extension->print_scheduler_statistics(); delete extension; delete s; diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 7590de2..89ee30f 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -40,6 +40,10 @@ class DynamicExtension { typedef Epoch<R, S, Q, L> _Epoch; typedef BufferView<R> BufView; + static constexpr size_t QUERY = 1; + static constexpr size_t RECONSTRUCTION = 2; + + public: DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0, size_t thread_cnt=16) @@ -226,6 +230,11 @@ public: return t; } + + void print_scheduler_statistics() { + m_sched.print_statistics(); + } + private: SCHED m_sched; @@ -271,7 +280,7 @@ private: */ epoch->start_job(); - m_sched.schedule_job(reconstruction, 0, args); + m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); /* wait for compaction completion */ wait.get(); @@ -511,7 +520,7 @@ private: args->compaction = false; /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ - m_sched.schedule_job(reconstruction, 0, args); + m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); } std::future<std::vector<R>> schedule_query(void *query_parms) { 
@@ -522,7 +531,7 @@ private: args->query_parms = query_parms; auto result = args->result_set.get_future(); - m_sched.schedule_job(async_query, 0, args); + m_sched.schedule_job(async_query, 0, args, QUERY); return result; } diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h index 94afe6c..451ddd2 100644 --- a/include/framework/interface/Scheduler.h +++ b/include/framework/interface/Scheduler.h @@ -13,6 +13,7 @@ template <typename S> concept SchedulerInterface = requires(S s, size_t i, void *vp, de::Job j) { {S(i, i)}; - {s.schedule_job(j, i, vp)} -> std::convertible_to<void>; + {s.schedule_job(j, i, vp, i)} -> std::convertible_to<void>; {s.shutdown()}; + {s.print_statistics()}; }; diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 4cdc436..513a3a2 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -8,21 +8,11 @@ */ #pragma once -#include <vector> -#include <memory> -#include <queue> #include <thread> #include <condition_variable> -#include <future> - -#include "util/types.h" -#include "framework/interface/Shard.h" -#include "framework/interface/Query.h" -#include "framework/interface/Record.h" -#include "framework/structure/MutableBuffer.h" -#include "framework/util/Configuration.h" -#include "framework/structure/ExtensionStructure.h" + #include "framework/scheduling/Task.h" +#include "framework/scheduling/statistics.h" #include "ctpl/ctpl.h" #include "psu-ds/LockedPriorityQueue.h" @@ -54,10 +44,12 @@ public: m_sched_thrd.join(); } - void schedule_job(std::function<void(void*)> job, size_t size, void *args) { + void schedule_job(std::function<void(void*)> job, size_t size, void *args, size_t type=0) { std::unique_lock<std::mutex> lk(m_cv_lock); size_t ts = m_counter.fetch_add(1); - m_task_queue.push(Task(size, ts, job, args)); + + m_stats.job_queued(ts, type, size); + m_task_queue.push(Task(size, ts, 
job, args, type, &m_stats)); m_cv.notify_all(); } @@ -68,6 +60,10 @@ public: m_cv.notify_all(); } + void print_statistics() { + m_stats.print_statistics(); + } + private: psudb::LockedPriorityQueue<Task> m_task_queue; @@ -86,9 +82,13 @@ private: std::atomic<size_t> m_used_thrds; std::atomic<size_t> m_used_memory; + SchedulerStatistics m_stats; + void schedule_next() { assert(m_task_queue.size() > 0); auto t = m_task_queue.pop(); + m_stats.job_scheduled(t.m_timestamp); + m_thrd_pool.push(t); } diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index 10c2af2..ac59301 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -14,21 +14,8 @@ */ #pragma once -#include <vector> -#include <memory> -#include <queue> -#include <thread> -#include <condition_variable> -#include <future> - -#include "util/types.h" -#include "framework/interface/Shard.h" -#include "framework/interface/Query.h" -#include "framework/interface/Record.h" -#include "framework/structure/MutableBuffer.h" -#include "framework/util/Configuration.h" -#include "framework/structure/ExtensionStructure.h" #include "framework/scheduling/Task.h" +#include "framework/scheduling/statistics.h" namespace de { @@ -44,9 +31,11 @@ public: ~SerialScheduler() = default; - void schedule_job(std::function<void(void*)> job, size_t size, void *args) { + void schedule_job(std::function<void(void*)> job, size_t size, void *args, size_t type=0) { size_t ts = m_counter++; - auto t = Task(size, ts, job, args); + m_stats.job_queued(ts, type, size); + m_stats.job_scheduled(ts); + auto t = Task(size, ts, job, args, type, &m_stats); t(0); } @@ -54,6 +43,10 @@ public: /* intentionally left blank */ } + void print_statistics() { + m_stats.print_statistics(); + } + private: size_t m_memory_budget; size_t m_thrd_cnt; @@ -62,6 +55,8 @@ private: size_t m_used_memory; size_t m_counter; + + SchedulerStatistics m_stats; }; } 
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 16f5e58..b14b229 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -10,9 +10,11 @@ #include <future> #include <functional> +#include <chrono> #include "framework/util/Configuration.h" #include "framework/scheduling/Epoch.h" +#include "framework/scheduling/statistics.h" namespace de { @@ -35,17 +37,21 @@ struct QueryArgs { typedef std::function<void(void*)> Job; struct Task { - Task(size_t size, size_t ts, Job job, void *args) + Task(size_t size, size_t ts, Job job, void *args, size_t type=0, SchedulerStatistics *stats=nullptr) : m_job(job) , m_size(size) , m_timestamp(ts) , m_args(args) + , m_type(type) + , m_stats(stats) {} Job m_job; size_t m_size; size_t m_timestamp; void *m_args; + size_t m_type; + SchedulerStatistics *m_stats; friend bool operator<(const Task &self, const Task &other) { return self.m_timestamp < other.m_timestamp; @@ -56,7 +62,22 @@ struct Task { } void operator()(size_t thrd_id) { + auto start = std::chrono::high_resolution_clock::now(); + if (m_stats) { + m_stats->job_begin(m_timestamp); + } + m_job(m_args); + + if (m_stats) { + m_stats->job_complete(m_timestamp); + } + auto stop = std::chrono::high_resolution_clock::now(); + + if (m_stats) { + auto time = std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start).count(); + m_stats->log_time_data(time, m_type); + } } }; diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h new file mode 100644 index 0000000..8466ffc --- /dev/null +++ b/include/framework/scheduling/statistics.h @@ -0,0 +1,96 @@ +/* + * include/framework/scheduling/statistics.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. 
+ * + */ +#pragma once + +#include <cstdlib> +#include <cassert> +#include <unordered_map> +#include <vector> +#include <mutex> +#include <chrono> +#include <atomic> + +namespace de { + +class SchedulerStatistics { +private: + enum class EventType { + QUEUED, + SCHEDULED, + STARTED, + FINISHED + }; + + struct Event { + size_t id; + EventType type; + }; + + struct JobInfo { + size_t id; + size_t size; + size_t type; + }; + + +public: + SchedulerStatistics() = default; + ~SchedulerStatistics() = default; + + void job_queued(size_t id, size_t type, size_t size) { + auto time = std::chrono::high_resolution_clock::now(); + } + + void job_scheduled(size_t id) { + std::unique_lock<std::mutex> lk(m_mutex); + + } + + void job_begin(size_t id) { + + } + + void job_complete(size_t id) { + + } + + /* FIXME: This is just a temporary approach */ + void log_time_data(size_t length, size_t type) { + assert(type == 1 || type == 2); + + if (type == 1) { + m_type_1_cnt.fetch_add(1); + m_type_1_total_time.fetch_add(length); + } else { + m_type_2_cnt.fetch_add(1); + m_type_2_total_time.fetch_add(length); + } + } + + void print_statistics() { + fprintf(stdout, "Query Count: %ld\tQuery Avg. Latency: %ld\n", + m_type_1_cnt.load(), + m_type_1_total_time.load() / m_type_1_cnt.load()); + fprintf(stdout, "Reconstruction Count: %ld\tReconstruction Avg. Latency: %ld\n", + m_type_2_cnt.load(), + m_type_2_total_time.load() / m_type_2_cnt.load()); + } + +private: + std::mutex m_mutex; + std::unordered_map<size_t, JobInfo> m_jobs; + std::vector<Event> m_event_log; + + std::atomic<size_t> m_type_1_cnt; + std::atomic<size_t> m_type_1_total_time; + + std::atomic<size_t> m_type_2_cnt; + std::atomic<size_t> m_type_2_total_time; +}; +} |