diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-02-11 17:32:10 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-02-11 17:32:10 -0500 |
| commit | 85afe4ef04f327862460570fb0aa4c30afcf7cc7 (patch) | |
| tree | aba55db313b752df4ac073db117900e0128c22fb /include/framework/scheduling | |
| parent | c04efb2640421be7a24f851c08e290c89b7b46f2 (diff) | |
| download | dynamic-extension-85afe4ef04f327862460570fb0aa4c30afcf7cc7.tar.gz | |
Progress: began adding parallel merging and locking of levels
Diffstat (limited to 'include/framework/scheduling')
| -rw-r--r-- | include/framework/scheduling/FIFOScheduler.h | 20 | ||||
| -rw-r--r-- | include/framework/scheduling/LockManager.h | 52 | ||||
| -rw-r--r-- | include/framework/scheduling/Task.h | 9 | ||||
| -rw-r--r-- | include/framework/scheduling/statistics.h | 4 |
4 files changed, 80 insertions, 5 deletions
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 4c1db8d..8a4cd8d 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -51,13 +51,20 @@ public: m_sched_thrd.join(); m_sched_wakeup_thrd.join(); + m_flush_thread.join(); } void schedule_job(std::function<void(void *)> job, size_t size, void *args, size_t type = 0) { - std::unique_lock<std::mutex> lk(m_cv_lock); + size_t ts = m_counter.fetch_add(1); + if (type == 3) { + do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock)); + return; + } + + std::unique_lock<std::mutex> lk(m_cv_lock); m_stats.job_queued(ts, type, size); m_task_queue.push(Task(size, ts, job, args, type, &m_stats)); @@ -84,6 +91,9 @@ private: std::condition_variable m_cv; std::mutex m_queue_lock; + std::mutex m_flush_lock; + std::thread m_flush_thread; + std::thread m_sched_thrd; std::thread m_sched_wakeup_thrd; ctpl::thread_pool m_thrd_pool; @@ -121,6 +131,14 @@ private: } } while (!m_shutdown.load()); } + + void do_flush(Task task) { + m_flush_lock.lock(); + if (m_flush_thread.joinable()) { + m_flush_thread.join(); + } + m_flush_thread = std::thread(task, 0); + } }; } // namespace de diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h new file mode 100644 index 0000000..91ed778 --- /dev/null +++ b/include/framework/scheduling/LockManager.h @@ -0,0 +1,52 @@ +/* + * + */ + +#pragma once +#include <deque> +#include <atomic> + +namespace de { +class LockManager { +public: + LockManager(size_t levels=1) { + for (size_t i=0; i < levels; i++) { + m_lks.emplace_back(false); + } + } + + ~LockManager() = default; + + void add_lock() { + m_lks.emplace_back(false); + } + + void release_lock(size_t idx) { + if (idx < m_lks.size()) { + m_lks[idx].store(false); + } + } + + bool is_locked(size_t idx) { + if (idx < m_lks.size()) { + return m_lks[idx].load(); + } + + return false; + } + + bool take_lock(size_t idx) { + if (idx < m_lks.size()) { + bool old = m_lks[idx].load(); + if (!old) { + return m_lks[idx].compare_exchange_strong(old, true); + } + } + + return false; + } + +private: + std::deque<std::atomic<bool>> m_lks; +}; +} diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index ed40d3d..2d68f56 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -49,9 +49,9 @@ typedef std::function<void(void *)> Job; struct Task { Task(size_t size, size_t ts, Job job, void *args, size_t type = 0, - SchedulerStatistics *stats = nullptr) + SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr) : m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type), - m_stats(stats) {} + m_stats(stats), m_lk(lk) {} Job m_job; size_t m_size; @@ -59,6 +59,7 @@ struct Task { void *m_args; size_t m_type; SchedulerStatistics *m_stats; + std::mutex *m_lk; friend bool operator<(const Task &self, const Task &other) { return self.m_timestamp < other.m_timestamp; @@ -87,6 +88,10 @@ struct Task { .count(); m_stats->log_time_data(time, m_type); } + + if (m_lk) { + m_lk->unlock(); + } } }; diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h index 48c186f..a6d66ab 100644 --- a/include/framework/scheduling/statistics.h +++ b/include/framework/scheduling/statistics.h @@ -52,7 +52,7 @@ public: /* FIXME: This is just a temporary approach */ void log_time_data(size_t length, size_t type) { - assert(type == 1 || type == 2); + assert(type == 1 || type == 2 || type == 3); if (type == 1) { m_type_1_cnt.fetch_add(1); @@ -61,7 +61,7 @@ public: if (length > m_type_1_largest_time) { m_type_1_largest_time.store(length); } - } else { + } else if (type == 2) { m_type_2_cnt.fetch_add(1); m_type_2_total_time.fetch_add(length); |