path: root/include/framework/scheduling/FIFOScheduler.h
author     Douglas B. Rumbaugh <dbr4@psu.edu>    2024-12-06 13:13:51 -0500
committer  GitHub <noreply@github.com>           2024-12-06 18:13:51 +0000
commit     9fe305c7d28e993e55c55427f377ae7e3251ea4f (patch)
tree       384b687f64b84eb81bde2becac8a5f24916b07b4 /include/framework/scheduling/FIFOScheduler.h
parent     47916da2ba5ed5bee2dda3cbcc58d39e1e931bfc (diff)
download   dynamic-extension-9fe305c7d28e993e55c55427f377ae7e3251ea4f.tar.gz
Interface update (#5)
* Query Interface Adjustments/Refactoring

  Began the process of adjusting the query interface (and also the shard
  interface, to a lesser degree) to better accommodate the user. In
  particular the following changes have been made:

  1. The number of necessary template arguments for the query type has been
     drastically reduced, while also removing the void pointers and manual
     delete functions from the interface. This was accomplished by requiring
     many of the sub-types associated with a query (parameters, etc.) to be
     nested inside the main query class, and by forcing the SHARD type to
     expose its associated record type.

  2. User-defined query return types are now supported. Queries are no longer
     required to return strictly sets of records. Instead, the query now has
     LocalResultType and ResultType template parameters (which can be
     defaulted using a typedef in the Query type itself), allowing much more
     flexibility. Note that, at least for the short term, the LocalResultType
     must still expose the same is_deleted/is_tombstone interface as a
     Wrapped<R> used to, as this is currently needed for delete filtering. A
     better approach to this is, hopefully, forthcoming.

  3. Updated the ISAMTree.h shard and rangequery.h query to use the new
     interfaces, and adjusted the associated unit tests as well.

  4. Dropped the unnecessary "get_data()" function from the ShardInterface
     concept.

  5. Dropped the need to specify a record type in the ShardInterface concept.
     This is now handled using a required Shard::RECORD member of the Shard
     class itself, which should expose the name of the record type.

* Updates to framework to support new Query/Shard interfaces

  Pretty extensive adjustments to the framework, particularly to the
  templates themselves, along with some type-renaming work, to support the
  new query and shard interfaces. Adjusted the external query interface to
  take an rvalue reference, rather than a pointer, to the query parameters.

* Removed framework-level delete filtering

  This was causing some issues with the new query interface, and should
  probably be reworked anyway, so I'm temporarily (TM) removing the feature.

* Updated benchmarks + remaining code for new interface
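To make the described query-class shape concrete, here is a minimal sketch of a query type with nested sub-types and defaulted result typedefs, plus a shard exposing its record type through a RECORD member. The member names (Parameters, LocalResultType, ResultType, Shard::RECORD) follow the commit message above; the ExampleQuery/MyShard/MyRecord names and their contents are hypothetical illustrations, not the framework's actual code.

```cpp
// Hypothetical illustration only; the real framework's concepts and
// signatures may differ. It sketches the conventions the commit describes:
//  - sub-types (parameters, results) nested inside the query class,
//  - LocalResultType / ResultType defaulted via typedefs in the query,
//  - the shard exposing its record type through a RECORD member.
#include <cstdint>
#include <vector>

struct MyRecord {            // example record type (hypothetical)
  uint64_t key;
  uint64_t value;
};

struct MyShard {             // example shard type (hypothetical)
  using RECORD = MyRecord;   // Shard::RECORD exposes the record type
  // ... shard storage and lookup methods would go here ...
};

template <typename SHARD>
struct ExampleQuery {        // example query type (hypothetical)
  // Sub-types nested inside the query class rather than passed in as
  // separate template arguments.
  struct Parameters {
    typename SHARD::RECORD lower_bound;
    typename SHARD::RECORD upper_bound;
  };

  // A per-shard ("local") result that still exposes the
  // is_deleted/is_tombstone interface the commit message mentions.
  struct LocalResult {
    typename SHARD::RECORD rec;
    bool deleted = false;
    bool tombstone = false;
    bool is_deleted() const { return deleted; }
    bool is_tombstone() const { return tombstone; }
  };

  // Defaulted result typedefs, so the framework templates can pick them up
  // without extra template arguments.
  using LocalResultType = LocalResult;
  using ResultType = std::vector<typename SHARD::RECORD>;
};
```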
Diffstat (limited to 'include/framework/scheduling/FIFOScheduler.h')
-rw-r--r--  include/framework/scheduling/FIFOScheduler.h  143
1 file changed, 69 insertions(+), 74 deletions(-)
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 3ed4f49..7cb6d20 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -1,7 +1,7 @@
/*
* include/framework/scheduling/FIFOScheduler.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -17,11 +17,11 @@
*/
#pragma once
-#include <thread>
-#include <condition_variable>
-#include <chrono>
#include "framework/scheduling/Task.h"
#include "framework/scheduling/statistics.h"
+#include <chrono>
+#include <condition_variable>
+#include <thread>
#include "ctpl/ctpl.h"
#include "psu-ds/LockedPriorityQueue.h"
@@ -30,100 +30,95 @@ namespace de {
using namespace std::literals::chrono_literals;
-
class FIFOScheduler {
private:
- static const size_t DEFAULT_MAX_THREADS = 8;
+ static const size_t DEFAULT_MAX_THREADS = 8;
public:
- FIFOScheduler(size_t memory_budget, size_t thread_cnt)
- : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
- , m_thrd_cnt((thread_cnt) ? thread_cnt: DEFAULT_MAX_THREADS)
- , m_used_memory(0)
- , m_used_thrds(0)
- , m_shutdown(false)
- {
- m_sched_thrd = std::thread(&FIFOScheduler::run, this);
- m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this);
- m_thrd_pool.resize(m_thrd_cnt);
+ FIFOScheduler(size_t memory_budget, size_t thread_cnt)
+ : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX),
+ m_thrd_cnt((thread_cnt) ? thread_cnt : DEFAULT_MAX_THREADS),
+ m_used_memory(0), m_used_thrds(0), m_shutdown(false) {
+ m_sched_thrd = std::thread(&FIFOScheduler::run, this);
+ m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this);
+ m_thrd_pool.resize(m_thrd_cnt);
+ }
+
+ ~FIFOScheduler() {
+ if (!m_shutdown.load()) {
+ shutdown();
}
- ~FIFOScheduler() {
- if (!m_shutdown.load()) {
- shutdown();
- }
+ m_sched_thrd.join();
+ m_sched_wakeup_thrd.join();
+ }
- m_sched_thrd.join();
- m_sched_wakeup_thrd.join();
- }
+ void schedule_job(std::function<void(void *)> job, size_t size, void *args,
+ size_t type = 0) {
+ std::unique_lock<std::mutex> lk(m_cv_lock);
+ size_t ts = m_counter.fetch_add(1);
- void schedule_job(std::function<void(void*)> job, size_t size, void *args, size_t type=0) {
- std::unique_lock<std::mutex> lk(m_cv_lock);
- size_t ts = m_counter.fetch_add(1);
+ m_stats.job_queued(ts, type, size);
+ m_task_queue.push(Task(size, ts, job, args, type, &m_stats));
- m_stats.job_queued(ts, type, size);
- m_task_queue.push(Task(size, ts, job, args, type, &m_stats));
+ m_cv.notify_all();
+ }
- m_cv.notify_all();
- }
-
- void shutdown() {
- m_shutdown.store(true);
- m_thrd_pool.stop(true);
- m_cv.notify_all();
- }
+ void shutdown() {
+ m_shutdown.store(true);
+ m_thrd_pool.stop(true);
+ m_cv.notify_all();
+ }
- void print_statistics() {
- m_stats.print_statistics();
- }
+ void print_statistics() { m_stats.print_statistics(); }
private:
- psudb::LockedPriorityQueue<Task> m_task_queue;
+ psudb::LockedPriorityQueue<Task> m_task_queue;
- size_t m_memory_budget;
- size_t m_thrd_cnt;
+ [[maybe_unused]] size_t m_memory_budget;
+ size_t m_thrd_cnt;
- std::atomic<bool> m_shutdown;
- std::atomic<size_t> m_counter;
- std::mutex m_cv_lock;
- std::condition_variable m_cv;
+ std::atomic<size_t> m_counter;
+ std::mutex m_cv_lock;
+ std::condition_variable m_cv;
- std::thread m_sched_thrd;
- std::thread m_sched_wakeup_thrd;
- ctpl::thread_pool m_thrd_pool;
+ std::thread m_sched_thrd;
+ std::thread m_sched_wakeup_thrd;
+ ctpl::thread_pool m_thrd_pool;
- std::atomic<size_t> m_used_thrds;
- std::atomic<size_t> m_used_memory;
+ std::atomic<size_t> m_used_memory;
+ std::atomic<size_t> m_used_thrds;
- SchedulerStatistics m_stats;
+ std::atomic<bool> m_shutdown;
- void periodic_wakeup() {
- do {
- std::this_thread::sleep_for(10us);
- m_cv.notify_all();
- } while (!m_shutdown.load());
- }
+ SchedulerStatistics m_stats;
- void schedule_next() {
- assert(m_task_queue.size() > 0);
- auto t = m_task_queue.pop();
- m_stats.job_scheduled(t.m_timestamp);
+ void periodic_wakeup() {
+ do {
+ std::this_thread::sleep_for(10us);
+ m_cv.notify_all();
+ } while (!m_shutdown.load());
+ }
- m_thrd_pool.push(t);
- }
+ void schedule_next() {
+ assert(m_task_queue.size() > 0);
+ auto t = m_task_queue.pop();
+ m_stats.job_scheduled(t.m_timestamp);
- void run() {
- do {
- std::unique_lock<std::mutex> cv_lock(m_cv_lock);
- m_cv.wait(cv_lock);
+ m_thrd_pool.push(t);
+ }
- while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) {
- schedule_next();
- }
- } while(!m_shutdown.load());
- }
+ void run() {
+ do {
+ std::unique_lock<std::mutex> cv_lock(m_cv_lock);
+ m_cv.wait(cv_lock);
+ while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) {
+ schedule_next();
+ }
+ } while (!m_shutdown.load());
+ }
};
-}
+} // namespace de
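For context, a minimal usage sketch of the scheduler as it stands after this change. The constructor and schedule_job() signature are taken from the diff above; the example job function, its argument struct, and the sleep before shutdown are assumptions for illustration only.

```cpp
// Minimal usage sketch (hypothetical job and argument types; only the
// FIFOScheduler API visible in the diff above is relied upon).
#include "framework/scheduling/FIFOScheduler.h"

#include <chrono>
#include <cstdio>
#include <thread>

struct JobArgs {   // hypothetical argument payload for a job
  int id;
};

// Jobs are std::function<void(void*)> callables taking an opaque argument
// pointer, matching schedule_job() above.
void example_job(void *args) {
  auto *ja = static_cast<JobArgs *>(args);
  std::printf("running job %d\n", ja->id);
}

int main() {
  // A memory budget of 0 falls back to UINT64_MAX; use 4 worker threads.
  de::FIFOScheduler sched(0, 4);

  JobArgs args{42};
  // size = 0 (no memory accounting needed here), job type defaults to 0.
  sched.schedule_job(example_job, 0, &args, 0);

  // Give the scheduler thread a moment to dispatch the queued task; a real
  // program would use its own completion signal instead of sleeping.
  std::this_thread::sleep_for(std::chrono::milliseconds(100));

  // Explicit shutdown is optional: the destructor calls shutdown() and
  // joins the scheduler and wakeup threads if it has not been called.
  sched.shutdown();
  return 0;
}
```

Note the design visible in the diff: run() waits on the condition variable without a predicate, and periodic_wakeup() notifies it every 10us, so tasks queued while the scheduler thread is not waiting are still picked up on the next wakeup.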