diff options
| field | value | date |
|---|---|---|
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-10-20 15:12:46 -0400 |
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-10-20 15:12:46 -0400 |
| commit | 1a47cbd7978dcad7ed0b2f2af3f933137eedbfa3 (patch) | |
| tree | 0dc9c3aa484fb3eb49b9a6491dd705afb176aa0e | |
| parent | 7c03d771475421c1d5a2bbc135242536af1a371c (diff) | |
| download | dynamic-extension-1a47cbd7978dcad7ed0b2f2af3f933137eedbfa3.tar.gz | |
Checkpointing work
I'll probably throw all this out, but I want to stash it just in case.
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | CMakeLists.txt | 2 |
| m--------- | external/psudb-common | 0 |
| -rw-r--r-- | include/framework/scheduling/SerialScheduler.h | 65 |
| -rw-r--r-- | include/framework/scheduling/Task.h | 52 |
4 files changed, 85 insertions, 34 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index f642714..c8188a9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(namespace "de") project("Practical Dynamic Extension" VERSION 0.1.0) -set(debug false) +set(debug true) set(tests True) set(bench false) diff --git a/external/psudb-common b/external/psudb-common -Subproject b436420bf4c9a574b3a8e54c1ab46f46e82240a +Subproject 7005ad856c941d8485843c53a3b08d53ccc3d98 diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index 5e16bdf..c43e930 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -1,7 +1,7 @@ /* * include/framework/Scheduler.h * - * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> * Dong Xie <dongx@psu.edu> * * All rights reserved. Published under the Modified BSD License. @@ -14,6 +14,7 @@ #include <queue> #include <thread> #include <condition_variable> +#include <future> #include "util/types.h" #include "framework/interface/Shard.h" @@ -24,6 +25,8 @@ #include "framework/structure/ExtensionStructure.h" #include "framework/scheduling/Task.h" +#include "psu-ds/LockedPriorityQueue.h" + namespace de { template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L> @@ -73,31 +76,21 @@ public: std::vector<MergeTask> merges = version->get_merge_tasks(buffer->get_record_count()); /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) + * Schedule the merge tasks */ for (ssize_t i=0; i<merges.size(); i++) { merges[i].m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.emplace(merges[i]); - m_merge_queue_lock.unlock(); + m_task_queue.push(merges[i]); } - MergeTask buffer_merge; - buffer_merge.m_source_level = -1; - buffer_merge.m_target_level = 0; - 
buffer_merge.m_size = buffer->get_record_count() * sizeof(R) * 2; - buffer_merge.m_timestamp = m_timestamp.fetch_add(1); - buffer_merge.m_type = TaskType::MERGE; - m_merge_queue_lock.lock(); - m_merge_queue.emplace(buffer_merge); - m_merge_queue_lock.unlock(); + auto t = MergeTask(-1, 0, buffer->get_record_count() * sizeof(R) * 2, m_timestamp.fetch_add(1)); + m_task_queue.push(t); m_cv.notify_all(); do { std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock); m_merge_cv.wait(merge_cv_lock); - } while (m_merge_queue.size() > 0); + } while (m_task_queue.size() > 0); assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0); @@ -128,10 +121,7 @@ private: } void schedule_next_task() { - m_merge_queue_lock.lock(); - auto task = m_merge_queue.top(); - m_merge_queue.pop(); - m_merge_queue_lock.unlock(); + auto task = m_task_queue.pop(); auto type = std::visit(GetTaskType{}, task); @@ -145,7 +135,7 @@ private: default: assert(false); } - if (m_merge_queue.size() == 0) { + if (m_task_queue.size() == 0) { m_merge_cv.notify_all(); } } @@ -157,15 +147,27 @@ private: if (!version->validate_tombstone_proportion(task.m_target_level)) { auto tasks = version->get_merge_tasks(task.m_target_level); /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) + * Schedule the merge tasks */ - for (ssize_t i=tasks.size()-1; i>=0; i--) { + std::promise<void> trigger_prom; + tasks[tasks.size() - 1].make_dependent_on(trigger_prom); + tasks[tasks.size() - 1].m_timestamp = m_timestamp.fetch_add(1); + m_task_queue.push(tasks[tasks.size() - 1]); + + for (ssize_t i=tasks.size()-2; i>=0; i--) { + tasks[i].make_dependent_on(tasks[i+1]); tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(tasks[i]); - m_merge_queue_lock.unlock(); + m_task_queue.push(tasks[i]); } + + /* + * Block the completion of any task until all have been + * scheduled. 
Probably not strictly necessary, but due to + * interface constraints with the way promises are used, + * a dummy promise needs to be set up for the first job + * anyway. It's easiest to just release it here. + */ + trigger_prom.set_value(); } } @@ -181,9 +183,7 @@ private: */ for (ssize_t i=tasks.size()-1; i>=0; i--) { tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(tasks[i]); - m_merge_queue_lock.unlock(); + m_task_queue.push(tasks[i]); } } } @@ -193,7 +193,7 @@ private: std::unique_lock<std::mutex> cv_lock(m_cv_lock); m_cv.wait(cv_lock); - while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { + while (m_task_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { schedule_next_task(); } cv_lock.unlock(); @@ -210,8 +210,7 @@ private: alignas(64) std::atomic<size_t> m_used_threads; alignas(64) std::atomic<size_t> m_timestamp; - std::priority_queue<Task, std::vector<Task>, std::greater<Task>> m_merge_queue; - std::mutex m_merge_queue_lock; + psudb::LockedPriorityQueue<Task, std::vector<Task>, std::greater<Task>> m_task_queue; std::mutex m_cv_lock; std::condition_variable m_cv; diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 9e0655a..3c1b158 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -4,6 +4,7 @@ #pragma once #include <variant> +#include <future> #include "framework/util/Configuration.h" @@ -14,17 +15,52 @@ enum class TaskType { QUERY }; +struct TaskDependency { + std::promise<void> prom; + std::future<void> fut; +}; + struct MergeTask { level_index m_source_level; level_index m_target_level; size_t m_timestamp; size_t m_size; TaskType m_type; + std::unique_ptr<TaskDependency> m_dep; + + MergeTask() = default; + + MergeTask(level_index source, level_index target, size_t size, size_t timestamp) + : m_source_level(source) + , m_target_level(target) + , m_timestamp(timestamp) + , m_size(size) + , 
m_type(TaskType::MERGE) + , m_dep(std::make_unique<TaskDependency>()){} + + + MergeTask(MergeTask &t) + : m_source_level(t.m_source_level) + , m_target_level(t.m_target_level) + , m_timestamp(t.m_timestamp) + , m_size(t.m_size) + , m_type(TaskType::MERGE) + , m_dep(std::move(t.m_dep)) + {} + TaskType get_type() const { return m_type; } + void make_dependent_on(MergeTask &task) { + m_dep->fut = task.m_dep->prom.get_future(); + } + + void make_dependent_on(TaskDependency *dep) { + m_dep->fut = dep->prom.get_future(); + } + friend bool operator<(const MergeTask &self, const MergeTask &other) { return self.m_timestamp < other.m_timestamp; } @@ -39,11 +75,27 @@ struct QueryTask { size_t m_timestamp; size_t m_size; TaskType m_type; + std::unique_ptr<TaskDependency> m_dep; + + QueryTask(QueryTask &t) + : m_timestamp(t.m_timestamp) + , m_size(t.m_size) + , m_type(t.m_type) + , m_dep(std::move(t.m_dep)) + {} TaskType get_type() const { return m_type; } + void SetDependency(QueryTask &task) { + m_dep->fut = task.m_dep->prom.get_future(); + } + + void SetDependency(TaskDependency *dep) { + m_dep->fut = dep->prom.get_future(); + } + friend bool operator<(const QueryTask &self, const QueryTask &other) { return self.m_timestamp < other.m_timestamp; } |