From 2ded45f5a20f38fdfd9f348c446c38dc713a5591 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 3 Mar 2025 13:41:19 -0500 Subject: Fixed a few concurrency bugs --- include/framework/scheduling/FIFOScheduler.h | 21 +++------------------ 1 file changed, 3 insertions(+), 18 deletions(-) (limited to 'include/framework/scheduling/FIFOScheduler.h') diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 8a4cd8d..16fe111 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -1,7 +1,7 @@ /* * include/framework/scheduling/FIFOScheduler.h * - * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * Copyright (C) 2023-2025 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * @@ -9,11 +9,6 @@ * determine which jobs to run next. If more jobs are scheduled than there * are available threads, the excess will stall until a thread becomes * available and then run in the order they were received by the scheduler. - * - * TODO: We need to set up a custom threadpool based on jthreads to support - * thread preemption for a later phase of this project. That will allow us - * to avoid blocking epoch transitions on long-running queries, or to pause - * reconstructions on demand. */ #pragma once @@ -40,7 +35,6 @@ public: m_thrd_cnt((thread_cnt) ? thread_cnt : DEFAULT_MAX_THREADS), m_used_memory(0), m_used_thrds(0), m_shutdown(false) { m_sched_thrd = std::thread(&FIFOScheduler::run, this); - m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this); m_thrd_pool.resize(m_thrd_cnt); } @@ -50,7 +44,6 @@ public: } m_sched_thrd.join(); - m_sched_wakeup_thrd.join(); m_flush_thread.join(); } @@ -60,13 +53,13 @@ public: size_t ts = m_counter.fetch_add(1); if (type == 3) { - do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock)); + do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock, &m_cv)); return; } std::unique_lock lk(m_cv_lock); m_stats.job_queued(ts, type, size); - m_task_queue.push(Task(size, ts, job, args, type, &m_stats)); + m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv)); m_cv.notify_all(); } @@ -95,7 +88,6 @@ private: std::thread m_flush_thread; std::thread m_sched_thrd; - std::thread m_sched_wakeup_thrd; ctpl::thread_pool m_thrd_pool; std::atomic m_used_memory; @@ -105,13 +97,6 @@ private: SchedulerStatistics m_stats; - void periodic_wakeup() { - do { - std::this_thread::sleep_for(10us); - m_cv.notify_all(); - } while (!m_shutdown.load()); - } - void schedule_next() { auto lk = std::unique_lock(m_queue_lock); assert(m_task_queue.size() > 0); -- cgit v1.2.3