diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-09-25 10:49:36 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-09-25 10:49:36 -0400 |
| commit | 7c03d771475421c1d5a2bbc135242536af1a371c (patch) | |
| tree | 94856ac950662c564608ad3cdc5b59bfd08b955c /include/framework/scheduling | |
| parent | 754372aeccb74815cbb16f32ceacb04b4c5aaba9 (diff) | |
| download | dynamic-extension-7c03d771475421c1d5a2bbc135242536af1a371c.tar.gz | |
Re-structuring Project + scheduling updates
This is a big one--probably should have split it apart, but I'm feeling
lazy this morning.
* Organized the mess of header files in include/framework by splitting
them out into their own subdirectories, and renaming a few files to
remove redundancies introduced by the directory structure.
* Introduced a new framework/ShardRequirements.h header file for simpler
shard development. This header simply contains the necessary includes
from framework/* for creating shard files. This should help to remove
structural dependencies from the framework file structure and shards,
as well as centralizing the necessary framework files to make shard
development easier.
* Created a (currently dummy) SchedulerInterface, and make the scheduler
implementation a template parameter of the dynamic extension for easier
testing of various scheduling policies. There's still more work to be
done to fully integrate the scheduler (queries, multiple buffers), but
some more of the necessary framework code for this has been added as well.
* Adjusted the Task interface setup for the scheduler. The task structures
have been removed from ExtensionStructure and placed in their own header
file. Additionally, I started experimenting with using std::variant,
as opposed to inheritence, to implement subtype polymorphism on the
Merge and Query tasks. The scheduler now has a general task queue that
contains both, and std::variant, std::visit, and std::get are used to
manipulate them without virtual functions.
* Removed Alex.h, as it can't build anyway. There's a branch out there
containing the Alex implementation stripped of the C++20 stuff. So
there's no need to keep it here.
Diffstat (limited to 'include/framework/scheduling')
| -rw-r--r-- | include/framework/scheduling/Scheduler.h | 195 | ||||
| -rw-r--r-- | include/framework/scheduling/SerialScheduler.h | 227 | ||||
| -rw-r--r-- | include/framework/scheduling/Task.h | 63 |
3 files changed, 485 insertions, 0 deletions
diff --git a/include/framework/scheduling/Scheduler.h b/include/framework/scheduling/Scheduler.h new file mode 100644 index 0000000..992cbf9 --- /dev/null +++ b/include/framework/scheduling/Scheduler.h @@ -0,0 +1,195 @@ +/* + * include/framework/Scheduler.h + * + * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include <vector> +#include <memory> +#include <queue> +#include <thread> +#include <condition_variable> + +#include "util/types.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" +#include "framework/interface/Record.h" +#include "framework/structure/MutableBuffer.h" +#include "framework/util/Configuration.h" +#include "framework/structure/ExtensionStructure.h" + +namespace de { + +template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L> +class Scheduler { + typedef ExtensionStructure<R, S, Q, L> Structure; + typedef MutableBuffer<R> Buffer; +public: + /* + * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means + * unlimited. + */ + Scheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX) + , m_used_memory(0) + , m_used_threads(0) + , m_shutdown(false) + { + m_sched_thrd = std::thread(&Scheduler::run_scheduler, this); + } + + ~Scheduler() { + m_shutdown = true; + + m_cv.notify_all(); + m_sched_thrd.join(); + } + + bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) { + /* + * temporary hack + */ + pending_version = version; + pending_buffer = buffer; + + /* + * Get list of individual level reconstructions that are necessary + * for completing the overall merge + */ + std::vector<MergeTask> merges = version->get_merge_tasks(buffer->get_record_count()); + + /* + * Schedule the merge tasks (FIXME: currently this just + * executes them sequentially in a blocking fashion) + */ + for (ssize_t i=0; i<merges.size(); i++) { + merges[i].m_timestamp = m_timestamp.fetch_add(1); + m_merge_queue_lock.lock(); + m_merge_queue.push(merges[i]); + m_merge_queue_lock.unlock(); + } + + MergeTask buffer_merge; + buffer_merge.m_source_level = -1; + buffer_merge.m_target_level = 0; + buffer_merge.m_size = buffer->get_record_count() * sizeof(R) * 2; + buffer_merge.m_timestamp = m_timestamp.fetch_add(1); + m_merge_queue_lock.lock(); + m_merge_queue.push(buffer_merge); + m_merge_queue_lock.unlock(); + + m_cv.notify_all(); + do { + std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock); + m_merge_cv.wait(merge_cv_lock); + } while (m_merge_queue.size() > 0); + + assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0); + + return true; + } + +private: + size_t get_timestamp() { + auto ts = m_timestamp.fetch_add(1); + return ts; + } + + void schedule_next_task() { + m_merge_queue_lock.lock(); + auto task = m_merge_queue.top(); + m_merge_queue.pop(); + m_merge_queue_lock.unlock(); + + if (task.m_source_level == -1 && task.m_target_level == 0) { + run_buffer_merge(pending_buffer, pending_version); + } else { + run_merge(task, pending_version); + } + + if (m_merge_queue.size() == 0) { + m_merge_cv.notify_all(); + } + } + + + void run_merge(MergeTask task, Structure *version) { + version->merge_levels(task.m_target_level, task.m_source_level); + + if (!version->validate_tombstone_proportion(task.m_target_level)) { + auto tasks = version->get_merge_tasks(task.m_target_level); + /* + * Schedule the merge tasks (FIXME: currently this just + * executes them sequentially in a blocking fashion) + */ + for (ssize_t i=tasks.size()-1; i>=0; i--) { + tasks[i].m_timestamp = m_timestamp.fetch_add(1); + m_merge_queue_lock.lock(); + m_merge_queue.push(tasks[i]); + m_merge_queue_lock.unlock(); + } + } + } + + + void run_buffer_merge(Buffer *buffer, Structure *version) { + version->merge_buffer(buffer); + if (!version->validate_tombstone_proportion(0)) { + auto tasks = version->get_merge_tasks_from_level(0); + + /* + * Schedule the merge tasks (FIXME: currently this just + * executes them sequentially in a blocking fashion) + */ + for (ssize_t i=tasks.size()-1; i>=0; i--) { + tasks[i].m_timestamp = m_timestamp.fetch_add(1); + m_merge_queue_lock.lock(); + m_merge_queue.push(tasks[i]); + m_merge_queue_lock.unlock(); + } + } + } + + void run_scheduler() { + do { + std::unique_lock<std::mutex> cv_lock(m_cv_lock); + m_cv.wait(cv_lock); + + while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { + schedule_next_task(); + } + cv_lock.unlock(); + } while(!m_shutdown); + } + + size_t m_memory_budget; + size_t m_thread_cnt; + + Buffer *pending_buffer; + Structure *pending_version; + + alignas(64) std::atomic<size_t> m_used_memory; + alignas(64) std::atomic<size_t> m_used_threads; + alignas(64) std::atomic<size_t> m_timestamp; + + std::priority_queue<MergeTask, std::vector<MergeTask>, std::greater<MergeTask>> m_merge_queue; + std::mutex m_merge_queue_lock; + + std::mutex m_cv_lock; + std::condition_variable m_cv; + + std::mutex m_merge_cv_lock; + std::condition_variable m_merge_cv; + + std::thread m_sched_thrd; + + bool m_shutdown; +}; + +} diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h new file mode 100644 index 0000000..5e16bdf --- /dev/null +++ b/include/framework/scheduling/SerialScheduler.h @@ -0,0 +1,227 @@ +/* + * include/framework/Scheduler.h + * + * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include <vector> +#include <memory> +#include <queue> +#include <thread> +#include <condition_variable> + +#include "util/types.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" +#include "framework/interface/Record.h" +#include "framework/structure/MutableBuffer.h" +#include "framework/util/Configuration.h" +#include "framework/structure/ExtensionStructure.h" +#include "framework/scheduling/Task.h" + +namespace de { + +template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L> +class SerialScheduler { + typedef ExtensionStructure<R, S, Q, L> Structure; + typedef MutableBuffer<R> Buffer; +public: + /* + * A simple "scheduler" that runs tasks serially, in a FIFO manner. Incoming concurrent + * requests will wait for their turn, and only one task will be active in the system at + * a time. The scheduler will spin up a second thread for running itself, but all tasks + * will be single-threaded. + * + * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means + * unlimited. + * + * Note that the SerialScheduler object is non-concurrent, and so will ignore the + * thread_cnt argument. It will obey the memory_budget, however a failure due to + * memory constraints will be irrecoverable, as there is no way to free up memory + * or block particular tasks until memory becomes available. + */ + SerialScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX) + , m_used_memory(0) + , m_used_threads(0) + , m_shutdown(false) + { + m_sched_thrd = std::thread(&SerialScheduler::run_scheduler, this); + } + + ~SerialScheduler() { + m_shutdown = true; + + m_cv.notify_all(); + m_sched_thrd.join(); + } + + bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) { + pending_version = version; + pending_buffer = buffer; + + /* + * Get list of individual level reconstructions that are necessary + * for completing the overall merge + */ + std::vector<MergeTask> merges = version->get_merge_tasks(buffer->get_record_count()); + + /* + * Schedule the merge tasks (FIXME: currently this just + * executes them sequentially in a blocking fashion) + */ + for (ssize_t i=0; i<merges.size(); i++) { + merges[i].m_timestamp = m_timestamp.fetch_add(1); + m_merge_queue_lock.lock(); + m_merge_queue.emplace(merges[i]); + m_merge_queue_lock.unlock(); + } + + MergeTask buffer_merge; + buffer_merge.m_source_level = -1; + buffer_merge.m_target_level = 0; + buffer_merge.m_size = buffer->get_record_count() * sizeof(R) * 2; + buffer_merge.m_timestamp = m_timestamp.fetch_add(1); + buffer_merge.m_type = TaskType::MERGE; + m_merge_queue_lock.lock(); + m_merge_queue.emplace(buffer_merge); + m_merge_queue_lock.unlock(); + + m_cv.notify_all(); + do { + std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock); + m_merge_cv.wait(merge_cv_lock); + } while (m_merge_queue.size() > 0); + + assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0); + + return true; + } + + bool schedule_query() { + return true; + } + +private: + size_t get_timestamp() { + auto ts = m_timestamp.fetch_add(1); + return ts; + } + + void schedule_merge(MergeTask task) { + if (task.m_source_level == -1 && task.m_target_level == 0) { + run_buffer_merge(pending_buffer, pending_version); + } else { + run_merge(task, pending_version); + } + } + + + void schedule_query(QueryTask task) { + + } + + void schedule_next_task() { + m_merge_queue_lock.lock(); + auto task = m_merge_queue.top(); + m_merge_queue.pop(); + m_merge_queue_lock.unlock(); + + auto type = std::visit(GetTaskType{}, task); + + switch (type) { + case TaskType::MERGE: + schedule_merge(std::get<MergeTask>(task)); + break; + case TaskType::QUERY: + schedule_query(std::get<QueryTask>(task)); + break; + default: assert(false); + } + + if (m_merge_queue.size() == 0) { + m_merge_cv.notify_all(); + } + } + + + void run_merge(MergeTask task, Structure *version) { + version->merge_levels(task.m_target_level, task.m_source_level); + + if (!version->validate_tombstone_proportion(task.m_target_level)) { + auto tasks = version->get_merge_tasks(task.m_target_level); + /* + * Schedule the merge tasks (FIXME: currently this just + * executes them sequentially in a blocking fashion) + */ + for (ssize_t i=tasks.size()-1; i>=0; i--) { + tasks[i].m_timestamp = m_timestamp.fetch_add(1); + m_merge_queue_lock.lock(); + m_merge_queue.push(tasks[i]); + m_merge_queue_lock.unlock(); + } + } + } + + + void run_buffer_merge(Buffer *buffer, Structure *version) { + version->merge_buffer(buffer); + if (!version->validate_tombstone_proportion(0)) { + auto tasks = version->get_merge_tasks_from_level(0); + + /* + * Schedule the merge tasks (FIXME: currently this just + * executes them sequentially in a blocking fashion) + */ + for (ssize_t i=tasks.size()-1; i>=0; i--) { + tasks[i].m_timestamp = m_timestamp.fetch_add(1); + m_merge_queue_lock.lock(); + m_merge_queue.push(tasks[i]); + m_merge_queue_lock.unlock(); + } + } + } + + void run_scheduler() { + do { + std::unique_lock<std::mutex> cv_lock(m_cv_lock); + m_cv.wait(cv_lock); + + while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { + schedule_next_task(); + } + cv_lock.unlock(); + } while(!m_shutdown); + } + + size_t m_memory_budget; + size_t m_thread_cnt; + + Buffer *pending_buffer; + Structure *pending_version; + + alignas(64) std::atomic<size_t> m_used_memory; + alignas(64) std::atomic<size_t> m_used_threads; + alignas(64) std::atomic<size_t> m_timestamp; + + std::priority_queue<Task, std::vector<Task>, std::greater<Task>> m_merge_queue; + std::mutex m_merge_queue_lock; + + std::mutex m_cv_lock; + std::condition_variable m_cv; + + std::mutex m_merge_cv_lock; + std::condition_variable m_merge_cv; + + std::thread m_sched_thrd; + + bool m_shutdown; +}; + +} diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h new file mode 100644 index 0000000..9e0655a --- /dev/null +++ b/include/framework/scheduling/Task.h @@ -0,0 +1,63 @@ +/* + * + */ +#pragma once + +#include <variant> + +#include "framework/util/Configuration.h" + +namespace de { + +enum class TaskType { + MERGE, + QUERY +}; + +struct MergeTask { + level_index m_source_level; + level_index m_target_level; + size_t m_timestamp; + size_t m_size; + TaskType m_type; + + TaskType get_type() const { + return m_type; + } + + friend bool operator<(const MergeTask &self, const MergeTask &other) { + return self.m_timestamp < other.m_timestamp; + } + + friend bool operator>(const MergeTask &self, const MergeTask &other) { + return self.m_timestamp > other.m_timestamp; + } + +}; + +struct QueryTask { + size_t m_timestamp; + size_t m_size; + TaskType m_type; + + TaskType get_type() const { + return m_type; + } + + friend bool operator<(const QueryTask &self, const QueryTask &other) { + return self.m_timestamp < other.m_timestamp; + } + + friend bool operator>(const QueryTask &self, const QueryTask &other) { + return self.m_timestamp > other.m_timestamp; + } +}; + +struct GetTaskType { + TaskType operator()(const MergeTask &t) { return t.get_type(); } + TaskType operator()(const QueryTask &t) { return t.get_type(); } +}; + +typedef std::variant<MergeTask, QueryTask> Task; + +} |