summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/scheduling')
-rw-r--r--include/framework/scheduling/FIFOScheduler.h20
-rw-r--r--include/framework/scheduling/LockManager.h52
-rw-r--r--include/framework/scheduling/Task.h9
-rw-r--r--include/framework/scheduling/statistics.h4
4 files changed, 80 insertions, 5 deletions
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 4c1db8d..8a4cd8d 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -51,13 +51,20 @@ public:
m_sched_thrd.join();
m_sched_wakeup_thrd.join();
+ m_flush_thread.join();
}
void schedule_job(std::function<void(void *)> job, size_t size, void *args,
size_t type = 0) {
- std::unique_lock<std::mutex> lk(m_cv_lock);
+
size_t ts = m_counter.fetch_add(1);
+ if (type == 3) {
+ do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock));
+ return;
+ }
+
+ std::unique_lock<std::mutex> lk(m_cv_lock);
m_stats.job_queued(ts, type, size);
m_task_queue.push(Task(size, ts, job, args, type, &m_stats));
@@ -84,6 +91,9 @@ private:
std::condition_variable m_cv;
std::mutex m_queue_lock;
+ std::mutex m_flush_lock;
+ std::thread m_flush_thread;
+
std::thread m_sched_thrd;
std::thread m_sched_wakeup_thrd;
ctpl::thread_pool m_thrd_pool;
@@ -121,6 +131,14 @@ private:
}
} while (!m_shutdown.load());
}
+
+ /*
+ * Run a flush task (type == 3) on a dedicated thread instead of the
+ * shared pool, serialized through m_flush_lock.
+ *
+ * NOTE(review): m_flush_lock is lock()ed here, but per the Task.h hunk
+ * in this same diff it is unlock()ed by the task when it completes on
+ * the spawned thread. std::mutex requires unlock() to be called by the
+ * thread that owns the lock; unlocking from a different thread is
+ * undefined behavior. Consider an atomic flag or a semaphore instead.
+ *
+ * NOTE(review): if the spawned task throws (or never runs), the lock is
+ * never released and the next do_flush() deadlocks at lock(). There is
+ * also no matching unlock() on any error path in this function.
+ */
+ void do_flush(Task task) {
+ m_flush_lock.lock();
+ // Only one flush thread exists at a time: reap the previous one
+ // before replacing it (assigning over a joinable std::thread would
+ // call std::terminate).
+ if (m_flush_thread.joinable()) {
+ m_flush_thread.join();
+ }
+ // Task is invoked as task(0); the 0 is presumably the ignored
+ // thread-id argument of the ctpl-style callable -- TODO confirm
+ // against Task's call operator.
+ m_flush_thread = std::thread(task, 0);
+ }
};
} // namespace de
diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h
new file mode 100644
index 0000000..91ed778
--- /dev/null
+++ b/include/framework/scheduling/LockManager.h
@@ -0,0 +1,52 @@
+/*
+ *
+ */
+
+#pragma once
+#include <deque>
+#include <atomic>
+
+namespace de {
/*
 * LockManager
 *
 * A collection of independent, non-blocking single-owner locks, one per
 * "level", addressed by index. Each lock is a std::atomic<bool> where
 * true means held. A std::deque is used as backing storage because
 * std::atomic is neither copyable nor movable: deque growth via
 * emplace_back never relocates existing elements.
 *
 * NOTE(review): growing the container (add_lock) concurrently with
 * take/release/is_locked calls is not synchronized here -- presumably
 * locks are only added while the structure is quiescent; confirm with
 * callers.
 */
class LockManager {
public:
  /* Construct with `levels` locks, all initially released. */
  LockManager(size_t levels = 1) {
    while (levels > 0) {
      m_lks.emplace_back(false);
      --levels;
    }
  }

  ~LockManager() = default;

  /* Append one more lock, initially released. */
  void add_lock() { m_lks.emplace_back(false); }

  /* Release the lock at idx. Out-of-range indexes are ignored. */
  void release_lock(size_t idx) {
    if (idx >= m_lks.size()) {
      return;
    }
    m_lks[idx].store(false);
  }

  /* Report whether the lock at idx is currently held. Out-of-range
   * indexes report false. */
  bool is_locked(size_t idx) {
    if (idx >= m_lks.size()) {
      return false;
    }
    return m_lks[idx].load();
  }

  /* Attempt to acquire the lock at idx without blocking. Returns true
   * on success, false if the lock is already held or idx is out of
   * range. The plain load before the CAS is a test-and-test-and-set:
   * it skips the (more expensive) read-modify-write when the lock is
   * visibly taken. */
  bool take_lock(size_t idx) {
    if (idx >= m_lks.size()) {
      return false;
    }

    bool expected = m_lks[idx].load();
    if (expected) {
      return false;
    }
    return m_lks[idx].compare_exchange_strong(expected, true);
  }

private:
  std::deque<std::atomic<bool>> m_lks;
};
+}
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index ed40d3d..2d68f56 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -49,9 +49,9 @@ typedef std::function<void(void *)> Job;
struct Task {
Task(size_t size, size_t ts, Job job, void *args, size_t type = 0,
- SchedulerStatistics *stats = nullptr)
+ SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr)
: m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type),
- m_stats(stats) {}
+ m_stats(stats), m_lk(lk) {}
Job m_job;
size_t m_size;
@@ -59,6 +59,7 @@ struct Task {
void *m_args;
size_t m_type;
SchedulerStatistics *m_stats;
+ std::mutex *m_lk;
friend bool operator<(const Task &self, const Task &other) {
return self.m_timestamp < other.m_timestamp;
@@ -87,6 +88,10 @@ struct Task {
.count();
m_stats->log_time_data(time, m_type);
}
+
+ if (m_lk) {
+ m_lk->unlock();
+ }
}
};
diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h
index 48c186f..a6d66ab 100644
--- a/include/framework/scheduling/statistics.h
+++ b/include/framework/scheduling/statistics.h
@@ -52,7 +52,7 @@ public:
/* FIXME: This is just a temporary approach */
void log_time_data(size_t length, size_t type) {
- assert(type == 1 || type == 2);
+ assert(type == 1 || type == 2 || type == 3);
if (type == 1) {
m_type_1_cnt.fetch_add(1);
@@ -61,7 +61,7 @@ public:
if (length > m_type_1_largest_time) {
m_type_1_largest_time.store(length);
}
- } else {
+ } else if (type == 2) {
m_type_2_cnt.fetch_add(1);
m_type_2_total_time.fetch_add(length);