From 1a47cbd7978dcad7ed0b2f2af3f933137eedbfa3 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 20 Oct 2023 15:12:46 -0400 Subject: Checkpointing work I'll probably throw all this out, but I want to stash it just in case. --- CMakeLists.txt | 2 +- external/psudb-common | 2 +- include/framework/scheduling/SerialScheduler.h | 65 +++++++++++++------------- include/framework/scheduling/Task.h | 52 +++++++++++++++++++++ 4 files changed, 86 insertions(+), 35 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f642714..c8188a9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(namespace "de") project("Practical Dynamic Extension" VERSION 0.1.0) -set(debug false) +set(debug true) set(tests True) set(bench false) diff --git a/external/psudb-common b/external/psudb-common index b436420..7005ad8 160000 --- a/external/psudb-common +++ b/external/psudb-common @@ -1 +1 @@ -Subproject commit b436420bf4c9a574b3a8e54c1ab46f46e82240af +Subproject commit 7005ad856c941d8485843c53a3b08d53ccc3d98e diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index 5e16bdf..c43e930 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -1,7 +1,7 @@ /* * include/framework/Scheduler.h * - * Copyright (C) 2023 Douglas Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * Dong Xie * * All rights reserved. Published under the Modified BSD License. @@ -14,6 +14,7 @@ #include #include #include +#include #include "util/types.h" #include "framework/interface/Shard.h" @@ -24,6 +25,8 @@ #include "framework/structure/ExtensionStructure.h" #include "framework/scheduling/Task.h" +#include "psu-ds/LockedPriorityQueue.h" + namespace de { template @@ -73,31 +76,21 @@ public: std::vector merges = version->get_merge_tasks(buffer->get_record_count()); /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) + * Schedule the merge tasks */ for (ssize_t i=0; iget_record_count() * sizeof(R) * 2; - buffer_merge.m_timestamp = m_timestamp.fetch_add(1); - buffer_merge.m_type = TaskType::MERGE; - m_merge_queue_lock.lock(); - m_merge_queue.emplace(buffer_merge); - m_merge_queue_lock.unlock(); + auto t = MergeTask(-1, 0, buffer->get_record_count() * sizeof(R) * 2, m_timestamp.fetch_add(1)); + m_task_queue.push(t); m_cv.notify_all(); do { std::unique_lock merge_cv_lock(m_merge_cv_lock); m_merge_cv.wait(merge_cv_lock); - } while (m_merge_queue.size() > 0); + } while (m_task_queue.size() > 0); assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0); @@ -128,10 +121,7 @@ private: } void schedule_next_task() { - m_merge_queue_lock.lock(); - auto task = m_merge_queue.top(); - m_merge_queue.pop(); - m_merge_queue_lock.unlock(); + auto task = m_task_queue.pop(); auto type = std::visit(GetTaskType{}, task); @@ -145,7 +135,7 @@ private: default: assert(false); } - if (m_merge_queue.size() == 0) { + if (m_task_queue.size() == 0) { m_merge_cv.notify_all(); } } @@ -157,15 +147,27 @@ private: if (!version->validate_tombstone_proportion(task.m_target_level)) { auto tasks = version->get_merge_tasks(task.m_target_level); /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) + * Schedule the merge tasks */ - for (ssize_t i=tasks.size()-1; i>=0; i--) { + std::promise trigger_prom; + tasks[tasks.size() - 1].make_dependent_on(trigger_prom); + tasks[tasks.size() - 1].m_timestamp = m_timestamp.fetch_add(1); + m_task_queue.push(tasks[tasks.size() - 1]); + + for (ssize_t i=tasks.size()-2; i>=0; i--) { + tasks[i].make_dependent_on(tasks[i+1]); tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(tasks[i]); - m_merge_queue_lock.unlock(); + m_task_queue.push(tasks[i]); } + + /* + * Block the completion of any task until all have been + * scheduled. Probably not strictly necessary, but due to + * interface constraints with the way promises are used, + * a dummy promise needs to be set up for the first job + * anyway. It's easiest to just release it here. + */ + trigger_prom.set_value(); } } @@ -181,9 +183,7 @@ private: */ for (ssize_t i=tasks.size()-1; i>=0; i--) { tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(tasks[i]); - m_merge_queue_lock.unlock(); + m_task_queue.push(tasks[i]); } } } @@ -193,7 +193,7 @@ private: std::unique_lock cv_lock(m_cv_lock); m_cv.wait(cv_lock); - while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { + while (m_task_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { schedule_next_task(); } cv_lock.unlock(); @@ -210,8 +210,7 @@ private: alignas(64) std::atomic m_used_threads; alignas(64) std::atomic m_timestamp; - std::priority_queue, std::greater> m_merge_queue; - std::mutex m_merge_queue_lock; + psudb::LockedPriorityQueue, std::greater> m_task_queue; std::mutex m_cv_lock; std::condition_variable m_cv; diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 9e0655a..3c1b158 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -4,6 +4,7 @@ #pragma once #include +#include #include "framework/util/Configuration.h" @@ -14,17 +15,52 @@ enum class TaskType { QUERY }; +struct TaskDependency { + std::promise prom; + std::future fut; +}; + struct MergeTask { level_index m_source_level; level_index m_target_level; size_t m_timestamp; size_t m_size; TaskType m_type; + std::unique_ptr m_dep; + + MergeTask() = default; + + MergeTask(level_index source, level_index target, size_t size, size_t timestamp) + : m_source_level(source) + , m_target_level(target) + , m_timestamp(timestamp) + , m_size(size) + , m_type(TaskType::MERGE) + , m_dep(std::make_unique()){} + + + MergeTask(MergeTask &t) + : m_source_level(t.m_source_level) + , m_target_level(t.m_target_level) + , m_timestamp(t.m_timestamp) + , m_size(t.m_size) + , m_type(TaskType::MERGE) + , m_dep(std::move(t.m_dep)) + {} + TaskType get_type() const { return m_type; } + void make_dependent_on(MergeTask &task) { + m_dep->fut = task.m_dep->prom.get_future(); + } + + void make_dependent_on(TaskDependency *dep) { + m_dep->fut = dep->prom.get_future(); + } + friend bool operator<(const MergeTask &self, const MergeTask &other) { return self.m_timestamp < other.m_timestamp; } @@ -39,11 +75,27 @@ struct QueryTask { size_t m_timestamp; size_t m_size; TaskType m_type; + std::unique_ptr m_dep; + + QueryTask(QueryTask &t) + : m_timestamp(t.m_timestamp) + , m_size(t.m_size) + , m_type(t.m_type) + , m_dep(std::move(t.m_dep)) + {} TaskType get_type() const { return m_type; } + void SetDependency(QueryTask &task) { + m_dep->fut = task.m_dep->prom.get_future(); + } + + void SetDependency(TaskDependency *dep) { + m_dep->fut = dep->prom.get_future(); + } + friend bool operator<(const QueryTask &self, const QueryTask &other) { return self.m_timestamp < other.m_timestamp; } -- cgit v1.2.3