summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-01-19 15:58:04 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-01-19 15:58:04 -0500
commit38693c342558628c75e0ab0d23c32a95a499ed8b (patch)
treef193ff1990ea7976a8ceb5d3bf69d677d3e8ee7d
parent138c793b0a58577713d98c98bb140cf1d9c79bee (diff)
downloaddynamic-extension-38693c342558628c75e0ab0d23c32a95a499ed8b.tar.gz
Initial rough-out of internal statistics tracker
Need to figure out the best way to do the detailed tracking in a concurrent manner. I was thinking just an event log, with parsing routines for extracting statistics. But that'll be pretty slow.
-rw-r--r--CMakeLists.txt2
-rw-r--r--benchmarks/reconstruction_interference.cpp2
-rw-r--r--include/framework/DynamicExtension.h15
-rw-r--r--include/framework/interface/Scheduler.h3
-rw-r--r--include/framework/scheduling/FIFOScheduler.h28
-rw-r--r--include/framework/scheduling/SerialScheduler.h27
-rw-r--r--include/framework/scheduling/Task.h23
-rw-r--r--include/framework/scheduling/statistics.h96
8 files changed, 160 insertions, 36 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index e47ca4e..6e286d7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
set(namespace "de")
project("Practical Dynamic Extension" VERSION 0.1.0)
-set(debug true)
+set(debug false)
set(tests True)
set(bench true)
set(old_bench False)
diff --git a/benchmarks/reconstruction_interference.cpp b/benchmarks/reconstruction_interference.cpp
index a843c71..2fb1591 100644
--- a/benchmarks/reconstruction_interference.cpp
+++ b/benchmarks/reconstruction_interference.cpp
@@ -101,6 +101,8 @@ int main(int argc, char **argv) {
auto s = extension->create_static_structure();
fprintf(stderr, "Construction complete\n");
q_thrd.join();
+
+ extension->print_scheduler_statistics();
delete extension;
delete s;
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 7590de2..89ee30f 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -40,6 +40,10 @@ class DynamicExtension {
typedef Epoch<R, S, Q, L> _Epoch;
typedef BufferView<R> BufView;
+ static constexpr size_t QUERY = 1;
+ static constexpr size_t RECONSTRUCTION = 2;
+
+
public:
DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0,
size_t thread_cnt=16)
@@ -226,6 +230,11 @@ public:
return t;
}
+
+ void print_scheduler_statistics() {
+ m_sched.print_statistics();
+ }
+
private:
SCHED m_sched;
@@ -271,7 +280,7 @@ private:
*/
epoch->start_job();
- m_sched.schedule_job(reconstruction, 0, args);
+ m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
/* wait for compaction completion */
wait.get();
@@ -511,7 +520,7 @@ private:
args->compaction = false;
/* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */
- m_sched.schedule_job(reconstruction, 0, args);
+ m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
}
std::future<std::vector<R>> schedule_query(void *query_parms) {
@@ -522,7 +531,7 @@ private:
args->query_parms = query_parms;
auto result = args->result_set.get_future();
- m_sched.schedule_job(async_query, 0, args);
+ m_sched.schedule_job(async_query, 0, args, QUERY);
return result;
}
diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h
index 94afe6c..451ddd2 100644
--- a/include/framework/interface/Scheduler.h
+++ b/include/framework/interface/Scheduler.h
@@ -13,6 +13,7 @@
template <typename S>
concept SchedulerInterface = requires(S s, size_t i, void *vp, de::Job j) {
{S(i, i)};
- {s.schedule_job(j, i, vp)} -> std::convertible_to<void>;
+ {s.schedule_job(j, i, vp, i)} -> std::convertible_to<void>;
{s.shutdown()};
+ {s.print_statistics()};
};
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 4cdc436..513a3a2 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -8,21 +8,11 @@
*/
#pragma once
-#include <vector>
-#include <memory>
-#include <queue>
#include <thread>
#include <condition_variable>
-#include <future>
-
-#include "util/types.h"
-#include "framework/interface/Shard.h"
-#include "framework/interface/Query.h"
-#include "framework/interface/Record.h"
-#include "framework/structure/MutableBuffer.h"
-#include "framework/util/Configuration.h"
-#include "framework/structure/ExtensionStructure.h"
+
#include "framework/scheduling/Task.h"
+#include "framework/scheduling/statistics.h"
#include "ctpl/ctpl.h"
#include "psu-ds/LockedPriorityQueue.h"
@@ -54,10 +44,12 @@ public:
m_sched_thrd.join();
}
- void schedule_job(std::function<void(void*)> job, size_t size, void *args) {
+ void schedule_job(std::function<void(void*)> job, size_t size, void *args, size_t type=0) {
std::unique_lock<std::mutex> lk(m_cv_lock);
size_t ts = m_counter.fetch_add(1);
- m_task_queue.push(Task(size, ts, job, args));
+
+ m_stats.job_queued(ts, type, size);
+ m_task_queue.push(Task(size, ts, job, args, type, &m_stats));
m_cv.notify_all();
}
@@ -68,6 +60,10 @@ public:
m_cv.notify_all();
}
+ void print_statistics() {
+ m_stats.print_statistics();
+ }
+
private:
psudb::LockedPriorityQueue<Task> m_task_queue;
@@ -86,9 +82,13 @@ private:
std::atomic<size_t> m_used_thrds;
std::atomic<size_t> m_used_memory;
+ SchedulerStatistics m_stats;
+
void schedule_next() {
assert(m_task_queue.size() > 0);
auto t = m_task_queue.pop();
+ m_stats.job_scheduled(t.m_timestamp);
+
m_thrd_pool.push(t);
}
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
index 10c2af2..ac59301 100644
--- a/include/framework/scheduling/SerialScheduler.h
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -14,21 +14,8 @@
*/
#pragma once
-#include <vector>
-#include <memory>
-#include <queue>
-#include <thread>
-#include <condition_variable>
-#include <future>
-
-#include "util/types.h"
-#include "framework/interface/Shard.h"
-#include "framework/interface/Query.h"
-#include "framework/interface/Record.h"
-#include "framework/structure/MutableBuffer.h"
-#include "framework/util/Configuration.h"
-#include "framework/structure/ExtensionStructure.h"
#include "framework/scheduling/Task.h"
+#include "framework/scheduling/statistics.h"
namespace de {
@@ -44,9 +31,11 @@ public:
~SerialScheduler() = default;
- void schedule_job(std::function<void(void*)> job, size_t size, void *args) {
+ void schedule_job(std::function<void(void*)> job, size_t size, void *args, size_t type=0) {
size_t ts = m_counter++;
- auto t = Task(size, ts, job, args);
+ m_stats.job_queued(ts, type, size);
+ m_stats.job_scheduled(ts);
+ auto t = Task(size, ts, job, args, type, &m_stats);
t(0);
}
@@ -54,6 +43,10 @@ public:
/* intentionally left blank */
}
+ void print_statistics() {
+ m_stats.print_statistics();
+ }
+
private:
size_t m_memory_budget;
size_t m_thrd_cnt;
@@ -62,6 +55,8 @@ private:
size_t m_used_memory;
size_t m_counter;
+
+ SchedulerStatistics m_stats;
};
}
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 16f5e58..b14b229 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -10,9 +10,11 @@
#include <future>
#include <functional>
+#include <chrono>
#include "framework/util/Configuration.h"
#include "framework/scheduling/Epoch.h"
+#include "framework/scheduling/statistics.h"
namespace de {
@@ -35,17 +37,21 @@ struct QueryArgs {
typedef std::function<void(void*)> Job;
struct Task {
- Task(size_t size, size_t ts, Job job, void *args)
+ Task(size_t size, size_t ts, Job job, void *args, size_t type=0, SchedulerStatistics *stats=nullptr)
: m_job(job)
, m_size(size)
, m_timestamp(ts)
, m_args(args)
+ , m_type(type)
+ , m_stats(stats)
{}
Job m_job;
size_t m_size;
size_t m_timestamp;
void *m_args;
+ size_t m_type;
+ SchedulerStatistics *m_stats;
friend bool operator<(const Task &self, const Task &other) {
return self.m_timestamp < other.m_timestamp;
@@ -56,7 +62,22 @@ struct Task {
}
void operator()(size_t thrd_id) {
+ auto start = std::chrono::high_resolution_clock::now();
+ if (m_stats) {
+ m_stats->job_begin(m_timestamp);
+ }
+
m_job(m_args);
+
+ if (m_stats) {
+ m_stats->job_complete(m_timestamp);
+ }
+ auto stop = std::chrono::high_resolution_clock::now();
+
+ if (m_stats) {
+ auto time = std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start).count();
+ m_stats->log_time_data(time, m_type);
+ }
}
};
diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h
new file mode 100644
index 0000000..8466ffc
--- /dev/null
+++ b/include/framework/scheduling/statistics.h
@@ -0,0 +1,96 @@
+/*
+ * include/framework/scheduling/statistics.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <cstdlib>
+#include <cassert>
+#include <unordered_map>
+#include <vector>
+#include <mutex>
+#include <chrono>
+#include <atomic>
+
+namespace de {
+
+class SchedulerStatistics {
+private:
+ enum class EventType {
+ QUEUED,
+ SCHEDULED,
+ STARTED,
+ FINISHED
+ };
+
+ struct Event {
+ size_t id;
+ EventType type;
+ };
+
+ struct JobInfo {
+ size_t id;
+ size_t size;
+ size_t type;
+ };
+
+
+public:
+ SchedulerStatistics() = default;
+ ~SchedulerStatistics() = default;
+
+ void job_queued(size_t id, size_t type, size_t size) {
+ auto time = std::chrono::high_resolution_clock::now();
+ }
+
+ void job_scheduled(size_t id) {
+ std::unique_lock<std::mutex> lk(m_mutex);
+
+ }
+
+ void job_begin(size_t id) {
+
+ }
+
+ void job_complete(size_t id) {
+
+ }
+
+ /* FIXME: This is just a temporary approach */
+ void log_time_data(size_t length, size_t type) {
+ assert(type == 1 || type == 2);
+
+ if (type == 1) {
+ m_type_1_cnt.fetch_add(1);
+ m_type_1_total_time.fetch_add(length);
+ } else {
+ m_type_2_cnt.fetch_add(1);
+ m_type_2_total_time.fetch_add(length);
+ }
+ }
+
+ void print_statistics() {
+ fprintf(stdout, "Query Count: %ld\tQuery Avg. Latency: %ld\n",
+ m_type_1_cnt.load(),
+ m_type_1_total_time.load() / m_type_1_cnt.load());
+ fprintf(stdout, "Reconstruction Count: %ld\tReconstruction Avg. Latency: %ld\n",
+ m_type_2_cnt.load(),
+ m_type_2_total_time.load() / m_type_2_cnt.load());
+ }
+
+private:
+ std::mutex m_mutex;
+ std::unordered_map<size_t, JobInfo> m_jobs;
+ std::vector<Event> m_event_log;
+
+ std::atomic<size_t> m_type_1_cnt;
+ std::atomic<size_t> m_type_1_total_time;
+
+ std::atomic<size_t> m_type_2_cnt;
+ std::atomic<size_t> m_type_2_total_time;
+};
+}