From 7c03d771475421c1d5a2bbc135242536af1a371c Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 25 Sep 2023 10:49:36 -0400 Subject: Re-structuring Project + scheduling updates This is a big one--probably should have split it apart, but I'm feeling lazy this morning. * Organized the mess of header files in include/framework by splitting them out into their own subdirectories, and renaming a few files to remove redundancies introduced by the directory structure. * Introduced a new framework/ShardRequirements.h header file for simpler shard development. This header simply contains the necessary includes from framework/* for creating shard files. This should help to remove structural dependencies from the framework file structure and shards, as well as centralizing the necessary framework files to make shard development easier. * Created a (currently dummy) SchedulerInterface, and made the scheduler implementation a template parameter of the dynamic extension for easier testing of various scheduling policies. There's still more work to be done to fully integrate the scheduler (queries, multiple buffers), but some more of the necessary framework code for this has been added as well. * Adjusted the Task interface setup for the scheduler. The task structures have been removed from ExtensionStructure and placed in their own header file. Additionally, I started experimenting with using std::variant, as opposed to inheritance, to implement subtype polymorphism on the Merge and Query tasks. The scheduler now has a general task queue that contains both, and std::variant, std::visit, and std::get are used to manipulate them without virtual functions. * Removed Alex.h, as it can't build anyway. There's a branch out there containing the Alex implementation stripped of the C++20 stuff. So there's no need to keep it here. 
--- include/framework/scheduling/Scheduler.h | 195 +++++++++++++++++++++ include/framework/scheduling/SerialScheduler.h | 227 +++++++++++++++++++++++++ include/framework/scheduling/Task.h | 63 +++++++ 3 files changed, 485 insertions(+) create mode 100644 include/framework/scheduling/Scheduler.h create mode 100644 include/framework/scheduling/SerialScheduler.h create mode 100644 include/framework/scheduling/Task.h (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Scheduler.h b/include/framework/scheduling/Scheduler.h new file mode 100644 index 0000000..992cbf9 --- /dev/null +++ b/include/framework/scheduling/Scheduler.h @@ -0,0 +1,195 @@ +/* + * include/framework/Scheduler.h + * + * Copyright (C) 2023 Douglas Rumbaugh + * Dong Xie + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include +#include +#include +#include +#include + +#include "util/types.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" +#include "framework/interface/Record.h" +#include "framework/structure/MutableBuffer.h" +#include "framework/util/Configuration.h" +#include "framework/structure/ExtensionStructure.h" + +namespace de { + +template +class Scheduler { + typedef ExtensionStructure Structure; + typedef MutableBuffer Buffer; +public: + /* + * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means + * unlimited. + */ + Scheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thread_cnt((thread_cnt) ? 
thread_cnt : UINT64_MAX) + , m_used_memory(0) + , m_used_threads(0) + , m_shutdown(false) + { + m_sched_thrd = std::thread(&Scheduler::run_scheduler, this); + } + + ~Scheduler() { + m_shutdown = true; + + m_cv.notify_all(); + m_sched_thrd.join(); + } + + bool schedule_merge(Structure *version, MutableBuffer *buffer) { + /* + * temporary hack + */ + pending_version = version; + pending_buffer = buffer; + + /* + * Get list of individual level reconstructions that are necessary + * for completing the overall merge + */ + std::vector merges = version->get_merge_tasks(buffer->get_record_count()); + + /* + * Schedule the merge tasks (FIXME: currently this just + * executes them sequentially in a blocking fashion) + */ + for (ssize_t i=0; iget_record_count() * sizeof(R) * 2; + buffer_merge.m_timestamp = m_timestamp.fetch_add(1); + m_merge_queue_lock.lock(); + m_merge_queue.push(buffer_merge); + m_merge_queue_lock.unlock(); + + m_cv.notify_all(); + do { + std::unique_lock merge_cv_lock(m_merge_cv_lock); + m_merge_cv.wait(merge_cv_lock); + } while (m_merge_queue.size() > 0); + + assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0); + + return true; + } + +private: + size_t get_timestamp() { + auto ts = m_timestamp.fetch_add(1); + return ts; + } + + void schedule_next_task() { + m_merge_queue_lock.lock(); + auto task = m_merge_queue.top(); + m_merge_queue.pop(); + m_merge_queue_lock.unlock(); + + if (task.m_source_level == -1 && task.m_target_level == 0) { + run_buffer_merge(pending_buffer, pending_version); + } else { + run_merge(task, pending_version); + } + + if (m_merge_queue.size() == 0) { + m_merge_cv.notify_all(); + } + } + + + void run_merge(MergeTask task, Structure *version) { + version->merge_levels(task.m_target_level, task.m_source_level); + + if (!version->validate_tombstone_proportion(task.m_target_level)) { + auto tasks = version->get_merge_tasks(task.m_target_level); + /* + * Schedule the merge tasks 
(FIXME: currently this just + * executes them sequentially in a blocking fashion) + */ + for (ssize_t i=tasks.size()-1; i>=0; i--) { + tasks[i].m_timestamp = m_timestamp.fetch_add(1); + m_merge_queue_lock.lock(); + m_merge_queue.push(tasks[i]); + m_merge_queue_lock.unlock(); + } + } + } + + + void run_buffer_merge(Buffer *buffer, Structure *version) { + version->merge_buffer(buffer); + if (!version->validate_tombstone_proportion(0)) { + auto tasks = version->get_merge_tasks_from_level(0); + + /* + * Schedule the merge tasks (FIXME: currently this just + * executes them sequentially in a blocking fashion) + */ + for (ssize_t i=tasks.size()-1; i>=0; i--) { + tasks[i].m_timestamp = m_timestamp.fetch_add(1); + m_merge_queue_lock.lock(); + m_merge_queue.push(tasks[i]); + m_merge_queue_lock.unlock(); + } + } + } + + void run_scheduler() { + do { + std::unique_lock cv_lock(m_cv_lock); + m_cv.wait(cv_lock); + + while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { + schedule_next_task(); + } + cv_lock.unlock(); + } while(!m_shutdown); + } + + size_t m_memory_budget; + size_t m_thread_cnt; + + Buffer *pending_buffer; + Structure *pending_version; + + alignas(64) std::atomic m_used_memory; + alignas(64) std::atomic m_used_threads; + alignas(64) std::atomic m_timestamp; + + std::priority_queue, std::greater> m_merge_queue; + std::mutex m_merge_queue_lock; + + std::mutex m_cv_lock; + std::condition_variable m_cv; + + std::mutex m_merge_cv_lock; + std::condition_variable m_merge_cv; + + std::thread m_sched_thrd; + + bool m_shutdown; +}; + +} diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h new file mode 100644 index 0000000..5e16bdf --- /dev/null +++ b/include/framework/scheduling/SerialScheduler.h @@ -0,0 +1,227 @@ +/* + * include/framework/Scheduler.h + * + * Copyright (C) 2023 Douglas Rumbaugh + * Dong Xie + * + * All rights reserved. Published under the Modified BSD License. 
+ * + */ +#pragma once + +#include +#include +#include +#include +#include + +#include "util/types.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" +#include "framework/interface/Record.h" +#include "framework/structure/MutableBuffer.h" +#include "framework/util/Configuration.h" +#include "framework/structure/ExtensionStructure.h" +#include "framework/scheduling/Task.h" + +namespace de { + +template +class SerialScheduler { + typedef ExtensionStructure Structure; + typedef MutableBuffer Buffer; +public: + /* + * A simple "scheduler" that runs tasks serially, in a FIFO manner. Incoming concurrent + * requests will wait for their turn, and only one task will be active in the system at + * a time. The scheduler will spin up a second thread for running itself, but all tasks + * will be single-threaded. + * + * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means + * unlimited. + * + * Note that the SerialScheduler object is non-concurrent, and so will ignore the + * thread_cnt argument. It will obey the memory_budget, however a failure due to + * memory constraints will be irrecoverable, as there is no way to free up memory + * or block particular tasks until memory becomes available. + */ + SerialScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thread_cnt((thread_cnt) ? 
thread_cnt : UINT64_MAX) + , m_used_memory(0) + , m_used_threads(0) + , m_shutdown(false) + { + m_sched_thrd = std::thread(&SerialScheduler::run_scheduler, this); + } + + ~SerialScheduler() { + m_shutdown = true; + + m_cv.notify_all(); + m_sched_thrd.join(); + } + + bool schedule_merge(Structure *version, MutableBuffer *buffer) { + pending_version = version; + pending_buffer = buffer; + + /* + * Get list of individual level reconstructions that are necessary + * for completing the overall merge + */ + std::vector merges = version->get_merge_tasks(buffer->get_record_count()); + + /* + * Schedule the merge tasks (FIXME: currently this just + * executes them sequentially in a blocking fashion) + */ + for (ssize_t i=0; iget_record_count() * sizeof(R) * 2; + buffer_merge.m_timestamp = m_timestamp.fetch_add(1); + buffer_merge.m_type = TaskType::MERGE; + m_merge_queue_lock.lock(); + m_merge_queue.emplace(buffer_merge); + m_merge_queue_lock.unlock(); + + m_cv.notify_all(); + do { + std::unique_lock merge_cv_lock(m_merge_cv_lock); + m_merge_cv.wait(merge_cv_lock); + } while (m_merge_queue.size() > 0); + + assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0); + + return true; + } + + bool schedule_query() { + return true; + } + +private: + size_t get_timestamp() { + auto ts = m_timestamp.fetch_add(1); + return ts; + } + + void schedule_merge(MergeTask task) { + if (task.m_source_level == -1 && task.m_target_level == 0) { + run_buffer_merge(pending_buffer, pending_version); + } else { + run_merge(task, pending_version); + } + } + + + void schedule_query(QueryTask task) { + + } + + void schedule_next_task() { + m_merge_queue_lock.lock(); + auto task = m_merge_queue.top(); + m_merge_queue.pop(); + m_merge_queue_lock.unlock(); + + auto type = std::visit(GetTaskType{}, task); + + switch (type) { + case TaskType::MERGE: + schedule_merge(std::get(task)); + break; + case TaskType::QUERY: + schedule_query(std::get(task)); + break; 
+ default: assert(false); + } + + if (m_merge_queue.size() == 0) { + m_merge_cv.notify_all(); + } + } + + + void run_merge(MergeTask task, Structure *version) { + version->merge_levels(task.m_target_level, task.m_source_level); + + if (!version->validate_tombstone_proportion(task.m_target_level)) { + auto tasks = version->get_merge_tasks(task.m_target_level); + /* + * Schedule the merge tasks (FIXME: currently this just + * executes them sequentially in a blocking fashion) + */ + for (ssize_t i=tasks.size()-1; i>=0; i--) { + tasks[i].m_timestamp = m_timestamp.fetch_add(1); + m_merge_queue_lock.lock(); + m_merge_queue.push(tasks[i]); + m_merge_queue_lock.unlock(); + } + } + } + + + void run_buffer_merge(Buffer *buffer, Structure *version) { + version->merge_buffer(buffer); + if (!version->validate_tombstone_proportion(0)) { + auto tasks = version->get_merge_tasks_from_level(0); + + /* + * Schedule the merge tasks (FIXME: currently this just + * executes them sequentially in a blocking fashion) + */ + for (ssize_t i=tasks.size()-1; i>=0; i--) { + tasks[i].m_timestamp = m_timestamp.fetch_add(1); + m_merge_queue_lock.lock(); + m_merge_queue.push(tasks[i]); + m_merge_queue_lock.unlock(); + } + } + } + + void run_scheduler() { + do { + std::unique_lock cv_lock(m_cv_lock); + m_cv.wait(cv_lock); + + while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { + schedule_next_task(); + } + cv_lock.unlock(); + } while(!m_shutdown); + } + + size_t m_memory_budget; + size_t m_thread_cnt; + + Buffer *pending_buffer; + Structure *pending_version; + + alignas(64) std::atomic m_used_memory; + alignas(64) std::atomic m_used_threads; + alignas(64) std::atomic m_timestamp; + + std::priority_queue, std::greater> m_merge_queue; + std::mutex m_merge_queue_lock; + + std::mutex m_cv_lock; + std::condition_variable m_cv; + + std::mutex m_merge_cv_lock; + std::condition_variable m_merge_cv; + + std::thread m_sched_thrd; + + bool m_shutdown; +}; + +} diff --git 
a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h new file mode 100644 index 0000000..9e0655a --- /dev/null +++ b/include/framework/scheduling/Task.h @@ -0,0 +1,63 @@ +/* + * + */ +#pragma once + +#include + +#include "framework/util/Configuration.h" + +namespace de { + +enum class TaskType { + MERGE, + QUERY +}; + +struct MergeTask { + level_index m_source_level; + level_index m_target_level; + size_t m_timestamp; + size_t m_size; + TaskType m_type; + + TaskType get_type() const { + return m_type; + } + + friend bool operator<(const MergeTask &self, const MergeTask &other) { + return self.m_timestamp < other.m_timestamp; + } + + friend bool operator>(const MergeTask &self, const MergeTask &other) { + return self.m_timestamp > other.m_timestamp; + } + +}; + +struct QueryTask { + size_t m_timestamp; + size_t m_size; + TaskType m_type; + + TaskType get_type() const { + return m_type; + } + + friend bool operator<(const QueryTask &self, const QueryTask &other) { + return self.m_timestamp < other.m_timestamp; + } + + friend bool operator>(const QueryTask &self, const QueryTask &other) { + return self.m_timestamp > other.m_timestamp; + } +}; + +struct GetTaskType { + TaskType operator()(const MergeTask &t) { return t.get_type(); } + TaskType operator()(const QueryTask &t) { return t.get_type(); } +}; + +typedef std::variant Task; + +} -- cgit v1.2.3 From 1a47cbd7978dcad7ed0b2f2af3f933137eedbfa3 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 20 Oct 2023 15:12:46 -0400 Subject: Checkpointing work I'll probably throw all this out, but I want to stash it just in case. 
--- include/framework/scheduling/SerialScheduler.h | 65 +++++++++++++------------- include/framework/scheduling/Task.h | 52 +++++++++++++++++++++ 2 files changed, 84 insertions(+), 33 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index 5e16bdf..c43e930 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -1,7 +1,7 @@ /* * include/framework/Scheduler.h * - * Copyright (C) 2023 Douglas Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * Dong Xie * * All rights reserved. Published under the Modified BSD License. @@ -14,6 +14,7 @@ #include #include #include +#include #include "util/types.h" #include "framework/interface/Shard.h" @@ -24,6 +25,8 @@ #include "framework/structure/ExtensionStructure.h" #include "framework/scheduling/Task.h" +#include "psu-ds/LockedPriorityQueue.h" + namespace de { template @@ -73,31 +76,21 @@ public: std::vector merges = version->get_merge_tasks(buffer->get_record_count()); /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) + * Schedule the merge tasks */ for (ssize_t i=0; iget_record_count() * sizeof(R) * 2; - buffer_merge.m_timestamp = m_timestamp.fetch_add(1); - buffer_merge.m_type = TaskType::MERGE; - m_merge_queue_lock.lock(); - m_merge_queue.emplace(buffer_merge); - m_merge_queue_lock.unlock(); + auto t = MergeTask(-1, 0, buffer->get_record_count() * sizeof(R) * 2, m_timestamp.fetch_add(1)); + m_task_queue.push(t); m_cv.notify_all(); do { std::unique_lock merge_cv_lock(m_merge_cv_lock); m_merge_cv.wait(merge_cv_lock); - } while (m_merge_queue.size() > 0); + } while (m_task_queue.size() > 0); assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0); @@ -128,10 +121,7 @@ private: } void schedule_next_task() { - m_merge_queue_lock.lock(); - auto 
task = m_merge_queue.top(); - m_merge_queue.pop(); - m_merge_queue_lock.unlock(); + auto task = m_task_queue.pop(); auto type = std::visit(GetTaskType{}, task); @@ -145,7 +135,7 @@ private: default: assert(false); } - if (m_merge_queue.size() == 0) { + if (m_task_queue.size() == 0) { m_merge_cv.notify_all(); } } @@ -157,15 +147,27 @@ private: if (!version->validate_tombstone_proportion(task.m_target_level)) { auto tasks = version->get_merge_tasks(task.m_target_level); /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) + * Schedule the merge tasks */ - for (ssize_t i=tasks.size()-1; i>=0; i--) { + std::promise trigger_prom; + tasks[tasks.size() - 1].make_dependent_on(trigger_prom); + tasks[tasks.size() - 1].m_timestamp = m_timestamp.fetch_add(1); + m_task_queue.push(tasks[tasks.size() - 1]); + + for (ssize_t i=tasks.size()-2; i>=0; i--) { + tasks[i].make_dependent_on(tasks[i+1]); tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(tasks[i]); - m_merge_queue_lock.unlock(); + m_task_queue.push(tasks[i]); } + + /* + * Block the completion of any task until all have been + * scheduled. Probably not strictly necessary, but due to + * interface constraints with the way promises are used, + * a dummy promise needs to be set up for the first job + * anyway. It's easiest to just release it here. 
+ */ + trigger_prom.set_value(); } } @@ -181,9 +183,7 @@ private: */ for (ssize_t i=tasks.size()-1; i>=0; i--) { tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(tasks[i]); - m_merge_queue_lock.unlock(); + m_task_queue.push(tasks[i]); } } } @@ -193,7 +193,7 @@ private: std::unique_lock cv_lock(m_cv_lock); m_cv.wait(cv_lock); - while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { + while (m_task_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { schedule_next_task(); } cv_lock.unlock(); @@ -210,8 +210,7 @@ private: alignas(64) std::atomic m_used_threads; alignas(64) std::atomic m_timestamp; - std::priority_queue, std::greater> m_merge_queue; - std::mutex m_merge_queue_lock; + psudb::LockedPriorityQueue, std::greater> m_task_queue; std::mutex m_cv_lock; std::condition_variable m_cv; diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 9e0655a..3c1b158 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -4,6 +4,7 @@ #pragma once #include +#include #include "framework/util/Configuration.h" @@ -14,17 +15,52 @@ enum class TaskType { QUERY }; +struct TaskDependency { + std::promise prom; + std::future fut; +}; + struct MergeTask { level_index m_source_level; level_index m_target_level; size_t m_timestamp; size_t m_size; TaskType m_type; + std::unique_ptr m_dep; + + MergeTask() = default; + + MergeTask(level_index source, level_index target, size_t size, size_t timestamp) + : m_source_level(source) + , m_target_level(target) + , m_timestamp(timestamp) + , m_size(size) + , m_type(TaskType::MERGE) + , m_dep(std::make_unique()){} + + + MergeTask(MergeTask &t) + : m_source_level(t.m_source_level) + , m_target_level(t.m_target_level) + , m_timestamp(t.m_timestamp) + , m_size(t.m_size) + , m_type(TaskType::MERGE) + , m_dep(std::move(t.m_dep)) + {} + TaskType get_type() const { return m_type; } + void 
make_dependent_on(MergeTask &task) { + m_dep->fut = task.m_dep->prom.get_future(); + } + + void make_dependent_on(TaskDependency *dep) { + m_dep->fut = dep->prom.get_future(); + } + friend bool operator<(const MergeTask &self, const MergeTask &other) { return self.m_timestamp < other.m_timestamp; } @@ -39,11 +75,27 @@ struct QueryTask { size_t m_timestamp; size_t m_size; TaskType m_type; + std::unique_ptr m_dep; + + QueryTask(QueryTask &t) + : m_timestamp(t.m_timestamp) + , m_size(t.m_size) + , m_type(t.m_type) + , m_dep(std::move(t.m_dep)) + {} TaskType get_type() const { return m_type; } + void SetDependency(QueryTask &task) { + m_dep->fut = task.m_dep->prom.get_future(); + } + + void SetDependency(TaskDependency *dep) { + m_dep->fut = dep->prom.get_future(); + } + friend bool operator<(const QueryTask &self, const QueryTask &other) { return self.m_timestamp < other.m_timestamp; } -- cgit v1.2.3 From 7ecfb22c32b7986ed1a2439c1abbeed298e4153a Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 20 Oct 2023 17:00:42 -0400 Subject: Initial pass w/ new scheduler setup currently there's a race condition of some type to sort out. --- include/framework/scheduling/SerialScheduler.h | 191 ++++--------------------- include/framework/scheduling/Task.h | 112 ++++----------- 2 files changed, 57 insertions(+), 246 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index c43e930..5d6e5c2 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -29,198 +29,67 @@ namespace de { -template class SerialScheduler { - typedef ExtensionStructure Structure; - typedef MutableBuffer Buffer; public: - /* - * A simple "scheduler" that runs tasks serially, in a FIFO manner. Incoming concurrent - * requests will wait for their turn, and only one task will be active in the system at - * a time. 
The scheduler will spin up a second thread for running itself, but all tasks - * will be single-threaded. - * - * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means - * unlimited. - * - * Note that the SerialScheduler object is non-concurrent, and so will ignore the - * thread_cnt argument. It will obey the memory_budget, however a failure due to - * memory constraints will be irrecoverable, as there is no way to free up memory - * or block particular tasks until memory becomes available. - */ - SerialScheduler(size_t memory_budget, size_t thread_cnt) - : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX) - , m_used_memory(0) - , m_used_threads(0) - , m_shutdown(false) - { - m_sched_thrd = std::thread(&SerialScheduler::run_scheduler, this); + SerialScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thrd_cnt((thread_cnt) ? 
thread_cnt: UINT64_MAX) + , m_used_memory(0) + , m_used_thrds(0) + , m_shutdown(false) + { + m_sched_thrd = std::thread(&SerialScheduler::run, this); } ~SerialScheduler() { - m_shutdown = true; + shutdown(); m_cv.notify_all(); m_sched_thrd.join(); } - bool schedule_merge(Structure *version, MutableBuffer *buffer) { - pending_version = version; - pending_buffer = buffer; - - /* - * Get list of individual level reconstructions that are necessary - * for completing the overall merge - */ - std::vector merges = version->get_merge_tasks(buffer->get_record_count()); - - /* - * Schedule the merge tasks - */ - for (ssize_t i=0; iget_record_count() * sizeof(R) * 2, m_timestamp.fetch_add(1)); - m_task_queue.push(t); - - m_cv.notify_all(); - do { - std::unique_lock merge_cv_lock(m_merge_cv_lock); - m_merge_cv.wait(merge_cv_lock); - } while (m_task_queue.size() > 0); - - assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0); - - return true; + void schedule_job(std::function job, size_t size, void *args) { + size_t ts = m_counter.fetch_add(1); + m_task_queue.push(Task(size, ts, job, args)); } - bool schedule_query() { - return true; + void shutdown() { + m_shutdown = true; } private: - size_t get_timestamp() { - auto ts = m_timestamp.fetch_add(1); - return ts; - } - - void schedule_merge(MergeTask task) { - if (task.m_source_level == -1 && task.m_target_level == 0) { - run_buffer_merge(pending_buffer, pending_version); - } else { - run_merge(task, pending_version); - } - } - - - void schedule_query(QueryTask task) { - - } - - void schedule_next_task() { - auto task = m_task_queue.pop(); - - auto type = std::visit(GetTaskType{}, task); - - switch (type) { - case TaskType::MERGE: - schedule_merge(std::get(task)); - break; - case TaskType::QUERY: - schedule_query(std::get(task)); - break; - default: assert(false); - } - - if (m_task_queue.size() == 0) { - m_merge_cv.notify_all(); - } - } + psudb::LockedPriorityQueue 
m_task_queue; + size_t m_memory_budget; + size_t m_thrd_cnt; - void run_merge(MergeTask task, Structure *version) { - version->merge_levels(task.m_target_level, task.m_source_level); - - if (!version->validate_tombstone_proportion(task.m_target_level)) { - auto tasks = version->get_merge_tasks(task.m_target_level); - /* - * Schedule the merge tasks - */ - std::promise trigger_prom; - tasks[tasks.size() - 1].make_dependent_on(trigger_prom); - tasks[tasks.size() - 1].m_timestamp = m_timestamp.fetch_add(1); - m_task_queue.push(tasks[tasks.size() - 1]); - - for (ssize_t i=tasks.size()-2; i>=0; i--) { - tasks[i].make_dependent_on(tasks[i+1]); - tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_task_queue.push(tasks[i]); - } + bool m_shutdown; - /* - * Block the completion of any task until all have been - * scheduled. Probably not strictly necessary, but due to - * interface constraints with the way promises are used, - * a dummy promise needs to be set up for the first job - * anyway. It's easiest to just release it here. 
- */ - trigger_prom.set_value(); - } - } + std::atomic m_counter; + std::mutex m_cv_lock; + std::condition_variable m_cv; + std::thread m_sched_thrd; - void run_buffer_merge(Buffer *buffer, Structure *version) { - version->merge_buffer(buffer); - if (!version->validate_tombstone_proportion(0)) { - auto tasks = version->get_merge_tasks_from_level(0); + std::atomic m_used_thrds; + std::atomic m_used_memory; - /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) - */ - for (ssize_t i=tasks.size()-1; i>=0; i--) { - tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_task_queue.push(tasks[i]); - } - } + void schedule_next() { + auto t = m_task_queue.pop(); + t(); } - void run_scheduler() { + void run() { do { std::unique_lock cv_lock(m_cv_lock); m_cv.wait(cv_lock); - while (m_task_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { - schedule_next_task(); + while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) { + schedule_next(); } cv_lock.unlock(); } while(!m_shutdown); } - - size_t m_memory_budget; - size_t m_thread_cnt; - - Buffer *pending_buffer; - Structure *pending_version; - - alignas(64) std::atomic m_used_memory; - alignas(64) std::atomic m_used_threads; - alignas(64) std::atomic m_timestamp; - - psudb::LockedPriorityQueue, std::greater> m_task_queue; - - std::mutex m_cv_lock; - std::condition_variable m_cv; - - std::mutex m_merge_cv_lock; - std::condition_variable m_merge_cv; - - std::thread m_sched_thrd; - - bool m_shutdown; }; } diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 3c1b158..518159d 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -5,111 +5,53 @@ #include #include +#include #include "framework/util/Configuration.h" namespace de { -enum class TaskType { - MERGE, - QUERY +struct MergeArgs { + void *version; + void *buffer; + std::vector merges; + std::promise result; }; -struct 
TaskDependency { - std::promise prom; - std::future fut; +template +struct QueryArgs { + void *version; + void *buffer; + std::promise> result_set; + void *query_parms; }; -struct MergeTask { - level_index m_source_level; - level_index m_target_level; - size_t m_timestamp; - size_t m_size; - TaskType m_type; - std::unique_ptr m_dep; - - MergeTask() = default; - - MergeTask(level_index source, level_index target, size_t size, size_t timestamp) - : m_source_level(source) - , m_target_level(target) - , m_timestamp(timestamp) - , m_size(size) - , m_type(TaskType::MERGE) - , m_dep(std::make_unique()){} - +typedef std::function Job; - MergeTask(MergeTask &t) - : m_source_level(t.m_source_level) - , m_target_level(t.m_target_level) - , m_timestamp(t.m_timestamp) - , m_size(t.m_size) - , m_type(TaskType::MERGE) - , m_dep(std::move(t.m_dep)) +struct Task { + Task(size_t size, size_t ts, Job job, void *args) + : m_job(job) + , m_size(size) + , m_timestamp(ts) + , m_args(args) {} - - TaskType get_type() const { - return m_type; - } - - void make_dependent_on(MergeTask &task) { - m_dep->fut = task.m_dep->prom.get_future(); - } - - void make_dependent_on(TaskDependency *dep) { - m_dep->fut = dep->prom.get_future(); - } - - friend bool operator<(const MergeTask &self, const MergeTask &other) { - return self.m_timestamp < other.m_timestamp; - } - - friend bool operator>(const MergeTask &self, const MergeTask &other) { - return self.m_timestamp > other.m_timestamp; - } - -}; - -struct QueryTask { - size_t m_timestamp; + Job m_job; size_t m_size; - TaskType m_type; - std::unique_ptr m_dep; - - QueryTask(QueryTask &t) - : m_timestamp(t.m_timestamp) - , m_size(t.m_size) - , m_type(t.m_type) - , m_dep(std::move(t.m_dep)) - {} - - TaskType get_type() const { - return m_type; - } - - void SetDependency(QueryTask &task) { - m_dep->fut = task.m_dep->prom.get_future(); - } - - void SetDependency(TaskDependency *dep) { - m_dep->fut = dep->prom.get_future(); - } + size_t m_timestamp; + void 
*m_args; - friend bool operator<(const QueryTask &self, const QueryTask &other) { + friend bool operator<(const Task &self, const Task &other) { return self.m_timestamp < other.m_timestamp; } - friend bool operator>(const QueryTask &self, const QueryTask &other) { + friend bool operator>(const Task &self, const Task &other) { return self.m_timestamp > other.m_timestamp; } -}; -struct GetTaskType { - TaskType operator()(const MergeTask &t) { return t.get_type(); } - TaskType operator()(const QueryTask &t) { return t.get_type(); } + void operator()() { + m_job(m_args); + } }; -typedef std::variant Task; - } -- cgit v1.2.3 From b72103cb11347f0dd108bd2321f29b0d6ab05106 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 23 Oct 2023 13:18:30 -0400 Subject: Bugfixes --- include/framework/scheduling/SerialScheduler.h | 1 + 1 file changed, 1 insertion(+) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index 5d6e5c2..da2bb8e 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -51,6 +51,7 @@ public: void schedule_job(std::function job, size_t size, void *args) { size_t ts = m_counter.fetch_add(1); m_task_queue.push(Task(size, ts, job, args)); + m_cv.notify_all(); } void shutdown() { -- cgit v1.2.3 From 3afacb7702e6d8fa67749a2a41dc776d315e02a9 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 23 Oct 2023 17:43:22 -0400 Subject: Began moving to an explicit epoch-based system I started moving over to an explicit Epoch based system, which has necessitated a ton of changes throughout the code base. This will ultimately allow for a much cleaner set of abstractions for managing concurrency. 
--- include/framework/scheduling/Epoch.h | 128 +++++++++++++++++++++++++ include/framework/scheduling/FIFOScheduler.h | 96 +++++++++++++++++++ include/framework/scheduling/SerialScheduler.h | 96 ------------------- include/framework/scheduling/Task.h | 10 +- 4 files changed, 229 insertions(+), 101 deletions(-) create mode 100644 include/framework/scheduling/Epoch.h create mode 100644 include/framework/scheduling/FIFOScheduler.h delete mode 100644 include/framework/scheduling/SerialScheduler.h (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h new file mode 100644 index 0000000..a1f865c --- /dev/null +++ b/include/framework/scheduling/Epoch.h @@ -0,0 +1,128 @@ +/* + * include/framework/scheduling/Epoch.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * Dong Xie + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include "framework/structure/MutableBuffer.h" +#include "framework/structure/ExtensionStructure.h" +#include "framework/structure/BufferView.h" + +namespace de { + + +template +class Epoch { +private: + typedef MutableBuffer Buffer; + typedef ExtensionStructure Structure; + typedef BufferView BufView; +public: + Epoch() + : m_buffers() + , m_structure(nullptr) + , m_active_jobs(0) + {} + + Epoch(Structure *structure, Buffer *buff) + : m_buffers() + , m_structure(structure) + , m_active_jobs(0) + { + m_buffers.push_back(buff); + } + + ~Epoch() { + assert(m_active_jobs.load() == 0); + + for (auto buf : m_buffers) { + buf.release_reference(); + } + + if (m_structure) { + m_structure->release_reference(); + } + } + + void add_buffer(Buffer *buf) { + assert(buf); + + buf->take_reference(); + m_buffers.push_back(buf); + } + + void start_job() { + m_active_jobs.fetch_add(1); + } + + void end_job() { + m_active_jobs.fetch_add(-1); + } + + size_t get_active_job_num() { + return m_active_jobs.load(); + } + + Structure *get_structure() { + 
return m_structure; + } + + std::vector &get_buffers() { + return m_buffers; + } + + BufView get_buffer_view() { + return BufView(m_buffers); + } + + Buffer *get_active_buffer() { + if (m_buffers.size() == 0) return nullptr; + + return m_buffers[m_buffers.size() - 1]; + } + + /* + * Return the number of buffers in this epoch at + * time of call, and then clear the buffer vector, + * releasing all references in the process. + */ + size_t clear_buffers() { + size_t buf_cnt = m_buffers.size(); + for (auto buf : m_buffers) { + if (buf) buf->release_reference(); + } + + m_buffers.clear(); + return buf_cnt; + } + + /* + * Returns a new Epoch object that is a copy of this one. The new object will also contain + * a copy of the m_structure, rather than a reference to the same one. + */ + Epoch *clone() { + auto epoch = new Epoch(); + epoch->m_buffers = m_buffers; + if (m_structure) { + epoch->m_structure = m_structure->copy(); + } + } + +private: + Structure *m_structure; + std::vector m_buffers; + + /* + * The number of currently active jobs + * (queries/merges) operating on this + * epoch. An epoch can only be retired + * when this number is 0. + */ + std::atomic m_active_jobs; +}; +} diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h new file mode 100644 index 0000000..878bb81 --- /dev/null +++ b/include/framework/scheduling/FIFOScheduler.h @@ -0,0 +1,96 @@ +/* + * include/framework/Scheduler.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * Dong Xie + * + * All rights reserved. Published under the Modified BSD License. 
+ * + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "util/types.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" +#include "framework/interface/Record.h" +#include "framework/structure/MutableBuffer.h" +#include "framework/util/Configuration.h" +#include "framework/structure/ExtensionStructure.h" +#include "framework/scheduling/Task.h" + +#include "psu-ds/LockedPriorityQueue.h" + +namespace de { + +class FIFOScheduler { +public: + FIFOScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX) + , m_used_memory(0) + , m_used_thrds(0) + , m_shutdown(false) + { + m_sched_thrd = std::thread(&FIFOScheduler::run, this); + } + + ~FIFOScheduler() { + shutdown(); + + m_cv.notify_all(); + m_sched_thrd.join(); + } + + void schedule_job(std::function job, size_t size, void *args) { + size_t ts = m_counter.fetch_add(1); + m_task_queue.push(Task(size, ts, job, args)); + m_cv.notify_all(); + } + + void shutdown() { + m_shutdown = true; + } + +private: + psudb::LockedPriorityQueue m_task_queue; + + size_t m_memory_budget; + size_t m_thrd_cnt; + + bool m_shutdown; + + std::atomic m_counter; + std::mutex m_cv_lock; + std::condition_variable m_cv; + + std::thread m_sched_thrd; + + std::atomic m_used_thrds; + std::atomic m_used_memory; + + void schedule_next() { + auto t = m_task_queue.pop(); + t(); + } + + void run() { + do { + std::unique_lock cv_lock(m_cv_lock); + m_cv.wait(cv_lock); + + while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) { + schedule_next(); + } + cv_lock.unlock(); + } while(!m_shutdown); + } +}; + +} diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h deleted file mode 100644 index da2bb8e..0000000 --- a/include/framework/scheduling/SerialScheduler.h +++ /dev/null @@ -1,96 +0,0 @@ -/* - * 
include/framework/Scheduler.h - * - * Copyright (C) 2023 Douglas B. Rumbaugh - * Dong Xie - * - * All rights reserved. Published under the Modified BSD License. - * - */ -#pragma once - -#include -#include -#include -#include -#include -#include - -#include "util/types.h" -#include "framework/interface/Shard.h" -#include "framework/interface/Query.h" -#include "framework/interface/Record.h" -#include "framework/structure/MutableBuffer.h" -#include "framework/util/Configuration.h" -#include "framework/structure/ExtensionStructure.h" -#include "framework/scheduling/Task.h" - -#include "psu-ds/LockedPriorityQueue.h" - -namespace de { - -class SerialScheduler { -public: - SerialScheduler(size_t memory_budget, size_t thread_cnt) - : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX) - , m_used_memory(0) - , m_used_thrds(0) - , m_shutdown(false) - { - m_sched_thrd = std::thread(&SerialScheduler::run, this); - } - - ~SerialScheduler() { - shutdown(); - - m_cv.notify_all(); - m_sched_thrd.join(); - } - - void schedule_job(std::function job, size_t size, void *args) { - size_t ts = m_counter.fetch_add(1); - m_task_queue.push(Task(size, ts, job, args)); - m_cv.notify_all(); - } - - void shutdown() { - m_shutdown = true; - } - -private: - psudb::LockedPriorityQueue m_task_queue; - - size_t m_memory_budget; - size_t m_thrd_cnt; - - bool m_shutdown; - - std::atomic m_counter; - std::mutex m_cv_lock; - std::condition_variable m_cv; - - std::thread m_sched_thrd; - - std::atomic m_used_thrds; - std::atomic m_used_memory; - - void schedule_next() { - auto t = m_task_queue.pop(); - t(); - } - - void run() { - do { - std::unique_lock cv_lock(m_cv_lock); - m_cv.wait(cv_lock); - - while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) { - schedule_next(); - } - cv_lock.unlock(); - } while(!m_shutdown); - } -}; - -} diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 
518159d..94c4d0a 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -8,20 +8,20 @@ #include #include "framework/util/Configuration.h" +#include "framework/scheduling/Epoch.h" namespace de { +template struct MergeArgs { - void *version; - void *buffer; + Epoch *epoch; std::vector merges; std::promise result; }; -template +template struct QueryArgs { - void *version; - void *buffer; + Epoch *epoch; std::promise> result_set; void *query_parms; }; -- cgit v1.2.3 From 39ae3e0441d8297a09197aba98bd494b5ada12c1 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 30 Oct 2023 14:17:59 -0400 Subject: Concurrency updates + fixes for compile errors --- include/framework/scheduling/Epoch.h | 4 +++- include/framework/scheduling/Task.h | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index a1f865c..fe63c86 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -41,7 +41,7 @@ public: assert(m_active_jobs.load() == 0); for (auto buf : m_buffers) { - buf.release_reference(); + buf->release_reference(); } if (m_structure) { @@ -111,6 +111,8 @@ public: if (m_structure) { epoch->m_structure = m_structure->copy(); } + + return epoch; } private: diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 94c4d0a..d25c7c0 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -17,6 +17,7 @@ struct MergeArgs { Epoch *epoch; std::vector merges; std::promise result; + void *extension; }; template -- cgit v1.2.3 From 40b87b74f2bf4e93fdc9dabd6eab9175187fb63c Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 30 Oct 2023 14:47:16 -0400 Subject: FIFOScheduler: correctly protect m_cv with a lock --- include/framework/scheduling/FIFOScheduler.h | 5 +++++ 1 file changed, 5 insertions(+) (limited 
to 'include/framework/scheduling') diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 878bb81..7ccab26 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -44,13 +44,18 @@ public: ~FIFOScheduler() { shutdown(); + std::unique_lock lk(m_cv_lock); m_cv.notify_all(); + lk.release(); + m_sched_thrd.join(); } void schedule_job(std::function job, size_t size, void *args) { size_t ts = m_counter.fetch_add(1); m_task_queue.push(Task(size, ts, job, args)); + + std::unique_lock lk(m_cv_lock); m_cv.notify_all(); } -- cgit v1.2.3 From 32aeedbaf6584eb71126cbe92cb42e93b65d69d3 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 30 Oct 2023 14:47:35 -0400 Subject: Epoch/DynamicExtension: added cv to epoch retirement check Instead of busy waiting on the active job count, a condition variable is now used to wait for all active jobs to finish before freeing an epoch's resources. --- include/framework/scheduling/Epoch.h | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index fe63c86..87463bd 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -62,6 +62,11 @@ public: void end_job() { m_active_jobs.fetch_add(-1); + + if (m_active_jobs.load() == 0) { + std::unique_lock lk(m_cv_lock); + m_active_cv.notify_all(); + } } size_t get_active_job_num() { @@ -115,10 +120,35 @@ public: return epoch; } + /* + * + */ + bool retirable() { + /* if epoch is currently active, then it cannot be retired */ + if (m_active) { + return false; + } + + /* + * if the epoch has active jobs but is not itself active, + * wait for them to finish and return true. 
If there are + * not active jobs, return true immediately + */ + while (m_active_jobs > 0) { + std::unique_lock lk(m_cv_lock); + m_active_cv.wait(lk); + } + + return true; + } + private: Structure *m_structure; std::vector m_buffers; + std::condition_variable m_active_cv; + std::mutex m_cv_lock; + /* * The number of currently active jobs * (queries/merges) operating on this @@ -126,5 +156,6 @@ private: * when this number is 0. */ std::atomic m_active_jobs; + bool m_active; }; } -- cgit v1.2.3 From d2279e1b96d352a0af1d425dcaaf93e8a26a8d52 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 30 Oct 2023 17:15:05 -0400 Subject: General Comment + Consistency updates --- include/framework/scheduling/Epoch.h | 1 - include/framework/scheduling/FIFOScheduler.h | 3 +- include/framework/scheduling/Scheduler.h | 195 --------------------------- include/framework/scheduling/Task.h | 6 +- 4 files changed, 6 insertions(+), 199 deletions(-) delete mode 100644 include/framework/scheduling/Scheduler.h (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 87463bd..03cbb62 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -2,7 +2,6 @@ * include/framework/scheduling/Epoch.h * * Copyright (C) 2023 Douglas B. Rumbaugh - * Dong Xie * * All rights reserved. Published under the Modified BSD License. * diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 7ccab26..5425c4f 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -1,8 +1,7 @@ /* - * include/framework/Scheduler.h + * include/framework/scheduling/FIFOScheduler.h * * Copyright (C) 2023 Douglas B. Rumbaugh - * Dong Xie * * All rights reserved. Published under the Modified BSD License. 
* diff --git a/include/framework/scheduling/Scheduler.h b/include/framework/scheduling/Scheduler.h deleted file mode 100644 index 992cbf9..0000000 --- a/include/framework/scheduling/Scheduler.h +++ /dev/null @@ -1,195 +0,0 @@ -/* - * include/framework/Scheduler.h - * - * Copyright (C) 2023 Douglas Rumbaugh - * Dong Xie - * - * All rights reserved. Published under the Modified BSD License. - * - */ -#pragma once - -#include -#include -#include -#include -#include - -#include "util/types.h" -#include "framework/interface/Shard.h" -#include "framework/interface/Query.h" -#include "framework/interface/Record.h" -#include "framework/structure/MutableBuffer.h" -#include "framework/util/Configuration.h" -#include "framework/structure/ExtensionStructure.h" - -namespace de { - -template -class Scheduler { - typedef ExtensionStructure Structure; - typedef MutableBuffer Buffer; -public: - /* - * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means - * unlimited. - */ - Scheduler(size_t memory_budget, size_t thread_cnt) - : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thread_cnt((thread_cnt) ? 
thread_cnt : UINT64_MAX) - , m_used_memory(0) - , m_used_threads(0) - , m_shutdown(false) - { - m_sched_thrd = std::thread(&Scheduler::run_scheduler, this); - } - - ~Scheduler() { - m_shutdown = true; - - m_cv.notify_all(); - m_sched_thrd.join(); - } - - bool schedule_merge(Structure *version, MutableBuffer *buffer) { - /* - * temporary hack - */ - pending_version = version; - pending_buffer = buffer; - - /* - * Get list of individual level reconstructions that are necessary - * for completing the overall merge - */ - std::vector merges = version->get_merge_tasks(buffer->get_record_count()); - - /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) - */ - for (ssize_t i=0; iget_record_count() * sizeof(R) * 2; - buffer_merge.m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(buffer_merge); - m_merge_queue_lock.unlock(); - - m_cv.notify_all(); - do { - std::unique_lock merge_cv_lock(m_merge_cv_lock); - m_merge_cv.wait(merge_cv_lock); - } while (m_merge_queue.size() > 0); - - assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0); - - return true; - } - -private: - size_t get_timestamp() { - auto ts = m_timestamp.fetch_add(1); - return ts; - } - - void schedule_next_task() { - m_merge_queue_lock.lock(); - auto task = m_merge_queue.top(); - m_merge_queue.pop(); - m_merge_queue_lock.unlock(); - - if (task.m_source_level == -1 && task.m_target_level == 0) { - run_buffer_merge(pending_buffer, pending_version); - } else { - run_merge(task, pending_version); - } - - if (m_merge_queue.size() == 0) { - m_merge_cv.notify_all(); - } - } - - - void run_merge(MergeTask task, Structure *version) { - version->merge_levels(task.m_target_level, task.m_source_level); - - if (!version->validate_tombstone_proportion(task.m_target_level)) { - auto tasks = version->get_merge_tasks(task.m_target_level); - /* - * Schedule the merge tasks 
(FIXME: currently this just - * executes them sequentially in a blocking fashion) - */ - for (ssize_t i=tasks.size()-1; i>=0; i--) { - tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(tasks[i]); - m_merge_queue_lock.unlock(); - } - } - } - - - void run_buffer_merge(Buffer *buffer, Structure *version) { - version->merge_buffer(buffer); - if (!version->validate_tombstone_proportion(0)) { - auto tasks = version->get_merge_tasks_from_level(0); - - /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) - */ - for (ssize_t i=tasks.size()-1; i>=0; i--) { - tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(tasks[i]); - m_merge_queue_lock.unlock(); - } - } - } - - void run_scheduler() { - do { - std::unique_lock cv_lock(m_cv_lock); - m_cv.wait(cv_lock); - - while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { - schedule_next_task(); - } - cv_lock.unlock(); - } while(!m_shutdown); - } - - size_t m_memory_budget; - size_t m_thread_cnt; - - Buffer *pending_buffer; - Structure *pending_version; - - alignas(64) std::atomic m_used_memory; - alignas(64) std::atomic m_used_threads; - alignas(64) std::atomic m_timestamp; - - std::priority_queue, std::greater> m_merge_queue; - std::mutex m_merge_queue_lock; - - std::mutex m_cv_lock; - std::condition_variable m_cv; - - std::mutex m_merge_cv_lock; - std::condition_variable m_merge_cv; - - std::thread m_sched_thrd; - - bool m_shutdown; -}; - -} diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index d25c7c0..228665f 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -1,9 +1,13 @@ /* + * include/framework/scheduling/Task.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * + * All rights reserved. Published under the Modified BSD License. 
* */ #pragma once -#include #include #include -- cgit v1.2.3 From 62792753bb4df2515e5e2d8cc48bca568c5379fd Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 11:48:56 -0400 Subject: Epoch: Creating an epoch now takes references on buffers + versions When an epoch is created using the constructor Epoch(Structure, Buffer), it will call take_reference() on both. This was necessary to ensure that the destructor doesn't fail, as it releases references and fails if the refcnt is 0. It releases the user of the object from the burden of manually taking references in this situation. --- include/framework/scheduling/Epoch.h | 2 ++ 1 file changed, 2 insertions(+) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 03cbb62..6bbf927 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -33,6 +33,8 @@ public: , m_structure(structure) , m_active_jobs(0) { + structure->take_reference(); + buff->take_reference(); m_buffers.push_back(buff); } -- cgit v1.2.3 From 7163b8db0ee5acc099a228090a4bdee379c1c8af Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 11:53:08 -0400 Subject: SerialScheduler: added a single-threaded scheduler Added a new scheduler for ensuring single-threaded operation. Additionally, added a static assert to (at least for now) restrict the use of tagging to this single threaded scheduler. 
--- include/framework/scheduling/SerialScheduler.h | 67 ++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 include/framework/scheduling/SerialScheduler.h (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h new file mode 100644 index 0000000..9c767e8 --- /dev/null +++ b/include/framework/scheduling/SerialScheduler.h @@ -0,0 +1,67 @@ +/* + * include/framework/scheduling/SerialScheduler.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * + * All rights reserved. Published under the Modified BSD License. + * + * IMPORTANT: This "scheduler" is a shim implementation for allowing + * strictly serial, single-threaded operation of the framework. It should + * never be used in multi-threaded contexts. A call to the schedule_job + * function will immediately run the job and block on its completion before + * returning. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "util/types.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" +#include "framework/interface/Record.h" +#include "framework/structure/MutableBuffer.h" +#include "framework/util/Configuration.h" +#include "framework/structure/ExtensionStructure.h" +#include "framework/scheduling/Task.h" + +namespace de { + +class SerialScheduler { +public: + SerialScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thrd_cnt((thread_cnt) ? 
thread_cnt: UINT64_MAX) + , m_used_memory(0) + , m_used_thrds(0) + , m_counter(0) + {} + + ~SerialScheduler() = default; + + void schedule_job(std::function job, size_t size, void *args) { + size_t ts = m_counter++; + auto t = Task(size, ts, job, args); + t(); + } + + void shutdown() { + /* intentionally left blank */ + } + +private: + size_t m_memory_budget; + size_t m_thrd_cnt; + + size_t m_used_thrds; + size_t m_used_memory; + + size_t m_counter; +}; + +} -- cgit v1.2.3 From 786a1cf5ab76f94a1adece48c1de53fb32e4551e Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 11:54:09 -0400 Subject: FIFOScheduler: fixed a few synchronization issues --- include/framework/scheduling/FIFOScheduler.h | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 5425c4f..91a72b3 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -43,23 +43,20 @@ public: ~FIFOScheduler() { shutdown(); - std::unique_lock lk(m_cv_lock); m_cv.notify_all(); - lk.release(); - m_sched_thrd.join(); } void schedule_job(std::function job, size_t size, void *args) { + std::unique_lock lk(m_cv_lock); size_t ts = m_counter.fetch_add(1); m_task_queue.push(Task(size, ts, job, args)); - std::unique_lock lk(m_cv_lock); m_cv.notify_all(); } void shutdown() { - m_shutdown = true; + m_shutdown.store(true); } private: @@ -68,7 +65,7 @@ private: size_t m_memory_budget; size_t m_thrd_cnt; - bool m_shutdown; + std::atomic m_shutdown; std::atomic m_counter; std::mutex m_cv_lock; @@ -80,6 +77,7 @@ private: std::atomic m_used_memory; void schedule_next() { + assert(m_task_queue.size() > 0); auto t = m_task_queue.pop(); t(); } @@ -92,8 +90,7 @@ private: while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) { schedule_next(); } - cv_lock.unlock(); - } 
while(!m_shutdown); + } while(!m_shutdown.load()); } }; -- cgit v1.2.3 From 68ae6279476e7d37837ac06474fb558e50ce6706 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 12:41:55 -0400 Subject: Fixes for various bugs under SerialScheduler --- include/framework/scheduling/Epoch.h | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 6bbf927..f4aefe9 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -22,16 +22,20 @@ private: typedef ExtensionStructure Structure; typedef BufferView BufView; public: - Epoch() + Epoch(size_t number=0) : m_buffers() , m_structure(nullptr) , m_active_jobs(0) + , m_active(true) + , m_epoch_number(number) {} - Epoch(Structure *structure, Buffer *buff) + Epoch(size_t number, Structure *structure, Buffer *buff) : m_buffers() , m_structure(structure) , m_active_jobs(0) + , m_active(true) + , m_epoch_number(number) { structure->take_reference(); buff->take_reference(); @@ -62,6 +66,7 @@ public: } void end_job() { + assert(m_active_jobs.load() > 0); m_active_jobs.fetch_add(-1); if (m_active_jobs.load() == 0) { @@ -74,6 +79,10 @@ public: return m_active_jobs.load(); } + size_t get_epoch_number() { + return m_epoch_number; + } + Structure *get_structure() { return m_structure; } @@ -109,18 +118,29 @@ public: /* * Returns a new Epoch object that is a copy of this one. The new object will also contain - * a copy of the m_structure, rather than a reference to the same one. + * a copy of the m_structure, rather than a reference to the same one. The epoch number of + * the new epoch will be set to the provided argument. 
*/ - Epoch *clone() { - auto epoch = new Epoch(); + Epoch *clone(size_t number) { + auto epoch = new Epoch(number); epoch->m_buffers = m_buffers; if (m_structure) { epoch->m_structure = m_structure->copy(); + /* the copy routine returns a structure with 0 references */ + epoch->m_structure->take_reference(); + } + + for (auto b : m_buffers) { + b->take_reference(); } return epoch; } + void set_inactive() { + m_active = false; + } + /* * */ @@ -158,5 +178,6 @@ private: */ std::atomic m_active_jobs; bool m_active; + size_t m_epoch_number; }; } -- cgit v1.2.3 From 4e4cf858122ca6c1ae6d5f635e839089769fee38 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 10:01:23 -0500 Subject: Scheduling: Switched over to a thread pool model --- include/framework/scheduling/FIFOScheduler.h | 13 ++++++++++--- include/framework/scheduling/SerialScheduler.h | 2 +- include/framework/scheduling/Task.h | 2 +- 3 files changed, 12 insertions(+), 5 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 91a72b3..1521eb6 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -24,20 +24,26 @@ #include "framework/structure/ExtensionStructure.h" #include "framework/scheduling/Task.h" +#include "ctpl/ctpl.h" #include "psu-ds/LockedPriorityQueue.h" namespace de { + class FIFOScheduler { +private: + static const size_t DEFAULT_MAX_THREADS = 8; + public: FIFOScheduler(size_t memory_budget, size_t thread_cnt) : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX) + , m_thrd_cnt((thread_cnt) ? 
thread_cnt: DEFAULT_MAX_THREADS) , m_used_memory(0) , m_used_thrds(0) , m_shutdown(false) { m_sched_thrd = std::thread(&FIFOScheduler::run, this); + m_thrd_pool.resize(m_thrd_cnt); } ~FIFOScheduler() { @@ -72,6 +78,7 @@ private: std::condition_variable m_cv; std::thread m_sched_thrd; + ctpl::thread_pool m_thrd_pool; std::atomic m_used_thrds; std::atomic m_used_memory; @@ -79,7 +86,7 @@ private: void schedule_next() { assert(m_task_queue.size() > 0); auto t = m_task_queue.pop(); - t(); + m_thrd_pool.push(t); } void run() { @@ -87,7 +94,7 @@ private: std::unique_lock cv_lock(m_cv_lock); m_cv.wait(cv_lock); - while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) { + while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) { schedule_next(); } } while(!m_shutdown.load()); diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index 9c767e8..93611d1 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -47,7 +47,7 @@ public: void schedule_job(std::function job, size_t size, void *args) { size_t ts = m_counter++; auto t = Task(size, ts, job, args); - t(); + t(0); } void shutdown() { diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 228665f..6dfd7df 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -54,7 +54,7 @@ struct Task { return self.m_timestamp > other.m_timestamp; } - void operator()() { + void operator()(size_t thrd_id) { m_job(m_args); } }; -- cgit v1.2.3 From ca1605a9924e27ccbacb33d04ccdb4326e7abe74 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 12:37:06 -0500 Subject: Epoch: Adjusted add empty buffer behavior Add empty buffer now supports a CAS-like operation, where it will only add a buffer if the currently active one is still the same as when the decision to add a buffer was made. 
This is to support adding new buffers on insert outside of the merge-lock, so that multiple concurrent threads cannot add multiple new empty buffers. --- include/framework/scheduling/Epoch.h | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index f4aefe9..58fe6cd 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -54,11 +54,25 @@ public: } } - void add_buffer(Buffer *buf) { + Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) { assert(buf); + /* + * if a current buffer is specified, only add the + * new buffer if the active buffer is the current, + * otherwise just return the active buffer (poor man's + * CAS). + */ + if (cur_buf) { + auto active_buf = get_active_buffer(); + if (active_buf != cur_buf) { + return active_buf; + } + } + buf->take_reference(); m_buffers.push_back(buf); + return buf; } void start_job() { -- cgit v1.2.3 From 254f8aa85ea8962e5c11d8b475a171883c22f168 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 12:39:35 -0500 Subject: DynamicExtension: internal_append fixes Fixed a few bugs with concurrent operation of internal_append, as well as enabled the spawning of multiple empty buffers while merges are currently active. 
--- include/framework/scheduling/Epoch.h | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 58fe6cd..0ebbde9 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -25,6 +25,7 @@ public: Epoch(size_t number=0) : m_buffers() , m_structure(nullptr) + , m_active_merge(false) , m_active_jobs(0) , m_active(true) , m_epoch_number(number) @@ -34,6 +35,7 @@ public: : m_buffers() , m_structure(structure) , m_active_jobs(0) + , m_active_merge(false) , m_active(true) , m_epoch_number(number) { @@ -151,6 +153,31 @@ public: return epoch; } + /* + * Check if a merge can be started from this Epoch. + * At present, without concurrent merging, this simply + * checks if there is currently a scheduled merge based + * on this Epoch. If there is, returns false. If there + * isn't, return true and set a flag indicating that + * there is an active merge. 
+ */ + bool prepare_merge() { + auto old = m_active_merge.load(); + if (old) { + return false; + } + + // FIXME: this needs cleaned up + while (!m_active_merge.compare_exchange_strong(old, true)) { + old = m_active_merge.load(); + if (old) { + return false; + } + } + + return true; + } + void set_inactive() { m_active = false; } @@ -184,6 +211,8 @@ private: std::condition_variable m_active_cv; std::mutex m_cv_lock; + std::atomic m_active_merge; + /* * The number of currently active jobs * (queries/merges) operating on this -- cgit v1.2.3 From 357cab549c2ed33970562b84ff6f83923742343d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 7 Nov 2023 15:34:24 -0500 Subject: Comment and License updates --- include/framework/scheduling/Epoch.h | 2 +- include/framework/scheduling/FIFOScheduler.h | 2 +- include/framework/scheduling/SerialScheduler.h | 2 +- include/framework/scheduling/Task.h | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 0ebbde9..9193b06 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -3,7 +3,7 @@ * * Copyright (C) 2023 Douglas B. Rumbaugh * - * All rights reserved. Published under the Modified BSD License. + * Distributed under the Modified BSD License. * */ #pragma once diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 1521eb6..ba62f9e 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -3,7 +3,7 @@ * * Copyright (C) 2023 Douglas B. Rumbaugh * - * All rights reserved. Published under the Modified BSD License. + * Distributed under the Modified BSD License. 
* */ #pragma once diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index 93611d1..10c2af2 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -3,7 +3,7 @@ * * Copyright (C) 2023 Douglas B. Rumbaugh * - * All rights reserved. Published under the Modified BSD License. + * Distributed under the Modified BSD License. * * IMPORTANT: This "scheduler" is a shim implementation for allowing * strictly serial, single-threaded operation of the framework. It should diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 6dfd7df..d211fb5 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -3,7 +3,7 @@ * * Copyright (C) 2023 Douglas B. Rumbaugh * - * All rights reserved. Published under the Modified BSD License. + * Distributed under the Modified BSD License. * */ #pragma once -- cgit v1.2.3 From 39d22316be1708073e4fe1f708814cc801ecdc69 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 9 Nov 2023 11:08:34 -0500 Subject: Fixed various concurrency bugs 1. The system should now cleanly shutdown when the DynamicExtension object is destroyed. Before now, this would lead to use-after-frees and/or deadlocks. 2. Improved synchronization on mutable buffer structure management to fix the issue of the framework losing track of buffers during Epoch changeovers. 
--- include/framework/scheduling/Epoch.h | 21 ++++++++++++++++----- include/framework/scheduling/FIFOScheduler.h | 7 +++++-- 2 files changed, 21 insertions(+), 7 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 9193b06..fc08d57 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -47,9 +47,14 @@ public: ~Epoch() { assert(m_active_jobs.load() == 0); - for (auto buf : m_buffers) { - buf->release_reference(); - } + /* FIXME: this is needed to keep the destructor from + * sometimes locking up here. But there *shouldn't* be + * any threads waiting on this signal at object destruction, + * so something else is going on here that needs looked into + */ + //m_active_cv.notify_all(); + + clear_buffers(); if (m_structure) { m_structure->release_reference(); @@ -59,6 +64,7 @@ public: Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) { assert(buf); + std::unique_lock m_buffer_lock; /* * if a current buffer is specified, only add the * new buffer if the active buffer is the current, @@ -108,6 +114,7 @@ public: } BufView get_buffer_view() { + std::unique_lock m_buffer_lock; return BufView(m_buffers); } @@ -123,6 +130,7 @@ public: * releasing all references in the process. */ size_t clear_buffers() { + std::unique_lock m_buffer_lock; size_t buf_cnt = m_buffers.size(); for (auto buf : m_buffers) { if (buf) buf->release_reference(); @@ -138,6 +146,7 @@ public: * the new epoch will be set to the provided argument. */ Epoch *clone(size_t number) { + std::unique_lock m_buffer_lock; auto epoch = new Epoch(number); epoch->m_buffers = m_buffers; if (m_structure) { @@ -196,8 +205,8 @@ public: * wait for them to finish and return true. 
If there are * not active jobs, return true immediately */ - while (m_active_jobs > 0) { - std::unique_lock lk(m_cv_lock); + std::unique_lock lk(m_cv_lock); + while (m_active_jobs.load() > 0) { m_active_cv.wait(lk); } @@ -211,6 +220,8 @@ private: std::condition_variable m_active_cv; std::mutex m_cv_lock; + std::mutex m_buffer_lock; + std::atomic m_active_merge; /* diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index ba62f9e..4cdc436 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -47,9 +47,10 @@ public: } ~FIFOScheduler() { - shutdown(); + if (!m_shutdown.load()) { + shutdown(); + } - m_cv.notify_all(); m_sched_thrd.join(); } @@ -63,6 +64,8 @@ public: void shutdown() { m_shutdown.store(true); + m_thrd_pool.stop(true); + m_cv.notify_all(); } private: -- cgit v1.2.3 From 90bb0614fc1d8f1a185a778e31aaf9027c01aeb8 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 13 Nov 2023 11:44:09 -0500 Subject: Tombstone Compaction: re-enabled tombstone compaction Currently, proactive buffer tombstone compaction is disabled by forcing the buffer tombstone capacity to match its record capacity. It isn't clear how to best handle proactive buffer compactions in an environment where new buffers are spawned anyway. 
--- include/framework/scheduling/Task.h | 1 + 1 file changed, 1 insertion(+) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index d211fb5..c10ed8b 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -21,6 +21,7 @@ struct MergeArgs { Epoch *epoch; std::vector merges; std::promise result; + bool compaction; void *extension; }; -- cgit v1.2.3 From 3c127eda69295cb306739bdd3c5ddccff6026a8d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 13 Dec 2023 12:39:54 -0500 Subject: Refactoring: corrected a number of names and added more comments --- include/framework/scheduling/Epoch.h | 2 +- include/framework/scheduling/Task.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index fc08d57..4e1b8a2 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -170,7 +170,7 @@ public: * isn't, return true and set a flag indicating that * there is an active merge. 
*/ - bool prepare_merge() { + bool prepare_reconstruction() { auto old = m_active_merge.load(); if (old) { return false; diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index c10ed8b..16f5e58 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -17,9 +17,9 @@ namespace de { template -struct MergeArgs { +struct ReconstructionArgs { Epoch *epoch; - std::vector merges; + std::vector merges; std::promise result; bool compaction; void *extension; -- cgit v1.2.3 From aac0bb661af8fae38d3ce08d6078cb4d9dfcb575 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 12 Jan 2024 14:10:11 -0500 Subject: Initial integration of new buffering scheme into framework It isn't working right now (lotsa test failures), but we're to the debugging phase now. --- include/framework/scheduling/Epoch.h | 80 ++++++++---------------------------- 1 file changed, 16 insertions(+), 64 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 4e1b8a2..ca85fe2 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -8,6 +8,9 @@ */ #pragma once +#include +#include + #include "framework/structure/MutableBuffer.h" #include "framework/structure/ExtensionStructure.h" #include "framework/structure/BufferView.h" @@ -20,10 +23,10 @@ class Epoch { private: typedef MutableBuffer Buffer; typedef ExtensionStructure Structure; - typedef BufferView BufView; + typedef BufferView BufView; public: Epoch(size_t number=0) - : m_buffers() + : m_buffer(nullptr) , m_structure(nullptr) , m_active_merge(false) , m_active_jobs(0) @@ -31,8 +34,8 @@ public: , m_epoch_number(number) {} - Epoch(size_t number, Structure *structure, Buffer *buff) - : m_buffers() + Epoch(size_t number, Structure *structure, Buffer *buff) + : m_buffer(buff) , m_structure(structure) , m_active_jobs(0) , m_active_merge(false) @@ -40,8 
+43,6 @@ public: , m_epoch_number(number) { structure->take_reference(); - buff->take_reference(); - m_buffers.push_back(buff); } ~Epoch() { @@ -54,35 +55,11 @@ public: */ //m_active_cv.notify_all(); - clear_buffers(); - if (m_structure) { m_structure->release_reference(); } } - Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) { - assert(buf); - - std::unique_lock m_buffer_lock; - /* - * if a current buffer is specified, only add the - * new buffer if the active buffer is the current, - * otherwise just return the active buffer (poor man's - * CAS). - */ - if (cur_buf) { - auto active_buf = get_active_buffer(); - if (active_buf != cur_buf) { - return active_buf; - } - } - - buf->take_reference(); - m_buffers.push_back(buf); - return buf; - } - void start_job() { m_active_jobs.fetch_add(1); } @@ -109,36 +86,10 @@ public: return m_structure; } - std::vector &get_buffers() { - return m_buffers; - } - - BufView get_buffer_view() { - std::unique_lock m_buffer_lock; - return BufView(m_buffers); - } - - Buffer *get_active_buffer() { - if (m_buffers.size() == 0) return nullptr; - - return m_buffers[m_buffers.size() - 1]; + BufView get_buffer() { + return m_buffer->get_buffer_view(); } - /* - * Return the number of buffers in this epoch at - * time of call, and then clear the buffer vector, - * releasing all references in the process. - */ - size_t clear_buffers() { - std::unique_lock m_buffer_lock; - size_t buf_cnt = m_buffers.size(); - for (auto buf : m_buffers) { - if (buf) buf->release_reference(); - } - - m_buffers.clear(); - return buf_cnt; - } /* * Returns a new Epoch object that is a copy of this one. 
The new object will also contain @@ -148,17 +99,14 @@ public: Epoch *clone(size_t number) { std::unique_lock m_buffer_lock; auto epoch = new Epoch(number); - epoch->m_buffers = m_buffers; + epoch->m_buffer = m_buffer; + if (m_structure) { epoch->m_structure = m_structure->copy(); /* the copy routine returns a structure with 0 references */ epoch->m_structure->take_reference(); } - for (auto b : m_buffers) { - b->take_reference(); - } - return epoch; } @@ -213,9 +161,13 @@ public: return true; } + bool advance_buffer_head(size_t head) { + return m_buffer->advance_head(head); + } + private: Structure *m_structure; - std::vector m_buffers; + Buffer *m_buffer; std::condition_variable m_active_cv; std::mutex m_cv_lock; -- cgit v1.2.3 From cf178ae74a76b780b655a447531d2114f9f81d98 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Mon, 15 Jan 2024 14:01:36 -0500 Subject: Various single-threaded bug fixes --- include/framework/scheduling/Epoch.h | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index ca85fe2..b005ff6 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -60,6 +60,16 @@ public: } } + + /* + * Epochs are *not* copyable or movable. Only one can exist, and all users of + * it work with pointers + */ + Epoch(const Epoch&) = delete; + Epoch(Epoch&&) = delete; + Epoch &operator=(const Epoch&) = delete; + Epoch &operator=(Epoch&&) = delete; + void start_job() { m_active_jobs.fetch_add(1); } @@ -90,6 +100,10 @@ public: return m_buffer->get_buffer_view(); } + BufView get_flush_buffer() { + return m_buffer->get_flush_buffer_view(); + } + /* * Returns a new Epoch object that is a copy of this one. 
The new object will also contain -- cgit v1.2.3 From 138c793b0a58577713d98c98bb140cf1d9c79bee Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 17 Jan 2024 18:22:00 -0500 Subject: Multiple concurrency bug fixes A poorly organized commit with fixes for a variety of bugs that were causing missing records. The core problems all appear to be fixed, though there is an outstanding problem with tombstones not being completely canceled. A very small number are appearing in the wrong order during the static structure test. --- include/framework/scheduling/Epoch.h | 51 +++++++++++++++++------------------- 1 file changed, 24 insertions(+), 27 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index b005ff6..45ee17d 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -32,15 +32,17 @@ public: , m_active_jobs(0) , m_active(true) , m_epoch_number(number) + , m_buffer_head(0) {} - Epoch(size_t number, Structure *structure, Buffer *buff) + Epoch(size_t number, Structure *structure, Buffer *buff, size_t head) : m_buffer(buff) , m_structure(structure) , m_active_jobs(0) , m_active_merge(false) , m_active(true) , m_epoch_number(number) + , m_buffer_head(head) { structure->take_reference(); } @@ -48,22 +50,21 @@ public: ~Epoch() { assert(m_active_jobs.load() == 0); - /* FIXME: this is needed to keep the destructor from - * sometimes locking up here. But there *shouldn't* be - * any threads waiting on this signal at object destruction, - * so something else is going on here that needs looked into + /* FIXME: this is needed to keep the destructor from sometimes locking + * up here. 
But there *shouldn't* be any threads waiting on this signal + * at object destruction, so something else is going on here that needs + * looked into */ - //m_active_cv.notify_all(); + // m_active_cv.notify_all(); if (m_structure) { m_structure->release_reference(); } } - - /* - * Epochs are *not* copyable or movable. Only one can exist, and all users of - * it work with pointers + /* + * Epochs are *not* copyable or movable. Only one can exist, and all users + * of it work with pointers */ Epoch(const Epoch&) = delete; Epoch(Epoch&&) = delete; @@ -97,23 +98,20 @@ public: } BufView get_buffer() { - return m_buffer->get_buffer_view(); - } - - BufView get_flush_buffer() { - return m_buffer->get_flush_buffer_view(); + return m_buffer->get_buffer_view(m_buffer_head); } - /* - * Returns a new Epoch object that is a copy of this one. The new object will also contain - * a copy of the m_structure, rather than a reference to the same one. The epoch number of - * the new epoch will be set to the provided argument. + * Returns a new Epoch object that is a copy of this one. The new object + * will also contain a copy of the m_structure, rather than a reference to + * the same one. The epoch number of the new epoch will be set to the + * provided argument. */ Epoch *clone(size_t number) { std::unique_lock m_buffer_lock; auto epoch = new Epoch(number); epoch->m_buffer = m_buffer; + epoch->m_buffer_head = m_buffer_head; if (m_structure) { epoch->m_structure = m_structure->copy(); @@ -125,12 +123,10 @@ public: } /* - * Check if a merge can be started from this Epoch. - * At present, without concurrent merging, this simply - * checks if there is currently a scheduled merge based - * on this Epoch. If there is, returns false. If there - * isn't, return true and set a flag indicating that - * there is an active merge. + * Check if a merge can be started from this Epoch. 
At present, without + * concurrent merging, this simply checks if there is currently a scheduled + * merge based on this Epoch. If there is, returns false. If there isn't, + * return true and set a flag indicating that there is an active merge. */ bool prepare_reconstruction() { auto old = m_active_merge.load(); @@ -176,7 +172,8 @@ public: } bool advance_buffer_head(size_t head) { - return m_buffer->advance_head(head); + m_buffer_head = head; + return m_buffer->advance_head(m_buffer_head); } private: @@ -187,7 +184,6 @@ private: std::mutex m_cv_lock; std::mutex m_buffer_lock; - std::atomic m_active_merge; /* @@ -199,5 +195,6 @@ private: std::atomic m_active_jobs; bool m_active; size_t m_epoch_number; + size_t m_buffer_head; }; } -- cgit v1.2.3 From 38693c342558628c75e0ab0d23c32a95a499ed8b Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 19 Jan 2024 15:58:04 -0500 Subject: Initial rough-out of internal statistics tracker Need to figure out the best way to do the detailed tracking in a concurrent manner. I was thinking just an event log, with parsing routines for extracting statistics. But that'll be pretty slow. 
--- include/framework/scheduling/FIFOScheduler.h | 28 ++++---- include/framework/scheduling/SerialScheduler.h | 27 +++----- include/framework/scheduling/Task.h | 23 +++++- include/framework/scheduling/statistics.h | 96 ++++++++++++++++++++++++++ 4 files changed, 143 insertions(+), 31 deletions(-) create mode 100644 include/framework/scheduling/statistics.h (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 4cdc436..513a3a2 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -8,21 +8,11 @@ */ #pragma once -#include -#include -#include #include #include -#include - -#include "util/types.h" -#include "framework/interface/Shard.h" -#include "framework/interface/Query.h" -#include "framework/interface/Record.h" -#include "framework/structure/MutableBuffer.h" -#include "framework/util/Configuration.h" -#include "framework/structure/ExtensionStructure.h" + #include "framework/scheduling/Task.h" +#include "framework/scheduling/statistics.h" #include "ctpl/ctpl.h" #include "psu-ds/LockedPriorityQueue.h" @@ -54,10 +44,12 @@ public: m_sched_thrd.join(); } - void schedule_job(std::function job, size_t size, void *args) { + void schedule_job(std::function job, size_t size, void *args, size_t type=0) { std::unique_lock lk(m_cv_lock); size_t ts = m_counter.fetch_add(1); - m_task_queue.push(Task(size, ts, job, args)); + + m_stats.job_queued(ts, type, size); + m_task_queue.push(Task(size, ts, job, args, type, &m_stats)); m_cv.notify_all(); } @@ -68,6 +60,10 @@ public: m_cv.notify_all(); } + void print_statistics() { + m_stats.print_statistics(); + } + private: psudb::LockedPriorityQueue m_task_queue; @@ -86,9 +82,13 @@ private: std::atomic m_used_thrds; std::atomic m_used_memory; + SchedulerStatistics m_stats; + void schedule_next() { assert(m_task_queue.size() > 0); auto t = m_task_queue.pop(); + 
m_stats.job_scheduled(t.m_timestamp); + m_thrd_pool.push(t); } diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index 10c2af2..ac59301 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -14,21 +14,8 @@ */ #pragma once -#include -#include -#include -#include -#include -#include - -#include "util/types.h" -#include "framework/interface/Shard.h" -#include "framework/interface/Query.h" -#include "framework/interface/Record.h" -#include "framework/structure/MutableBuffer.h" -#include "framework/util/Configuration.h" -#include "framework/structure/ExtensionStructure.h" #include "framework/scheduling/Task.h" +#include "framework/scheduling/statistics.h" namespace de { @@ -44,9 +31,11 @@ public: ~SerialScheduler() = default; - void schedule_job(std::function job, size_t size, void *args) { + void schedule_job(std::function job, size_t size, void *args, size_t type=0) { size_t ts = m_counter++; - auto t = Task(size, ts, job, args); + m_stats.job_queued(ts, type, size); + m_stats.job_scheduled(ts); + auto t = Task(size, ts, job, args, type, &m_stats); t(0); } @@ -54,6 +43,10 @@ public: /* intentionally left blank */ } + void print_statistics() { + m_stats.print_statistics(); + } + private: size_t m_memory_budget; size_t m_thrd_cnt; @@ -62,6 +55,8 @@ private: size_t m_used_memory; size_t m_counter; + + SchedulerStatistics m_stats; }; } diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 16f5e58..b14b229 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -10,9 +10,11 @@ #include #include +#include #include "framework/util/Configuration.h" #include "framework/scheduling/Epoch.h" +#include "framework/scheduling/statistics.h" namespace de { @@ -35,17 +37,21 @@ struct QueryArgs { typedef std::function Job; struct Task { - Task(size_t size, size_t ts, Job job, void *args) + 
Task(size_t size, size_t ts, Job job, void *args, size_t type=0, SchedulerStatistics *stats=nullptr) : m_job(job) , m_size(size) , m_timestamp(ts) , m_args(args) + , m_type(type) + , m_stats(stats) {} Job m_job; size_t m_size; size_t m_timestamp; void *m_args; + size_t m_type; + SchedulerStatistics *m_stats; friend bool operator<(const Task &self, const Task &other) { return self.m_timestamp < other.m_timestamp; @@ -56,7 +62,22 @@ struct Task { } void operator()(size_t thrd_id) { + auto start = std::chrono::high_resolution_clock::now(); + if (m_stats) { + m_stats->job_begin(m_timestamp); + } + m_job(m_args); + + if (m_stats) { + m_stats->job_complete(m_timestamp); + } + auto stop = std::chrono::high_resolution_clock::now(); + + if (m_stats) { + auto time = std::chrono::duration_cast(stop - start).count(); + m_stats->log_time_data(time, m_type); + } } }; diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h new file mode 100644 index 0000000..8466ffc --- /dev/null +++ b/include/framework/scheduling/statistics.h @@ -0,0 +1,96 @@ +/* + * include/framework/scheduling/statistics.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * + * Distributed under the Modified BSD License. 
+ * + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace de { + +class SchedulerStatistics { +private: + enum class EventType { + QUEUED, + SCHEDULED, + STARTED, + FINISHED + }; + + struct Event { + size_t id; + EventType type; + }; + + struct JobInfo { + size_t id; + size_t size; + size_t type; + }; + + +public: + SchedulerStatistics() = default; + ~SchedulerStatistics() = default; + + void job_queued(size_t id, size_t type, size_t size) { + auto time = std::chrono::high_resolution_clock::now(); + } + + void job_scheduled(size_t id) { + std::unique_lock lk(m_mutex); + + } + + void job_begin(size_t id) { + + } + + void job_complete(size_t id) { + + } + + /* FIXME: This is just a temporary approach */ + void log_time_data(size_t length, size_t type) { + assert(type == 1 || type == 2); + + if (type == 1) { + m_type_1_cnt.fetch_add(1); + m_type_1_total_time.fetch_add(length); + } else { + m_type_2_cnt.fetch_add(1); + m_type_2_total_time.fetch_add(length); + } + } + + void print_statistics() { + fprintf(stdout, "Query Count: %ld\tQuery Avg. Latency: %ld\n", + m_type_1_cnt.load(), + m_type_1_total_time.load() / m_type_1_cnt.load()); + fprintf(stdout, "Reconstruction Count: %ld\tReconstruction Avg. Latency: %ld\n", + m_type_2_cnt.load(), + m_type_2_total_time.load() / m_type_2_cnt.load()); + } + +private: + std::mutex m_mutex; + std::unordered_map m_jobs; + std::vector m_event_log; + + std::atomic m_type_1_cnt; + std::atomic m_type_1_total_time; + + std::atomic m_type_2_cnt; + std::atomic m_type_2_total_time; +}; +} -- cgit v1.2.3 From 4d0d26bfef684566a371ca7c87ba84df52f25ccc Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 22 Jan 2024 10:42:58 -0500 Subject: FIFOScheduler: added automake wakeup Sometimes, when the max thread count is exceeded, it is possible for the scheduler to lock up. 
This is because the scheduler is only run when a new job is put into the queue, and so it is possible for a job to be blocked by thread limitations and be left sitting in the queue. If the main program is waiting on this job to finish before scheduling a new one, then the system deadlocks. I added a second background thread to the scheduler that wakes the scheduler up every 20us to resolve this and prevent these deadlocks. --- include/framework/scheduling/FIFOScheduler.h | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 513a3a2..b77a8a1 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -10,7 +10,7 @@ #include #include - +#include #include "framework/scheduling/Task.h" #include "framework/scheduling/statistics.h" @@ -19,6 +19,8 @@ namespace de { +using namespace std::literals::chrono_literals; + class FIFOScheduler { private: @@ -33,6 +35,7 @@ public: , m_shutdown(false) { m_sched_thrd = std::thread(&FIFOScheduler::run, this); + m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this); m_thrd_pool.resize(m_thrd_cnt); } @@ -77,6 +80,7 @@ private: std::condition_variable m_cv; std::thread m_sched_thrd; + std::thread m_sched_wakeup_thrd; ctpl::thread_pool m_thrd_pool; std::atomic m_used_thrds; @@ -84,6 +88,13 @@ private: SchedulerStatistics m_stats; + void periodic_wakeup() { + do { + std::this_thread::sleep_for(10us); + m_cv.notify_all(); + } while (!m_shutdown.load()); + } + void schedule_next() { assert(m_task_queue.size() > 0); auto t = m_task_queue.pop(); -- cgit v1.2.3 From b1e4182825e6c162571b7cc4efaf8bc44055b49c Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 22 Jan 2024 12:10:54 -0500 Subject: Adjusted recon_benchmark and properly shutdown FIFOScheduler --- include/framework/scheduling/FIFOScheduler.h | 
1 + 1 file changed, 1 insertion(+) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index b77a8a1..0df4d3c 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -45,6 +45,7 @@ public: } m_sched_thrd.join(); + m_sched_wakeup_thrd.join(); } void schedule_job(std::function job, size_t size, void *args, size_t type=0) { -- cgit v1.2.3 From f24fdf2fd310a5f868e15cd9682ca37d740c77af Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 30 Jan 2024 15:31:03 -0500 Subject: Benchmarking updates --- include/framework/scheduling/statistics.h | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h index 8466ffc..50ba196 100644 --- a/include/framework/scheduling/statistics.h +++ b/include/framework/scheduling/statistics.h @@ -67,19 +67,33 @@ public: if (type == 1) { m_type_1_cnt.fetch_add(1); m_type_1_total_time.fetch_add(length); + + if (length > m_type_1_largest_time) { + m_type_1_largest_time.store(length); + } } else { m_type_2_cnt.fetch_add(1); m_type_2_total_time.fetch_add(length); + + if (length > m_type_2_largest_time) { + m_type_2_largest_time.store(length); + } } } void print_statistics() { - fprintf(stdout, "Query Count: %ld\tQuery Avg. Latency: %ld\n", - m_type_1_cnt.load(), - m_type_1_total_time.load() / m_type_1_cnt.load()); - fprintf(stdout, "Reconstruction Count: %ld\tReconstruction Avg. Latency: %ld\n", - m_type_2_cnt.load(), - m_type_2_total_time.load() / m_type_2_cnt.load()); + if (m_type_1_cnt > 0) { + fprintf(stdout, "Query Count: %ld\tQuery Avg. 
Latency: %ld\tMax Query Latency: %ld\n", + m_type_1_cnt.load(), + m_type_1_total_time.load() / m_type_1_cnt.load(), + m_type_1_largest_time.load()); + } + if (m_type_2_cnt > 0) { + fprintf(stdout, "Reconstruction Count: %ld\tReconstruction Avg. Latency: %ld\tMax Recon. Latency:%ld\n", + m_type_2_cnt.load(), + m_type_2_total_time.load() / m_type_2_cnt.load(), + m_type_2_largest_time.load()); + } } private: @@ -92,5 +106,8 @@ private: std::atomic m_type_2_cnt; std::atomic m_type_2_total_time; + + std::atomic m_type_1_largest_time; + std::atomic m_type_2_largest_time; }; } -- cgit v1.2.3 From d166465dcca3550cb8f3263e0f5b5189a69d531a Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 31 Jan 2024 13:29:49 -0500 Subject: Temporary thread affinity for reconstruction --- include/framework/scheduling/FIFOScheduler.h | 1 + 1 file changed, 1 insertion(+) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 0df4d3c..c6baf9b 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -114,6 +114,7 @@ private: } } while(!m_shutdown.load()); } + }; } -- cgit v1.2.3 From f3b7428cfa7f9364c5a8bc85107db3a7cccd53bc Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 31 Jan 2024 18:41:17 -0500 Subject: Adjusted epoch transition methodology --- include/framework/scheduling/Epoch.h | 55 ------------------------------------ include/framework/scheduling/Task.h | 2 +- 2 files changed, 1 insertion(+), 56 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 45ee17d..7b533b6 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -29,8 +29,6 @@ public: : m_buffer(nullptr) , m_structure(nullptr) , m_active_merge(false) - , m_active_jobs(0) - , m_active(true) , m_epoch_number(number) , 
m_buffer_head(0) {} @@ -38,9 +36,7 @@ public: Epoch(size_t number, Structure *structure, Buffer *buff, size_t head) : m_buffer(buff) , m_structure(structure) - , m_active_jobs(0) , m_active_merge(false) - , m_active(true) , m_epoch_number(number) , m_buffer_head(head) { @@ -48,8 +44,6 @@ public: } ~Epoch() { - assert(m_active_jobs.load() == 0); - /* FIXME: this is needed to keep the destructor from sometimes locking * up here. But there *shouldn't* be any threads waiting on this signal * at object destruction, so something else is going on here that needs @@ -71,24 +65,6 @@ public: Epoch &operator=(const Epoch&) = delete; Epoch &operator=(Epoch&&) = delete; - void start_job() { - m_active_jobs.fetch_add(1); - } - - void end_job() { - assert(m_active_jobs.load() > 0); - m_active_jobs.fetch_add(-1); - - if (m_active_jobs.load() == 0) { - std::unique_lock lk(m_cv_lock); - m_active_cv.notify_all(); - } - } - - size_t get_active_job_num() { - return m_active_jobs.load(); - } - size_t get_epoch_number() { return m_epoch_number; } @@ -145,32 +121,6 @@ public: return true; } - void set_inactive() { - m_active = false; - } - - /* - * - */ - bool retirable() { - /* if epoch is currently active, then it cannot be retired */ - if (m_active) { - return false; - } - - /* - * if the epoch has active jobs but is not itself active, - * wait for them to finish and return true. If there are - * not active jobs, return true immediately - */ - std::unique_lock lk(m_cv_lock); - while (m_active_jobs.load() > 0) { - m_active_cv.wait(lk); - } - - return true; - } - bool advance_buffer_head(size_t head) { m_buffer_head = head; return m_buffer->advance_head(m_buffer_head); @@ -180,9 +130,6 @@ private: Structure *m_structure; Buffer *m_buffer; - std::condition_variable m_active_cv; - std::mutex m_cv_lock; - std::mutex m_buffer_lock; std::atomic m_active_merge; @@ -192,8 +139,6 @@ private: * epoch. An epoch can only be retired * when this number is 0. 
*/ - std::atomic m_active_jobs; - bool m_active; size_t m_epoch_number; size_t m_buffer_head; }; diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index b14b229..6f6b913 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -29,9 +29,9 @@ struct ReconstructionArgs { template struct QueryArgs { - Epoch *epoch; std::promise> result_set; void *query_parms; + void *extension; }; typedef std::function Job; -- cgit v1.2.3 From 10b4425e842d10b7fbfa85978969ed4591d6b98e Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 7 Feb 2024 10:56:52 -0500 Subject: Fully implemented Query concept and adjusted queries to use it --- include/framework/scheduling/Epoch.h | 2 +- include/framework/scheduling/Task.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 7b533b6..48b7742 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -18,7 +18,7 @@ namespace de { -template +template Q, LayoutPolicy L> class Epoch { private: typedef MutableBuffer Buffer; diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 6f6b913..ba0001d 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -18,7 +18,7 @@ namespace de { -template +template Q, LayoutPolicy L> struct ReconstructionArgs { Epoch *epoch; std::vector merges; @@ -27,7 +27,7 @@ struct ReconstructionArgs { void *extension; }; -template +template Q, LayoutPolicy L> struct QueryArgs { std::promise> result_set; void *query_parms; -- cgit v1.2.3 From 2c5d549b3618b9ea72e6eece4cb4f3da5a6811a8 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 7 Feb 2024 13:42:34 -0500 Subject: Fully realized shard concept interface --- include/framework/scheduling/Epoch.h | 2 +- include/framework/scheduling/Task.h | 
4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 48b7742..e58bd11 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -18,7 +18,7 @@ namespace de { -template Q, LayoutPolicy L> +template S, QueryInterface Q, LayoutPolicy L> class Epoch { private: typedef MutableBuffer Buffer; diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index ba0001d..008f232 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -18,7 +18,7 @@ namespace de { -template Q, LayoutPolicy L> +template S, QueryInterface Q, LayoutPolicy L> struct ReconstructionArgs { Epoch *epoch; std::vector merges; @@ -27,7 +27,7 @@ struct ReconstructionArgs { void *extension; }; -template Q, LayoutPolicy L> +template S, QueryInterface Q, LayoutPolicy L> struct QueryArgs { std::promise> result_set; void *query_parms; -- cgit v1.2.3 From 402fc269c0aaa671d84a6d15918735ad4b90e6b2 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 9 Feb 2024 12:30:21 -0500 Subject: Comment updates/fixes --- include/framework/scheduling/Epoch.h | 7 ------- include/framework/scheduling/FIFOScheduler.h | 9 +++++++++ include/framework/scheduling/Task.h | 5 +++++ include/framework/scheduling/statistics.h | 5 +++++ 4 files changed, 19 insertions(+), 7 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index e58bd11..3ffa145 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -44,13 +44,6 @@ public: } ~Epoch() { - /* FIXME: this is needed to keep the destructor from sometimes locking - * up here. 
But there *shouldn't* be any threads waiting on this signal - * at object destruction, so something else is going on here that needs looked into - */ - // m_active_cv.notify_all(); - if (m_structure) { m_structure->release_reference(); } diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index c6baf9b..3ed4f49 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -5,6 +5,15 @@ * * Distributed under the Modified BSD License. * + * This scheduler runs jobs concurrently, using a standard FIFO queue to + * determine which jobs to run next. If more jobs are scheduled than there + * are available threads, the excess will stall until a thread becomes + * available and then run in the order they were received by the scheduler. + * + * TODO: We need to set up a custom threadpool based on jthreads to support + * thread preemption for a later phase of this project. That will allow us + * to avoid blocking epoch transitions on long-running queries, or to pause + * reconstructions on demand. */ #pragma once diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 008f232..d5d4266 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -5,6 +5,11 @@ * * Distributed under the Modified BSD License. * + * An abstraction to represent a job to be scheduled. Currently the + * supported task types are queries and merges. Based on the current plan, + * simple buffer inserts will likely also be made into a task at some + * point. + * */ #pragma once diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h index 50ba196..6c479cd 100644 --- a/include/framework/scheduling/statistics.h +++ b/include/framework/scheduling/statistics.h @@ -5,6 +5,11 @@ * * Distributed under the Modified BSD License. 
* + * This is a stub for a statistics tracker to be used in scheduling. It + * currently only tracks simple aggregated statistics, but should be + * updated in the future for more fine-grained statistics. These will be + * used for making scheduling decisions and predicting the runtime of a + * given job. */ #pragma once -- cgit v1.2.3 From 3ddafd3b9ac089252814af87cb7d9fe534cf59a4 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 9 Feb 2024 13:09:05 -0500 Subject: Removed centralized version structure --- include/framework/scheduling/Epoch.h | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 3ffa145..9377fb0 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -47,6 +47,11 @@ public: if (m_structure) { m_structure->release_reference(); } + + if (m_structure->get_reference_count() == 0) { + delete m_structure; + } + } /* -- cgit v1.2.3