summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling/FIFOScheduler.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-03-03 13:41:19 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2025-03-03 13:41:19 -0500
commit2ded45f5a20f38fdfd9f348c446c38dc713a5591 (patch)
tree746fb09b49ee4c00fc3e4760d899d60d8d8dcce0 /include/framework/scheduling/FIFOScheduler.h
parentd116b94389538aa8e0e7354fae77693b980de4f0 (diff)
downloaddynamic-extension-2ded45f5a20f38fdfd9f348c446c38dc713a5591.tar.gz
Fixed a few concurrency bugs
Diffstat (limited to 'include/framework/scheduling/FIFOScheduler.h')
-rw-r--r--include/framework/scheduling/FIFOScheduler.h21
1 files changed, 3 insertions, 18 deletions
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 8a4cd8d..16fe111 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -1,7 +1,7 @@
/*
* include/framework/scheduling/FIFOScheduler.h
*
- * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -9,11 +9,6 @@
* determine which jobs to run next. If more jobs are scheduled than there
* are available threads, the excess will stall until a thread becomes
* available and then run in the order they were received by the scheduler.
- *
- * TODO: We need to set up a custom threadpool based on jthreads to support
- * thread preemption for a later phase of this project. That will allow us
- * to avoid blocking epoch transitions on long-running queries, or to pause
- * reconstructions on demand.
*/
#pragma once
@@ -40,7 +35,6 @@ public:
m_thrd_cnt((thread_cnt) ? thread_cnt : DEFAULT_MAX_THREADS),
m_used_memory(0), m_used_thrds(0), m_shutdown(false) {
m_sched_thrd = std::thread(&FIFOScheduler::run, this);
- m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this);
m_thrd_pool.resize(m_thrd_cnt);
}
@@ -50,7 +44,6 @@ public:
}
m_sched_thrd.join();
- m_sched_wakeup_thrd.join();
m_flush_thread.join();
}
@@ -60,13 +53,13 @@ public:
size_t ts = m_counter.fetch_add(1);
if (type == 3) {
- do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock));
+ do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock, &m_cv));
return;
}
std::unique_lock<std::mutex> lk(m_cv_lock);
m_stats.job_queued(ts, type, size);
- m_task_queue.push(Task(size, ts, job, args, type, &m_stats));
+ m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv));
m_cv.notify_all();
}
@@ -95,7 +88,6 @@ private:
std::thread m_flush_thread;
std::thread m_sched_thrd;
- std::thread m_sched_wakeup_thrd;
ctpl::thread_pool m_thrd_pool;
std::atomic<size_t> m_used_memory;
@@ -105,13 +97,6 @@ private:
SchedulerStatistics m_stats;
- void periodic_wakeup() {
- do {
- std::this_thread::sleep_for(10us);
- m_cv.notify_all();
- } while (!m_shutdown.load());
- }
-
void schedule_next() {
auto lk = std::unique_lock<std::mutex>(m_queue_lock);
assert(m_task_queue.size() > 0);