path: root/include/framework/scheduling/FIFOScheduler.h
author    Douglas B. Rumbaugh <dbr4@psu.edu>    2024-02-09 14:06:59 -0500
committer GitHub <noreply@github.com>           2024-02-09 14:06:59 -0500
commit    bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30 (patch)
tree      66333c55feb0ea8875a50e6dc07c8535d241bf1c /include/framework/scheduling/FIFOScheduler.h
parent    076e104b8672924c3d80cd1da2fdb5ebee1766ac (diff)
parent    46885246313358a3b606eca139b20280e96db10e (diff)
Merge pull request #1 from dbrumbaugh/new-buffer
Initial Concurrency Implementation
Diffstat (limited to 'include/framework/scheduling/FIFOScheduler.h')
-rw-r--r--  include/framework/scheduling/FIFOScheduler.h  129
1 file changed, 129 insertions, 0 deletions
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
new file mode 100644
index 0000000..3ed4f49
--- /dev/null
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -0,0 +1,129 @@
+/*
+ * include/framework/scheduling/FIFOScheduler.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * This scheduler runs jobs concurrently, using a standard FIFO queue to
+ * determine which job to run next. If more jobs are scheduled than there
+ * are available threads, the excess jobs stall until a thread becomes
+ * available, and are then run in the order in which they were received
+ * by the scheduler.
+ *
+ * TODO: A later phase of this project requires a custom thread pool based
+ * on jthreads to support thread preemption. That will allow us to avoid
+ * blocking epoch transitions on long-running queries, and to pause
+ * reconstructions on demand.
+ */
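+/*
+ * Illustrative usage sketch (not part of the original commit; do_work,
+ * JobArgs, and args are hypothetical and stand in for a real job function
+ * and its argument struct):
+ *
+ *     de::FIFOScheduler sched(0, 4);         // 0 -> unbounded memory budget, 4 worker threads
+ *     sched.schedule_job(do_work, sizeof(JobArgs), &args);
+ *     sched.shutdown();                      // runs already-queued jobs, then stops the pool
+ *     sched.print_statistics();
+ */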
+#pragma once
+
+#include <thread>
+#include <condition_variable>
+#include <chrono>
+#include <mutex>
+#include <atomic>
+#include <functional>
+#include <cassert>
+#include <cstdint>
+#include "framework/scheduling/Task.h"
+#include "framework/scheduling/statistics.h"
+
+#include "ctpl/ctpl.h"
+#include "psu-ds/LockedPriorityQueue.h"
+
+namespace de {
+
+using namespace std::literals::chrono_literals;
+
+
+class FIFOScheduler {
+private:
+    static const size_t DEFAULT_MAX_THREADS = 8;
+
+public:
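+    /*
+     * A memory_budget of 0 selects an unbounded budget (UINT64_MAX) and a
+     * thread_cnt of 0 selects DEFAULT_MAX_THREADS. The constructor starts
+     * the dispatch thread, the periodic wakeup thread, and the worker
+     * thread pool.
+     */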
+    FIFOScheduler(size_t memory_budget, size_t thread_cnt)
+        : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
+        , m_thrd_cnt((thread_cnt) ? thread_cnt : DEFAULT_MAX_THREADS)
+        , m_shutdown(false)
+        , m_counter(0)
+        , m_used_thrds(0)
+        , m_used_memory(0)
+    {
+        m_sched_thrd = std::thread(&FIFOScheduler::run, this);
+        m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this);
+        m_thrd_pool.resize(m_thrd_cnt);
+    }
+
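+    /*
+     * Shut the scheduler down if that has not already been done explicitly,
+     * then join the dispatch and wakeup threads.
+     */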
+    ~FIFOScheduler() {
+        if (!m_shutdown.load()) {
+            shutdown();
+        }
+
+        m_sched_thrd.join();
+        m_sched_wakeup_thrd.join();
+    }
+
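+    /*
+     * Enqueue a job for execution. Jobs are tagged with a monotonically
+     * increasing timestamp and run in FIFO order as worker threads become
+     * available. The size and type arguments are recorded in the scheduler
+     * statistics and attached to the Task.
+     */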
+    void schedule_job(std::function<void(void*)> job, size_t size, void *args, size_t type = 0) {
+        std::unique_lock<std::mutex> lk(m_cv_lock);
+        size_t ts = m_counter.fetch_add(1);
+
+        m_stats.job_queued(ts, type, size);
+        m_task_queue.push(Task(size, ts, job, args, type, &m_stats));
+
+        m_cv.notify_all();
+    }
+
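+    /*
+     * Signal the dispatch loop to exit and stop the worker pool, waiting
+     * for jobs already handed to the pool to complete.
+     */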
+    void shutdown() {
+        m_shutdown.store(true);
+        m_thrd_pool.stop(true);
+        m_cv.notify_all();
+    }
+
+    void print_statistics() {
+        m_stats.print_statistics();
+    }
+
+private:
+    psudb::LockedPriorityQueue<Task> m_task_queue;
+
+    size_t m_memory_budget;
+    size_t m_thrd_cnt;
+
+    std::atomic<bool> m_shutdown;
+
+    std::atomic<size_t> m_counter;
+    std::mutex m_cv_lock;
+    std::condition_variable m_cv;
+
+    std::thread m_sched_thrd;
+    std::thread m_sched_wakeup_thrd;
+    ctpl::thread_pool m_thrd_pool;
+
+    std::atomic<size_t> m_used_thrds;
+    std::atomic<size_t> m_used_memory;
+
+    SchedulerStatistics m_stats;
+
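+    /*
+     * Notify the dispatch thread every 10us so that queued tasks are
+     * picked up promptly even if a condition-variable notification is
+     * missed.
+     */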
+    void periodic_wakeup() {
+        do {
+            std::this_thread::sleep_for(10us);
+            m_cv.notify_all();
+        } while (!m_shutdown.load());
+    }
+
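+    /*
+     * Pop the next task from the queue and hand it to the thread pool.
+     * The caller must ensure that the queue is non-empty.
+     */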
+    void schedule_next() {
+        assert(m_task_queue.size() > 0);
+        auto t = m_task_queue.pop();
+        m_stats.job_scheduled(t.m_timestamp);
+
+        m_thrd_pool.push(t);
+    }
+
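+    /*
+     * Dispatch loop: block until notified (by schedule_job, periodic_wakeup,
+     * or shutdown), then hand queued tasks to the pool while idle worker
+     * threads are available.
+     */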
+    void run() {
+        do {
+            std::unique_lock<std::mutex> cv_lock(m_cv_lock);
+            m_cv.wait(cv_lock);
+
+            while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) {
+                schedule_next();
+            }
+        } while (!m_shutdown.load());
+    }
+
+};
+
+}