summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/framework/DynamicExtension.h2
-rw-r--r--include/framework/scheduling/FIFOScheduler.h1
-rw-r--r--include/framework/scheduling/statistics.h196
3 files changed, 116 insertions, 83 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 0bb1524..59b784b 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -383,6 +383,8 @@ public:
*/
void print_scheduler_statistics() const { m_sched->print_statistics(); }
+ void print_scheduler_query_data() const { m_sched->print_query_time_data(); }
+
/**
* Writes a schematic view of the currently active structure to
* stdout. Each level is on its own line, and each shard is represented.
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 703f13e..2f49b5f 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -72,6 +72,7 @@ public:
}
void print_statistics() { m_stats.print_statistics(); }
+ void print_query_time_data() { m_stats.print_query_time_data(); }
private:
psudb::LockedPriorityQueue<Task> m_task_queue;
diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h
index 706ba84..304a0c4 100644
--- a/include/framework/scheduling/statistics.h
+++ b/include/framework/scheduling/statistics.h
@@ -20,6 +20,7 @@
#include <mutex>
#include <unordered_map>
#include <vector>
+#include <cmath>
namespace de {
@@ -41,7 +42,32 @@ private:
size_t size;
size_t type;
- JobInfo(size_t id, size_t size, size_t type) : id(id), size(size), type(type) {}
+ std::chrono::system_clock::time_point queue_time;
+ std::chrono::system_clock::time_point scheduled_time;
+ std::chrono::system_clock::time_point start_time;
+ std::chrono::system_clock::time_point stop_time;
+
+ JobInfo(size_t id, size_t size, size_t type)
+ : id(id), size(size), type(type) {}
+
+ int64_t time_in_queue() {
+ return std::chrono::duration_cast<std::chrono::nanoseconds>(
+ scheduled_time - queue_time)
+ .count();
+ }
+
+ int64_t runtime() {
+ return std::chrono::duration_cast<std::chrono::nanoseconds>(stop_time -
+ start_time)
+ .count();
+ }
+
+ int64_t end_to_end_time() {
+ return std::chrono::duration_cast<std::chrono::nanoseconds>(stop_time -
+ queue_time)
+ .count();
+ }
+
};
public:
@@ -52,45 +78,31 @@ public:
std::unique_lock<std::mutex> lk(m_mutex);
m_jobs.insert({id, {id, size, type}});
m_event_log.emplace_back(id, EventType::QUEUED);
+ m_job_times_set = false;
}
void job_scheduled(size_t id) {
std::unique_lock<std::mutex> lk(m_mutex);
m_event_log.emplace_back(id, EventType::SCHEDULED);
+ m_job_times_set = false;
}
void job_begin(size_t id) {
std::unique_lock<std::mutex> lk(m_mutex);
m_event_log.emplace_back(id, EventType::STARTED);
+ m_job_times_set = false;
}
void job_complete(size_t id) {
std::unique_lock<std::mutex> lk(m_mutex);
m_event_log.emplace_back(id, EventType::FINISHED);
- }
-
- /* FIXME: This is just a temporary approach */
- void log_time_data(size_t length, size_t type) {
- assert(type == 1 || type == 2 || type == 3);
-
- if (type == 1) {
- m_type_1_cnt.fetch_add(1);
- m_type_1_total_time.fetch_add(length);
-
- if (length > m_type_1_largest_time) {
- m_type_1_largest_time.store(length);
- }
- } else if (type == 2) {
- m_type_2_cnt.fetch_add(1);
- m_type_2_total_time.fetch_add(length);
-
- if (length > m_type_2_largest_time) {
- m_type_2_largest_time.store(length);
- }
- }
+ m_job_times_set = false;
}
void print_statistics() {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ update_job_data_from_log();
+
int64_t total_queue_time = 0;
int64_t max_queue_time = 0;
int64_t min_queue_time = INT64_MAX;
@@ -101,94 +113,112 @@ public:
int64_t query_cnt = 0;
- /* dumb brute force approach; there are a million better ways to do this */
- size_t i = 0;
- for (auto &job : m_jobs) {
- std::chrono::system_clock::time_point queue_time;
- std::chrono::system_clock::time_point schedule_time;
- std::chrono::system_clock::time_point start_time;
- std::chrono::system_clock::time_point stop_time;
- /* just look at queries for now */
- if (job.second.type == 1) {
- for (auto &event : m_event_log) {
- if (event.id == job.first) {
- switch (event.type) {
- case EventType::QUEUED:
- queue_time = event.time;
- i++;
- break;
- case EventType::FINISHED:
- stop_time = event.time;
- i++;
- break;
- case EventType::SCHEDULED:
- schedule_time = event.time;
- i++;
- break;
- case EventType::STARTED:
- start_time = event.time;
- i++;
- break;
- }
- }
- }
- }
- /* event wasn't fully logged, so we'll skip it */
- if (i != 4) {
- i=0;
+ /* hard-coded for the moment to only consider queries */
+ for (auto &job : m_jobs) {
+ if (job.second.type != 1) {
continue;
}
- i=0;
-
- auto time_in_queue = std::chrono::duration_cast<std::chrono::nanoseconds>(schedule_time - queue_time).count();
- auto runtime = std::chrono::duration_cast<std::chrono::nanoseconds>(stop_time - start_time).count();
- total_queue_time += time_in_queue;
- total_runtime += runtime;
+ total_queue_time += job.second.time_in_queue();
+ total_runtime += job.second.runtime();
- if (time_in_queue > max_queue_time) {
- max_queue_time = time_in_queue;
+ if (job.second.time_in_queue() > max_queue_time) {
+ max_queue_time = job.second.time_in_queue();
}
- if (time_in_queue < min_queue_time) {
- min_queue_time = time_in_queue;
+ if (job.second.time_in_queue() < min_queue_time) {
+ min_queue_time = job.second.time_in_queue();
}
- if (runtime > max_runtime) {
- max_runtime = runtime;
+ if (job.second.runtime() > max_runtime) {
+ max_runtime = job.second.runtime();
}
- if (runtime < min_runtime) {
- min_runtime = runtime;
+ if (job.second.runtime() < min_runtime) {
+ min_runtime = job.second.runtime();
}
query_cnt++;
}
-
- if (query_cnt == 0) {
- return;
- }
+
int64_t average_queue_time = total_queue_time / query_cnt;
int64_t average_runtime = total_runtime / query_cnt;
- fprintf(stdout, "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t Max Scheduling Delay: %ld\n", average_queue_time, min_queue_time, max_queue_time);
- fprintf(stdout, "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query Latency: %ld\n", average_runtime, min_runtime, max_runtime);
+ /* calculate standard deviations */
+ int64_t queue_deviation_sum = 0;
+ int64_t runtime_deviation_sum = 0;
+ for (auto &job : m_jobs) {
+ if (job.second.type != 1) {
+ continue;
+ }
+
+ queue_deviation_sum += std::pow(job.second.time_in_queue() - average_queue_time, 2);
+ runtime_deviation_sum += std::pow(job.second.runtime() - average_runtime, 2);
+
+ }
+
+ int64_t queue_stddev = std::sqrt(queue_deviation_sum / query_cnt);
+ int64_t runtime_stddev = std::sqrt(runtime_deviation_sum / query_cnt);
+
+
+ fprintf(stdout, "Query Count: %ld\n", query_cnt);
+ fprintf(stdout, "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t Max Scheduling Delay: %ld\tStandard Deviation: %ld\n", average_queue_time, min_queue_time, max_queue_time, queue_stddev);
+ fprintf(stdout, "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query Latency: %ld\tStandard Deviation: %ld\n", average_runtime, min_runtime, max_runtime, runtime_stddev);
+ }
+
+
+ void print_query_time_data() {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ update_job_data_from_log();
+
+ for (auto &job : m_jobs) {
+ if (job.second.type == 1) {
+ fprintf(stdout, "%ld\t%ld\t%ld\n", job.second.time_in_queue(), job.second.runtime(), job.second.end_to_end_time());
+ }
+ }
}
private:
std::mutex m_mutex;
std::unordered_map<size_t, JobInfo> m_jobs;
std::vector<Event> m_event_log;
+ bool m_job_times_set;
+
+ void update_job_data_from_log() {
+ if (m_job_times_set) {
+ return;
+ }
- std::atomic<size_t> m_type_1_cnt;
- std::atomic<size_t> m_type_1_total_time;
+ /*
+ * these updates could be made when the time point is recorded, but I
+ * want to keep as much of this off the critical path as possible. Any
+ * work done here won't affect performance.
+ */
+ for (auto &event : m_event_log) {
+ if (!m_jobs.contains(event.id)) {
+ continue;
+ }
- std::atomic<size_t> m_type_2_cnt;
- std::atomic<size_t> m_type_2_total_time;
+ auto &job = m_jobs.at(event.id);
+ switch (event.type) {
+ case EventType::QUEUED:
+ job.queue_time = event.time;
+ break;
+ case EventType::FINISHED:
+ job.stop_time = event.time;
+ break;
+ case EventType::SCHEDULED:
+ job.scheduled_time = event.time;
+ break;
+ case EventType::STARTED:
+ job.start_time = event.time;
+ break;
+ }
+ }
- std::atomic<size_t> m_type_1_largest_time;
- std::atomic<size_t> m_type_2_largest_time;
+ m_job_times_set = true;
+ }
};
} // namespace de