From d2279e1b96d352a0af1d425dcaaf93e8a26a8d52 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 30 Oct 2023 17:15:05 -0400 Subject: General Comment + Consistency updates --- include/framework/scheduling/Epoch.h | 1 - include/framework/scheduling/FIFOScheduler.h | 3 +- include/framework/scheduling/Scheduler.h | 195 --------------------------- include/framework/scheduling/Task.h | 6 +- 4 files changed, 6 insertions(+), 199 deletions(-) delete mode 100644 include/framework/scheduling/Scheduler.h (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 87463bd..03cbb62 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -2,7 +2,6 @@ * include/framework/scheduling/Epoch.h * * Copyright (C) 2023 Douglas B. Rumbaugh - * Dong Xie * * All rights reserved. Published under the Modified BSD License. * diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 7ccab26..5425c4f 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -1,8 +1,7 @@ /* - * include/framework/Scheduler.h + * include/framework/scheduling/FIFOScheduler.h * * Copyright (C) 2023 Douglas B. Rumbaugh - * Dong Xie * * All rights reserved. Published under the Modified BSD License. * diff --git a/include/framework/scheduling/Scheduler.h b/include/framework/scheduling/Scheduler.h deleted file mode 100644 index 992cbf9..0000000 --- a/include/framework/scheduling/Scheduler.h +++ /dev/null @@ -1,195 +0,0 @@ -/* - * include/framework/Scheduler.h - * - * Copyright (C) 2023 Douglas Rumbaugh - * Dong Xie - * - * All rights reserved. Published under the Modified BSD License. - * - */ -#pragma once - -#include -#include -#include -#include -#include - -#include "util/types.h" -#include "framework/interface/Shard.h" -#include "framework/interface/Query.h" -#include "framework/interface/Record.h" -#include "framework/structure/MutableBuffer.h" -#include "framework/util/Configuration.h" -#include "framework/structure/ExtensionStructure.h" - -namespace de { - -template -class Scheduler { - typedef ExtensionStructure Structure; - typedef MutableBuffer Buffer; -public: - /* - * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means - * unlimited. - */ - Scheduler(size_t memory_budget, size_t thread_cnt) - : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX) - , m_used_memory(0) - , m_used_threads(0) - , m_shutdown(false) - { - m_sched_thrd = std::thread(&Scheduler::run_scheduler, this); - } - - ~Scheduler() { - m_shutdown = true; - - m_cv.notify_all(); - m_sched_thrd.join(); - } - - bool schedule_merge(Structure *version, MutableBuffer *buffer) { - /* - * temporary hack - */ - pending_version = version; - pending_buffer = buffer; - - /* - * Get list of individual level reconstructions that are necessary - * for completing the overall merge - */ - std::vector merges = version->get_merge_tasks(buffer->get_record_count()); - - /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) - */ - for (ssize_t i=0; iget_record_count() * sizeof(R) * 2; - buffer_merge.m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(buffer_merge); - m_merge_queue_lock.unlock(); - - m_cv.notify_all(); - do { - std::unique_lock merge_cv_lock(m_merge_cv_lock); - m_merge_cv.wait(merge_cv_lock); - } while (m_merge_queue.size() > 0); - - assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0); - - return true; - } - -private: - size_t get_timestamp() { - auto ts = m_timestamp.fetch_add(1); - return ts; - } - - void schedule_next_task() { - m_merge_queue_lock.lock(); - auto task = m_merge_queue.top(); - m_merge_queue.pop(); - m_merge_queue_lock.unlock(); - - if (task.m_source_level == -1 && task.m_target_level == 0) { - run_buffer_merge(pending_buffer, pending_version); - } else { - run_merge(task, pending_version); - } - - if (m_merge_queue.size() == 0) { - m_merge_cv.notify_all(); - } - } - - - void run_merge(MergeTask task, Structure *version) { - version->merge_levels(task.m_target_level, task.m_source_level); - - if (!version->validate_tombstone_proportion(task.m_target_level)) { - auto tasks = version->get_merge_tasks(task.m_target_level); - /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) - */ - for (ssize_t i=tasks.size()-1; i>=0; i--) { - tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(tasks[i]); - m_merge_queue_lock.unlock(); - } - } - } - - - void run_buffer_merge(Buffer *buffer, Structure *version) { - version->merge_buffer(buffer); - if (!version->validate_tombstone_proportion(0)) { - auto tasks = version->get_merge_tasks_from_level(0); - - /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) - */ - for (ssize_t i=tasks.size()-1; i>=0; i--) { - tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(tasks[i]); - m_merge_queue_lock.unlock(); - } - } - } - - void run_scheduler() { - do { - std::unique_lock cv_lock(m_cv_lock); - m_cv.wait(cv_lock); - - while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { - schedule_next_task(); - } - cv_lock.unlock(); - } while(!m_shutdown); - } - - size_t m_memory_budget; - size_t m_thread_cnt; - - Buffer *pending_buffer; - Structure *pending_version; - - alignas(64) std::atomic m_used_memory; - alignas(64) std::atomic m_used_threads; - alignas(64) std::atomic m_timestamp; - - std::priority_queue, std::greater> m_merge_queue; - std::mutex m_merge_queue_lock; - - std::mutex m_cv_lock; - std::condition_variable m_cv; - - std::mutex m_merge_cv_lock; - std::condition_variable m_merge_cv; - - std::thread m_sched_thrd; - - bool m_shutdown; -}; - -} diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index d25c7c0..228665f 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -1,9 +1,13 @@ /* + * include/framework/scheduling/Task.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * + * All rights reserved. Published under the Modified BSD License. * */ #pragma once -#include #include #include -- cgit v1.2.3