From 6e30f576ca9d11d1901f4877315e97f84d15b1e1 Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Mon, 18 Sep 2023 16:37:30 -0400
Subject: The scheduler now spawns a separate merge thread

Merges are now executed from a separate thread within the scheduler,
which wakes up via condition variables when new merge tasks are
scheduled. In addition, tombstone limits are now enforced by the
scheduler, with new merges being scheduled as needed.

There are still a few tests failing; notably, the
zero-tombstones-in-the-last-run invariant does not hold under tiering
with tombstones. That still needs to be looked into.
---
 include/framework/Scheduler.h | 126 ++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 122 insertions(+), 4 deletions(-)

(limited to 'include/framework/Scheduler.h')

diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h
index 28ed8a9..534ce25 100644
--- a/include/framework/Scheduler.h
+++ b/include/framework/Scheduler.h
@@ -12,6 +12,8 @@
 #include
 #include
 #include
+#include <thread>
+#include <condition_variable>
 
 #include "util/types.h"
 #include "framework/ShardInterface.h"
@@ -26,6 +28,7 @@ namespace de {
 template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
 class Scheduler {
     typedef ExtensionStructure<R, S, Q, L> Structure;
+    typedef MutableBuffer<R> Buffer;
 public:
     /*
      * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means
@@ -36,9 +39,25 @@ public:
         , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX)
         , m_used_memory(0)
         , m_used_threads(0)
-    {}
+        , m_shutdown(false)
+    {
+        m_sched_thrd = std::thread(&Scheduler::run_scheduler, this);
+    }
+
+    ~Scheduler() {
+        m_shutdown = true;
+
+        m_cv.notify_all();
+        m_sched_thrd.join();
+    }
 
     bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) {
+        /*
+         * temporary hack
+         */
+        pending_version = version;
+        pending_buffer = buffer;
+
         /*
          * Get list of individual level reconstructions that are necessary
          * for completing the overall merge
@@ -50,10 +69,30 @@
          * executes them sequentially in a blocking fashion)
          */
         for (ssize_t i=merges.size()-1; i>=0; i--) {
-            version->merge_levels(merges[i].m_target_level, merges[i].m_source_level);
+            merges[i].m_timestamp = m_timestamp.fetch_add(1);
+            m_merge_queue_lock.lock();
+            m_merge_queue.push(merges[i]);
+            m_merge_queue_lock.unlock();
         }
 
-        return version->merge_buffer(buffer);
+        MergeTask buffer_merge;
+        buffer_merge.m_source_level = -1;
+        buffer_merge.m_target_level = 0;
+        buffer_merge.m_size = buffer->get_record_count() * sizeof(R) * 2;
+        buffer_merge.m_timestamp = m_timestamp.fetch_add(1);
+        m_merge_queue_lock.lock();
+        m_merge_queue.push(buffer_merge);
+        m_merge_queue_lock.unlock();
+
+        m_cv.notify_all();
+        do {
+            std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock);
+            m_merge_cv.wait(merge_cv_lock);
+        } while (m_merge_queue.size() > 0);
+
+        assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0);
+
+        return true;
     }
 
 private:
@@ -62,15 +101,94 @@ private:
         return ts;
     }
 
+    void schedule_next_task() {
+        m_merge_queue_lock.lock();
+        auto task = m_merge_queue.top();
+        m_merge_queue.pop();
+        m_merge_queue_lock.unlock();
+
+        if (task.m_source_level == -1 && task.m_target_level == 0) {
+            run_buffer_merge(pending_buffer, pending_version);
+        } else {
+            run_merge(task, pending_version);
+        }
+
+        if (m_merge_queue.size() == 0) {
+            m_merge_cv.notify_all();
+        }
+    }
+
+    void run_merge(MergeTask task, Structure *version) {
+        version->merge_levels(task.m_target_level, task.m_source_level);
+        if (!version->validate_tombstone_proportion(task.m_target_level)) {
+            auto tasks = version->get_merge_tasks(task.m_target_level);
+            /*
+             * Schedule the merge tasks (FIXME: currently this just
+             * executes them sequentially in a blocking fashion)
+             */
+            for (ssize_t i=tasks.size()-1; i>=0; i--) {
+                tasks[i].m_timestamp = m_timestamp.fetch_add(1);
+                m_merge_queue_lock.lock();
+                m_merge_queue.push(tasks[i]);
+                m_merge_queue_lock.unlock();
+            }
+        }
+    }
+
+
+    void run_buffer_merge(Buffer *buffer, Structure *version) {
+        version->merge_buffer(buffer);
+        if (!version->validate_tombstone_proportion(0)) {
+            auto tasks = version->get_merge_tasks_from_level(0);
+
+            /*
+             * Schedule the merge tasks (FIXME: currently this just
+             * executes them sequentially in a blocking fashion)
+             */
+            for (ssize_t i=tasks.size()-1; i>=0; i--) {
+                tasks[i].m_timestamp = m_timestamp.fetch_add(1);
+                m_merge_queue_lock.lock();
+                m_merge_queue.push(tasks[i]);
+                m_merge_queue_lock.unlock();
+            }
+        }
+    }
+
+    void run_scheduler() {
+        do {
+            std::unique_lock<std::mutex> cv_lock(m_cv_lock);
+            m_cv.wait(cv_lock);
+
+            while (m_merge_queue.size() > 0 && m_used_threads < m_thread_cnt) {
+                schedule_next_task();
+            }
+            cv_lock.unlock();
+        } while(!m_shutdown);
+    }
+
     size_t m_memory_budget;
     size_t m_thread_cnt;
 
+    Buffer *pending_buffer;
+    Structure *pending_version;
+
     alignas(64) std::atomic<size_t> m_used_memory;
     alignas(64) std::atomic<size_t> m_used_threads;
     alignas(64) std::atomic<size_t> m_timestamp;
 
-    std::priority_queue<MergeTask> m_merge_queue;
+    std::priority_queue<MergeTask, std::vector<MergeTask>, std::greater<MergeTask>> m_merge_queue;
    std::mutex m_merge_queue_lock;
 
+    std::mutex m_cv_lock;
+    std::condition_variable m_cv;
+
+    std::mutex m_merge_cv_lock;
+    std::condition_variable m_merge_cv;
+
+    std::thread m_sched_thrd;
+
+    bool m_shutdown;
+
 };
 }
--
cgit v1.2.3
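
The core of this change is a two-condition-variable handshake: schedule_merge() enqueues tasks and notifies m_cv to wake the scheduler thread, then sleeps on m_merge_cv until the queue has drained. Below is a minimal, self-contained sketch of that handshake; the names (Worker, submit_and_wait) are illustrative only, not framework API. The sketch uses the predicate overload of wait(), which re-checks the condition while holding the lock and so also covers spurious wakeups and notifications that fire before the waiter is asleep; the patch approximates this with its do/while loops around wait().

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

// Illustrative stand-in for the scheduler: one worker thread sleeps on
// m_wake_cv until tasks arrive; submitters sleep on m_done_cv until the
// queue has been drained.
class Worker {
public:
    Worker() : m_shutdown(false), m_thrd(&Worker::run, this) {}

    ~Worker() {
        {
            std::lock_guard<std::mutex> g(m_lock);
            m_shutdown = true;
        }
        m_wake_cv.notify_all();
        m_thrd.join();
    }

    // Enqueue a task, wake the worker, and block until the queue drains.
    void submit_and_wait(int task) {
        std::unique_lock<std::mutex> lk(m_lock);
        m_queue.push(task);
        m_wake_cv.notify_all();
        m_done_cv.wait(lk, [this] { return m_queue.empty(); });
    }

private:
    void run() {
        std::unique_lock<std::mutex> lk(m_lock);
        while (!m_shutdown) {
            // Predicate form: only proceed on shutdown or pending work.
            m_wake_cv.wait(lk, [this] { return m_shutdown || !m_queue.empty(); });
            while (!m_queue.empty()) {
                int task = m_queue.front();
                m_queue.pop();

                lk.unlock();  // run the task without holding the lock
                std::cout << "ran task " << task << "\n";
                lk.lock();
            }
            m_done_cv.notify_all();  // queue drained; release submitters
        }
    }

    std::mutex m_lock;
    std::condition_variable m_wake_cv;  // worker sleeps here
    std::condition_variable m_done_cv;  // submitters sleep here
    std::queue<int> m_queue;
    bool m_shutdown;
    std::thread m_thrd;  // declared last: started after the other members exist
};

int main() {
    Worker w;
    w.submit_and_wait(1);
    w.submit_and_wait(2);
}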
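
A note on the m_merge_queue change: std::priority_queue is a max-heap by default, so supplying std::greater as the comparator turns it into a min-heap that always pops the task with the smallest timestamp, i.e. tasks execute in the order the scheduler stamped them. The sketch below shows just that ordering; the MergeTask fields mirror the ones assigned in the patch, but the operator> definition is an assumption about how the framework compares tasks.

#include <cstddef>
#include <iostream>
#include <queue>
#include <sys/types.h>  // ssize_t
#include <vector>

// Stand-in for the framework's MergeTask; operator> (assumed) orders
// tasks by timestamp so that std::greater yields a min-heap.
struct MergeTask {
    ssize_t m_source_level;
    ssize_t m_target_level;
    size_t m_size;
    size_t m_timestamp;

    bool operator>(const MergeTask &other) const {
        return m_timestamp > other.m_timestamp;
    }
};

int main() {
    std::priority_queue<MergeTask, std::vector<MergeTask>,
                        std::greater<MergeTask>> queue;

    queue.push({1, 2, 0, 7});   // stamped third
    queue.push({-1, 0, 0, 3});  // stamped first (buffer merge)
    queue.push({0, 1, 0, 5});   // stamped second

    while (!queue.empty()) {    // prints 3, 5, 7: oldest task first
        std::cout << queue.top().m_timestamp << "\n";
        queue.pop();
    }
}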