summaryrefslogtreecommitdiffstats
path: root/include
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-04-25 12:28:31 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2025-04-25 12:28:31 -0400
commit1957b2dd33b244754cd47db05f831a7627b8031e (patch)
treeda438aabd8ff188e2a3c94c69352a36ec243f730 /include
parent5a3d36fecabc8f220b19dcaea28a78f99b5244af (diff)
downloaddynamic-extension-1957b2dd33b244754cd47db05f831a7627b8031e.tar.gz
Scheduler statistics tracking update
The current scheme is really inefficient in terms of retreival of the results, but keeps the critical path mostly clear. It's probably worth it to do a more organized tracking of the data as it comes in, to avoid an n^2 statistics generation step at the end.
Diffstat (limited to 'include')
-rw-r--r--include/framework/DynamicExtension.h12
-rw-r--r--include/framework/scheduling/FIFOScheduler.h2
-rw-r--r--include/framework/scheduling/Task.h9
-rw-r--r--include/framework/scheduling/statistics.h122
-rw-r--r--include/framework/util/Configuration.h2
5 files changed, 115 insertions, 32 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index fb82638..0bb1524 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -471,7 +471,7 @@ private:
static void reconstruction(void *arguments) {
auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
auto extension = (DynamicExtension *)args->extension;
- extension->SetThreadAffinity();
+ extension->set_thread_affinity();
static std::atomic<size_t> cnt = 0;
size_t recon_id = cnt.fetch_add(1);
@@ -661,6 +661,7 @@ private:
static void async_query(void *arguments) {
auto *args = (QueryArgs<ShardType, QueryType, DynamicExtension> *)arguments;
+ args->extension->set_thread_affinity();
auto version = args->extension->get_active_version();
@@ -927,13 +928,12 @@ private:
}
//#ifdef _GNU_SOURCE
-#if 0
- void SetThreadAffinity() {
+ void set_thread_affinity() {
if constexpr (std::same_as<SchedType, SerialScheduler>) {
return;
}
- int core = m_next_core.fetch_add(1) % m_core_cnt;
+ int core = m_next_core.fetch_add(1) % m_config.physical_core_count;
cpu_set_t mask;
CPU_ZERO(&mask);
@@ -954,8 +954,10 @@ private:
CPU_SET(core, &mask);
::sched_setaffinity(0, sizeof(mask), &mask);
}
+ /*
#else
- void SetThreadAffinity() {}
+ void set_thread_affinity() {}
#endif
+*/
};
} // namespace de
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 6875aca..703f13e 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -14,6 +14,7 @@
#include "framework/scheduling/Task.h"
#include "framework/scheduling/statistics.h"
+#include <bits/chrono.h>
#include <chrono>
#include <condition_variable>
#include <thread>
@@ -61,7 +62,6 @@ public:
m_stats.job_queued(ts, type, size);
m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv));
-
m_cv.notify_all();
}
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 3dbc9f4..b2884c7 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -72,7 +72,6 @@ struct Task {
}
void operator()(size_t thrd_id) {
- auto start = std::chrono::high_resolution_clock::now();
if (m_stats) {
m_stats->job_begin(m_timestamp);
}
@@ -82,14 +81,6 @@ struct Task {
if (m_stats) {
m_stats->job_complete(m_timestamp);
}
- auto stop = std::chrono::high_resolution_clock::now();
-
- if (m_stats) {
- auto time =
- std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start)
- .count();
- m_stats->log_time_data(time, m_type);
- }
if (m_lk) {
m_lk->unlock();
diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h
index a6d66ab..706ba84 100644
--- a/include/framework/scheduling/statistics.h
+++ b/include/framework/scheduling/statistics.h
@@ -30,25 +30,44 @@ private:
struct Event {
size_t id;
EventType type;
+ std::chrono::system_clock::time_point time;
+
+ Event(size_t id, EventType type)
+ : id(id), type(type), time(std::chrono::high_resolution_clock::now()) {}
};
struct JobInfo {
size_t id;
size_t size;
size_t type;
+
+ JobInfo(size_t id, size_t size, size_t type) : id(id), size(size), type(type) {}
};
public:
SchedulerStatistics() = default;
~SchedulerStatistics() = default;
- void job_queued(size_t id, size_t type, size_t size) { }
+ void job_queued(size_t id, size_t type, size_t size) {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ m_jobs.insert({id, {id, size, type}});
+ m_event_log.emplace_back(id, EventType::QUEUED);
+ }
- void job_scheduled(size_t id) { std::unique_lock<std::mutex> lk(m_mutex); }
+ void job_scheduled(size_t id) {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ m_event_log.emplace_back(id, EventType::SCHEDULED);
+ }
- void job_begin(size_t id) {}
+ void job_begin(size_t id) {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ m_event_log.emplace_back(id, EventType::STARTED);
+ }
- void job_complete(size_t id) {}
+ void job_complete(size_t id) {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ m_event_log.emplace_back(id, EventType::FINISHED);
+ }
/* FIXME: This is just a temporary approach */
void log_time_data(size_t length, size_t type) {
@@ -72,21 +91,90 @@ public:
}
void print_statistics() {
- if (m_type_1_cnt > 0) {
- fprintf(
- stdout,
- "Query Count: %ld\tQuery Avg. Latency: %ld\tMax Query Latency: %ld\n",
- m_type_1_cnt.load(), m_type_1_total_time.load() / m_type_1_cnt.load(),
- m_type_1_largest_time.load());
+ int64_t total_queue_time = 0;
+ int64_t max_queue_time = 0;
+ int64_t min_queue_time = INT64_MAX;
+
+ int64_t total_runtime = 0;
+ int64_t max_runtime = 0;
+ int64_t min_runtime = INT64_MAX;
+
+ int64_t query_cnt = 0;
+
+ /* dumb brute force approach; there are a million better ways to do this */
+ size_t i = 0;
+ for (auto &job : m_jobs) {
+ std::chrono::system_clock::time_point queue_time;
+ std::chrono::system_clock::time_point schedule_time;
+ std::chrono::system_clock::time_point start_time;
+ std::chrono::system_clock::time_point stop_time;
+
+ /* just look at queries for now */
+ if (job.second.type == 1) {
+ for (auto &event : m_event_log) {
+ if (event.id == job.first) {
+ switch (event.type) {
+ case EventType::QUEUED:
+ queue_time = event.time;
+ i++;
+ break;
+ case EventType::FINISHED:
+ stop_time = event.time;
+ i++;
+ break;
+ case EventType::SCHEDULED:
+ schedule_time = event.time;
+ i++;
+ break;
+ case EventType::STARTED:
+ start_time = event.time;
+ i++;
+ break;
+ }
+ }
+ }
+ }
+ /* event wasn't fully logged, so we'll skip it */
+ if (i != 4) {
+ i=0;
+ continue;
+ }
+ i=0;
+
+ auto time_in_queue = std::chrono::duration_cast<std::chrono::nanoseconds>(schedule_time - queue_time).count();
+ auto runtime = std::chrono::duration_cast<std::chrono::nanoseconds>(stop_time - start_time).count();
+
+ total_queue_time += time_in_queue;
+ total_runtime += runtime;
+
+ if (time_in_queue > max_queue_time) {
+ max_queue_time = time_in_queue;
+ }
+
+ if (time_in_queue < min_queue_time) {
+ min_queue_time = time_in_queue;
+ }
+
+ if (runtime > max_runtime) {
+ max_runtime = runtime;
+ }
+
+ if (runtime < min_runtime) {
+ min_runtime = runtime;
+ }
+
+ query_cnt++;
}
- if (m_type_2_cnt > 0) {
- fprintf(stdout,
- "Reconstruction Count: %ld\tReconstruction Avg. Latency: "
- "%ld\tMax Recon. Latency:%ld\n",
- m_type_2_cnt.load(),
- m_type_2_total_time.load() / m_type_2_cnt.load(),
- m_type_2_largest_time.load());
+
+ if (query_cnt == 0) {
+ return;
}
+
+ int64_t average_queue_time = total_queue_time / query_cnt;
+ int64_t average_runtime = total_runtime / query_cnt;
+
+ fprintf(stdout, "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t Max Scheduling Delay: %ld\n", average_queue_time, min_queue_time, max_queue_time);
+ fprintf(stdout, "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query Latency: %ld\n", average_runtime, min_runtime, max_runtime);
}
private:
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 2b8a7fc..dde87fe 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -44,6 +44,8 @@ class DEConfiguration {
size_t minimum_query_threads = 4;
size_t maximum_memory_usage = 0; /* o for unbounded */
+ size_t physical_core_count = 6;
+
size_t buffer_flush_query_preemption_trigger = UINT64_MAX;
};