summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling/FIFOScheduler.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-01-19 15:58:04 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-01-19 15:58:04 -0500
commit38693c342558628c75e0ab0d23c32a95a499ed8b (patch)
treef193ff1990ea7976a8ceb5d3bf69d677d3e8ee7d /include/framework/scheduling/FIFOScheduler.h
parent138c793b0a58577713d98c98bb140cf1d9c79bee (diff)
downloaddynamic-extension-38693c342558628c75e0ab0d23c32a95a499ed8b.tar.gz
Initial rough-out of internal statistics tracker
Need to figure out the best way to do the detailed tracking in a concurrent manner. I was thinking just an event log, with parsing routines for extracting statistics. But that'll be pretty slow.
Diffstat (limited to 'include/framework/scheduling/FIFOScheduler.h')
-rw-r--r--include/framework/scheduling/FIFOScheduler.h28
1 files changed, 14 insertions, 14 deletions
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 4cdc436..513a3a2 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -8,21 +8,11 @@
*/
#pragma once
-#include <vector>
-#include <memory>
-#include <queue>
#include <thread>
#include <condition_variable>
-#include <future>
-
-#include "util/types.h"
-#include "framework/interface/Shard.h"
-#include "framework/interface/Query.h"
-#include "framework/interface/Record.h"
-#include "framework/structure/MutableBuffer.h"
-#include "framework/util/Configuration.h"
-#include "framework/structure/ExtensionStructure.h"
+
#include "framework/scheduling/Task.h"
+#include "framework/scheduling/statistics.h"
#include "ctpl/ctpl.h"
#include "psu-ds/LockedPriorityQueue.h"
@@ -54,10 +44,12 @@ public:
m_sched_thrd.join();
}
- void schedule_job(std::function<void(void*)> job, size_t size, void *args) {
+ void schedule_job(std::function<void(void*)> job, size_t size, void *args, size_t type=0) {
std::unique_lock<std::mutex> lk(m_cv_lock);
size_t ts = m_counter.fetch_add(1);
- m_task_queue.push(Task(size, ts, job, args));
+
+ m_stats.job_queued(ts, type, size);
+ m_task_queue.push(Task(size, ts, job, args, type, &m_stats));
m_cv.notify_all();
}
@@ -68,6 +60,10 @@ public:
m_cv.notify_all();
}
+ void print_statistics() {
+ m_stats.print_statistics();
+ }
+
private:
psudb::LockedPriorityQueue<Task> m_task_queue;
@@ -86,9 +82,13 @@ private:
std::atomic<size_t> m_used_thrds;
std::atomic<size_t> m_used_memory;
+ SchedulerStatistics m_stats;
+
void schedule_next() {
assert(m_task_queue.size() > 0);
auto t = m_task_queue.pop();
+ m_stats.job_scheduled(t.m_timestamp);
+
m_thrd_pool.push(t);
}