diff options
Diffstat (limited to 'include/framework/scheduling')
| -rw-r--r-- | include/framework/scheduling/Epoch.h | 45 | ||||
| -rw-r--r-- | include/framework/scheduling/FIFOScheduler.h | 13 | ||||
| -rw-r--r-- | include/framework/scheduling/SerialScheduler.h | 2 | ||||
| -rw-r--r-- | include/framework/scheduling/Task.h | 2 |
4 files changed, 56 insertions, 6 deletions
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index f4aefe9..0ebbde9 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -25,6 +25,7 @@ public: Epoch(size_t number=0) : m_buffers() , m_structure(nullptr) + , m_active_merge(false) , m_active_jobs(0) , m_active(true) , m_epoch_number(number) @@ -34,6 +35,7 @@ public: : m_buffers() , m_structure(structure) , m_active_jobs(0) + , m_active_merge(false) , m_active(true) , m_epoch_number(number) { @@ -54,11 +56,25 @@ public: } } - void add_buffer(Buffer *buf) { + Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) { assert(buf); + /* + * if a current buffer is specified, only add the + * new buffer if the active buffer is the current, + * otherwise just return the active buffer (poor man's + * CAS). + */ + if (cur_buf) { + auto active_buf = get_active_buffer(); + if (active_buf != cur_buf) { + return active_buf; + } + } + buf->take_reference(); m_buffers.push_back(buf); + return buf; } void start_job() { @@ -137,6 +153,31 @@ public: return epoch; } + /* + * Check if a merge can be started from this Epoch. + * At present, without concurrent merging, this simply + * checks if there is currently a scheduled merge based + * on this Epoch. If there is, returns false. If there + * isn't, return true and set a flag indicating that + * there is an active merge. + */ + bool prepare_merge() { + auto old = m_active_merge.load(); + if (old) { + return false; + } + + // FIXME: this needs cleaned up + while (!m_active_merge.compare_exchange_strong(old, true)) { + old = m_active_merge.load(); + if (old) { + return false; + } + } + + return true; + } + void set_inactive() { m_active = false; } @@ -170,6 +211,8 @@ private: std::condition_variable m_active_cv; std::mutex m_cv_lock; + std::atomic<bool> m_active_merge; + /* * The number of currently active jobs * (queries/merges) operating on this diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 91a72b3..1521eb6 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -24,20 +24,26 @@ #include "framework/structure/ExtensionStructure.h" #include "framework/scheduling/Task.h" +#include "ctpl/ctpl.h" #include "psu-ds/LockedPriorityQueue.h" namespace de { + class FIFOScheduler { +private: + static const size_t DEFAULT_MAX_THREADS = 8; + public: FIFOScheduler(size_t memory_budget, size_t thread_cnt) : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX) + , m_thrd_cnt((thread_cnt) ? thread_cnt: DEFAULT_MAX_THREADS) , m_used_memory(0) , m_used_thrds(0) , m_shutdown(false) { m_sched_thrd = std::thread(&FIFOScheduler::run, this); + m_thrd_pool.resize(m_thrd_cnt); } ~FIFOScheduler() { @@ -72,6 +78,7 @@ private: std::condition_variable m_cv; std::thread m_sched_thrd; + ctpl::thread_pool m_thrd_pool; std::atomic<size_t> m_used_thrds; std::atomic<size_t> m_used_memory; @@ -79,7 +86,7 @@ private: void schedule_next() { assert(m_task_queue.size() > 0); auto t = m_task_queue.pop(); - t(); + m_thrd_pool.push(t); } void run() { @@ -87,7 +94,7 @@ private: std::unique_lock<std::mutex> cv_lock(m_cv_lock); m_cv.wait(cv_lock); - while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) { + while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) { schedule_next(); } } while(!m_shutdown.load()); diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index 9c767e8..93611d1 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -47,7 +47,7 @@ public: void schedule_job(std::function<void(void*)> job, size_t size, void *args) { size_t ts = m_counter++; auto t = Task(size, ts, job, args); - t(); + t(0); } void shutdown() { diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 228665f..6dfd7df 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -54,7 +54,7 @@ struct Task { return self.m_timestamp > other.m_timestamp; } - void operator()() { + void operator()(size_t thrd_id) { m_job(m_args); } }; |