summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-11-06 10:01:23 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2023-11-06 10:20:03 -0500
commit4e4cf858122ca6c1ae6d5f635e839089769fee38 (patch)
treeaf2baf54991e8aca6766671c5701fbfd6dffed7d /include/framework
parent83ca486048a5053d8c75bb5041091edb1b183a85 (diff)
downloaddynamic-extension-4e4cf858122ca6c1ae6d5f635e839089769fee38.tar.gz
Scheduling: Switched over to a thread pool model
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/scheduling/FIFOScheduler.h13
-rw-r--r--include/framework/scheduling/SerialScheduler.h2
-rw-r--r--include/framework/scheduling/Task.h2
3 files changed, 12 insertions, 5 deletions
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 91a72b3..1521eb6 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -24,20 +24,26 @@
#include "framework/structure/ExtensionStructure.h"
#include "framework/scheduling/Task.h"
+#include "ctpl/ctpl.h"
#include "psu-ds/LockedPriorityQueue.h"
namespace de {
+
class FIFOScheduler {
+private:
+ static const size_t DEFAULT_MAX_THREADS = 8;
+
public:
FIFOScheduler(size_t memory_budget, size_t thread_cnt)
: m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
- , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX)
+ , m_thrd_cnt((thread_cnt) ? thread_cnt: DEFAULT_MAX_THREADS)
, m_used_memory(0)
, m_used_thrds(0)
, m_shutdown(false)
{
m_sched_thrd = std::thread(&FIFOScheduler::run, this);
+ m_thrd_pool.resize(m_thrd_cnt);
}
~FIFOScheduler() {
@@ -72,6 +78,7 @@ private:
std::condition_variable m_cv;
std::thread m_sched_thrd;
+ ctpl::thread_pool m_thrd_pool;
std::atomic<size_t> m_used_thrds;
std::atomic<size_t> m_used_memory;
@@ -79,7 +86,7 @@ private:
void schedule_next() {
assert(m_task_queue.size() > 0);
auto t = m_task_queue.pop();
- t();
+ m_thrd_pool.push(t);
}
void run() {
@@ -87,7 +94,7 @@ private:
std::unique_lock<std::mutex> cv_lock(m_cv_lock);
m_cv.wait(cv_lock);
- while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) {
+ while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) {
schedule_next();
}
} while(!m_shutdown.load());
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
index 9c767e8..93611d1 100644
--- a/include/framework/scheduling/SerialScheduler.h
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -47,7 +47,7 @@ public:
void schedule_job(std::function<void(void*)> job, size_t size, void *args) {
size_t ts = m_counter++;
auto t = Task(size, ts, job, args);
- t();
+ t(0);
}
void shutdown() {
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 228665f..6dfd7df 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -54,7 +54,7 @@ struct Task {
return self.m_timestamp > other.m_timestamp;
}
- void operator()() {
+ void operator()(size_t thrd_id) {
m_job(m_args);
}
};