diff options
Diffstat (limited to 'include/framework/scheduling/FIFOScheduler.h')
| -rw-r--r-- | include/framework/scheduling/FIFOScheduler.h | 143 |
1 files changed, 69 insertions, 74 deletions
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 3ed4f49..7cb6d20 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -1,7 +1,7 @@ /* * include/framework/scheduling/FIFOScheduler.h * - * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> * * Distributed under the Modified BSD License. * @@ -17,11 +17,11 @@ */ #pragma once -#include <thread> -#include <condition_variable> -#include <chrono> #include "framework/scheduling/Task.h" #include "framework/scheduling/statistics.h" +#include <chrono> +#include <condition_variable> +#include <thread> #include "ctpl/ctpl.h" #include "psu-ds/LockedPriorityQueue.h" @@ -30,100 +30,95 @@ namespace de { using namespace std::literals::chrono_literals; - class FIFOScheduler { private: - static const size_t DEFAULT_MAX_THREADS = 8; + static const size_t DEFAULT_MAX_THREADS = 8; public: - FIFOScheduler(size_t memory_budget, size_t thread_cnt) - : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thrd_cnt((thread_cnt) ? thread_cnt: DEFAULT_MAX_THREADS) - , m_used_memory(0) - , m_used_thrds(0) - , m_shutdown(false) - { - m_sched_thrd = std::thread(&FIFOScheduler::run, this); - m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this); - m_thrd_pool.resize(m_thrd_cnt); + FIFOScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX), + m_thrd_cnt((thread_cnt) ? thread_cnt : DEFAULT_MAX_THREADS), + m_used_memory(0), m_used_thrds(0), m_shutdown(false) { + m_sched_thrd = std::thread(&FIFOScheduler::run, this); + m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this); + m_thrd_pool.resize(m_thrd_cnt); + } + + ~FIFOScheduler() { + if (!m_shutdown.load()) { + shutdown(); } - ~FIFOScheduler() { - if (!m_shutdown.load()) { - shutdown(); - } + m_sched_thrd.join(); + m_sched_wakeup_thrd.join(); + } - m_sched_thrd.join(); - m_sched_wakeup_thrd.join(); - } + void schedule_job(std::function<void(void *)> job, size_t size, void *args, + size_t type = 0) { + std::unique_lock<std::mutex> lk(m_cv_lock); + size_t ts = m_counter.fetch_add(1); - void schedule_job(std::function<void(void*)> job, size_t size, void *args, size_t type=0) { - std::unique_lock<std::mutex> lk(m_cv_lock); - size_t ts = m_counter.fetch_add(1); + m_stats.job_queued(ts, type, size); + m_task_queue.push(Task(size, ts, job, args, type, &m_stats)); - m_stats.job_queued(ts, type, size); - m_task_queue.push(Task(size, ts, job, args, type, &m_stats)); + m_cv.notify_all(); + } - m_cv.notify_all(); - } - - void shutdown() { - m_shutdown.store(true); - m_thrd_pool.stop(true); - m_cv.notify_all(); - } + void shutdown() { + m_shutdown.store(true); + m_thrd_pool.stop(true); + m_cv.notify_all(); + } - void print_statistics() { - m_stats.print_statistics(); - } + void print_statistics() { m_stats.print_statistics(); } private: - psudb::LockedPriorityQueue<Task> m_task_queue; + psudb::LockedPriorityQueue<Task> m_task_queue; - size_t m_memory_budget; - size_t m_thrd_cnt; + [[maybe_unused]] size_t m_memory_budget; + size_t m_thrd_cnt; - std::atomic<bool> m_shutdown; - std::atomic<size_t> m_counter; - std::mutex m_cv_lock; - std::condition_variable m_cv; + std::atomic<size_t> m_counter; + std::mutex m_cv_lock; + std::condition_variable m_cv; - std::thread m_sched_thrd; - std::thread m_sched_wakeup_thrd; - ctpl::thread_pool m_thrd_pool; + std::thread m_sched_thrd; + std::thread m_sched_wakeup_thrd; + ctpl::thread_pool m_thrd_pool; - std::atomic<size_t> m_used_thrds; - std::atomic<size_t> m_used_memory; + std::atomic<size_t> m_used_memory; + std::atomic<size_t> m_used_thrds; - SchedulerStatistics m_stats; + std::atomic<bool> m_shutdown; - void periodic_wakeup() { - do { - std::this_thread::sleep_for(10us); - m_cv.notify_all(); - } while (!m_shutdown.load()); - } + SchedulerStatistics m_stats; - void schedule_next() { - assert(m_task_queue.size() > 0); - auto t = m_task_queue.pop(); - m_stats.job_scheduled(t.m_timestamp); + void periodic_wakeup() { + do { + std::this_thread::sleep_for(10us); + m_cv.notify_all(); + } while (!m_shutdown.load()); + } - m_thrd_pool.push(t); - } + void schedule_next() { + assert(m_task_queue.size() > 0); + auto t = m_task_queue.pop(); + m_stats.job_scheduled(t.m_timestamp); - void run() { - do { - std::unique_lock<std::mutex> cv_lock(m_cv_lock); - m_cv.wait(cv_lock); + m_thrd_pool.push(t); + } - while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) { - schedule_next(); - } - } while(!m_shutdown.load()); - } + void run() { + do { + std::unique_lock<std::mutex> cv_lock(m_cv_lock); + m_cv.wait(cv_lock); + while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) { + schedule_next(); + } + } while (!m_shutdown.load()); + } }; -} +} // namespace de |