path: root/include/framework/Scheduler.h
author     Douglas Rumbaugh <dbr4@psu.edu>    2023-09-18 16:37:30 -0400
committer  Douglas Rumbaugh <dbr4@psu.edu>    2023-09-18 16:37:30 -0400
commit     6e30f576ca9d11d1901f4877315e97f84d15b1e1 (patch)
tree       aec980d3e8b0c655a1c38c7c06b314cf10db4f94 /include/framework/Scheduler.h
parent     abc8605a51537fc7b35bb0d9b1da6c724c5c6973 (diff)
download   dynamic-extension-6e30f576ca9d11d1901f4877315e97f84d15b1e1.tar.gz
The scheduler now spawns a separate merge thread

Merges are now executed from a separate thread within the scheduler, which wakes up via condition variables when new merge tasks are scheduled. In addition, tombstone limits are now enforced by the scheduler, with new merges scheduled as needed. A few tests are still failing; notably, the zero-tombstones-in-the-last-run invariant does not hold under tiering with tombstones. This still needs investigation.
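For illustration, here is a minimal, self-contained sketch of the wake-up pattern described above: a worker thread sleeps on one condition variable and drains a task queue when notified, while the caller blocks on a second condition variable until the queue is empty. MiniScheduler and its integer tasks are stand-ins rather than the framework's types, and unlike the code in this diff, the waits use predicates to guard against spurious wake-ups:

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>

class MiniScheduler {
public:
    MiniScheduler() : m_shutdown(false), m_thrd(&MiniScheduler::run, this) {}

    ~MiniScheduler() {
        {
            std::lock_guard<std::mutex> g(m_lock);
            m_shutdown = true;
        }
        m_cv.notify_all();
        m_thrd.join();
    }

    /* Enqueue work and block until the worker has drained the queue,
     * mirroring the blocking behavior of schedule_merge() in the diff. */
    void schedule(int task) {
        {
            std::lock_guard<std::mutex> g(m_lock);
            m_queue.push(task);
        }
        m_cv.notify_all();

        std::unique_lock<std::mutex> lk(m_lock);
        m_done_cv.wait(lk, [this] { return m_queue.empty(); });
    }

private:
    void run() {
        std::unique_lock<std::mutex> lk(m_lock);
        while (!m_shutdown) {
            /* The predicate guards against spurious wake-ups. */
            m_cv.wait(lk, [this] { return m_shutdown || !m_queue.empty(); });
            while (!m_queue.empty()) {
                int task = m_queue.front();
                m_queue.pop();
                lk.unlock();    /* run the task without holding the lock */
                std::printf("running task %d\n", task);
                lk.lock();
            }
            m_done_cv.notify_all();   /* wake any caller blocked in schedule() */
        }
    }

    std::queue<int> m_queue;
    std::mutex m_lock;
    std::condition_variable m_cv;
    std::condition_variable m_done_cv;
    bool m_shutdown;
    std::thread m_thrd;
};

int main() {
    MiniScheduler s;
    s.schedule(1);
    s.schedule(2);
}

The same two-condition-variable handshake is what schedule_merge() in the diff below relies on to block until all of its scheduled merges have completed.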
Diffstat (limited to 'include/framework/Scheduler.h')
-rw-r--r--  include/framework/Scheduler.h | 126
1 file changed, 122 insertions(+), 4 deletions(-)
diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h
index 28ed8a9..534ce25 100644
--- a/include/framework/Scheduler.h
+++ b/include/framework/Scheduler.h
@@ -12,6 +12,8 @@
#include <vector>
#include <memory>
#include <queue>
+#include <thread>
+#include <condition_variable>
#include "util/types.h"
#include "framework/ShardInterface.h"
@@ -26,6 +28,7 @@ namespace de {
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
class Scheduler {
typedef ExtensionStructure<R, S, Q, L> Structure;
+ typedef MutableBuffer<R> Buffer;
public:
/*
* Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means
@@ -36,10 +39,26 @@ public:
, m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX)
, m_used_memory(0)
, m_used_threads(0)
- {}
+ , m_shutdown(false)
+ {
+ m_sched_thrd = std::thread(&Scheduler::run_scheduler, this);
+ }
+
+ ~Scheduler() {
+ m_shutdown = true;
+
+ m_cv.notify_all();
+ m_sched_thrd.join();
+ }
bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) {
/*
+ * temporary hack
+ */
+ pending_version = version;
+ pending_buffer = buffer;
+
+ /*
* Get list of individual level reconstructions that are necessary
* for completing the overall merge
*/
@@ -50,10 +69,30 @@ public:
* executes them sequentially in a blocking fashion)
*/
for (ssize_t i=merges.size()-1; i>=0; i--) {
- version->merge_levels(merges[i].m_target_level, merges[i].m_source_level);
+ merges[i].m_timestamp = m_timestamp.fetch_add(1);
+ m_merge_queue_lock.lock();
+ m_merge_queue.push(merges[i]);
+ m_merge_queue_lock.unlock();
}
- return version->merge_buffer(buffer);
+ MergeTask buffer_merge;
+ buffer_merge.m_source_level = -1;
+ buffer_merge.m_target_level = 0;
+ buffer_merge.m_size = buffer->get_record_count() * sizeof(R) * 2;
+ buffer_merge.m_timestamp = m_timestamp.fetch_add(1);
+ m_merge_queue_lock.lock();
+ m_merge_queue.push(buffer_merge);
+ m_merge_queue_lock.unlock();
+
+ m_cv.notify_all();
+ do {
+ std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock);
+ m_merge_cv.wait(merge_cv_lock);
+ } while (m_merge_queue.size() > 0);
+
+ assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0);
+
+ return true;
}
private:
@@ -62,15 +101,94 @@ private:
return ts;
}
+ void schedule_next_task() {
+ m_merge_queue_lock.lock();
+ auto task = m_merge_queue.top();
+ m_merge_queue.pop();
+ m_merge_queue_lock.unlock();
+
+ if (task.m_source_level == -1 && task.m_target_level == 0) {
+ run_buffer_merge(pending_buffer, pending_version);
+ } else {
+ run_merge(task, pending_version);
+ }
+
+ if (m_merge_queue.size() == 0) {
+ m_merge_cv.notify_all();
+ }
+ }
+
+ void run_merge(MergeTask task, Structure *version) {
+ version->merge_levels(task.m_target_level, task.m_source_level);
+ if (!version->validate_tombstone_proportion(task.m_target_level)) {
+ auto tasks = version->get_merge_tasks(task.m_target_level);
+ /*
+ * Schedule the merge tasks (FIXME: currently this just
+ * executes them sequentially in a blocking fashion)
+ */
+ for (ssize_t i=tasks.size()-1; i>=0; i--) {
+ tasks[i].m_timestamp = m_timestamp.fetch_add(1);
+ m_merge_queue_lock.lock();
+ m_merge_queue.push(tasks[i]);
+ m_merge_queue_lock.unlock();
+ }
+ }
+ }
+
+
+ void run_buffer_merge(Buffer *buffer, Structure *version) {
+ version->merge_buffer(buffer);
+ if (!version->validate_tombstone_proportion(0)) {
+ auto tasks = version->get_merge_tasks_from_level(0);
+
+ /*
+ * Schedule the merge tasks (FIXME: currently this just
+ * executes them sequentially in a blocking fashion)
+ */
+ for (ssize_t i=tasks.size()-1; i>=0; i--) {
+ tasks[i].m_timestamp = m_timestamp.fetch_add(1);
+ m_merge_queue_lock.lock();
+ m_merge_queue.push(tasks[i]);
+ m_merge_queue_lock.unlock();
+ }
+ }
+ }
+
+ void run_scheduler() {
+ do {
+ std::unique_lock<std::mutex> cv_lock(m_cv_lock);
+ m_cv.wait(cv_lock);
+
+ while (m_merge_queue.size() > 0 && m_used_threads < m_thread_cnt) {
+ schedule_next_task();
+ }
+ cv_lock.unlock();
+ } while(!m_shutdown);
+ }
+
size_t m_memory_budget;
size_t m_thread_cnt;
+ Buffer *pending_buffer;
+ Structure *pending_version;
+
alignas(64) std::atomic<size_t> m_used_memory;
alignas(64) std::atomic<size_t> m_used_threads;
alignas(64) std::atomic<size_t> m_timestamp;
- std::priority_queue<MergeTask> m_merge_queue;
+ std::priority_queue<MergeTask, std::vector<MergeTask>, std::greater<MergeTask>> m_merge_queue;
std::mutex m_merge_queue_lock;
+
+ std::mutex m_cv_lock;
+ std::condition_variable m_cv;
+
+ std::mutex m_merge_cv_lock;
+ std::condition_variable m_merge_cv;
+
+ std::thread m_sched_thrd;
+
+ bool m_shutdown;
+
};
}
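A note on the queue change near the bottom of the diff: std::priority_queue is a max-heap by default, so comparing with std::greater<MergeTask> inverts it into a min-heap that pops the task with the smallest (oldest) timestamp first. That only compiles if MergeTask defines operator>; the struct below is an assumed stand-in, since the real MergeTask is defined outside this file:

#include <cstddef>
#include <queue>
#include <vector>

/* Hypothetical stand-in for the framework's MergeTask: only the fields
 * the scheduler touches are shown, and only operator> matters for the
 * queue ordering below. */
struct MergeTask {
    long   m_source_level;
    long   m_target_level;
    size_t m_size;
    size_t m_timestamp;

    bool operator>(const MergeTask &other) const {
        return m_timestamp > other.m_timestamp;
    }
};

int main() {
    /* std::greater<MergeTask> calls operator>, turning the default
     * max-heap into a min-heap on m_timestamp. */
    std::priority_queue<MergeTask, std::vector<MergeTask>,
                        std::greater<MergeTask>> q;
    q.push(MergeTask{1, 2, 0, 7});
    q.push(MergeTask{-1, 0, 0, 3});
    return q.top().m_timestamp == 3 ? 0 : 1;  /* the older task comes out first */
}

Ordering on the timestamp keeps processing FIFO even though the container is a heap, matching the m_timestamp.fetch_add(1) stamps assigned in schedule_merge().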