author     Douglas Rumbaugh <dbr4@psu.edu>    2023-10-20 15:12:46 -0400
committer  Douglas Rumbaugh <dbr4@psu.edu>    2023-10-20 15:12:46 -0400
commit     1a47cbd7978dcad7ed0b2f2af3f933137eedbfa3 (patch)
tree       0dc9c3aa484fb3eb49b9a6491dd705afb176aa0e
parent     7c03d771475421c1d5a2bbc135242536af1a371c (diff)
download   dynamic-extension-1a47cbd7978dcad7ed0b2f2af3f933137eedbfa3.tar.gz
Checkpointing work
I'll probably throw all this out, but I want to stash it just in case.
-rw-r--r--  CMakeLists.txt                                    2
m---------  external/psudb-common                             0
-rw-r--r--  include/framework/scheduling/SerialScheduler.h   65
-rw-r--r--  include/framework/scheduling/Task.h              52
4 files changed, 85 insertions, 34 deletions
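
The main change in this checkpoint is replacing the scheduler's hand-rolled std::priority_queue plus std::mutex pair with psudb::LockedPriorityQueue from the psudb-common submodule. Only the call sites (push, pop, size) are visible in the diff below, so the following is a minimal sketch of a queue with that interface, assuming the real class simply locks around a std::priority_queue internally; the actual psudb-common implementation may differ.

#include <cstddef>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <vector>

// Illustration only: a thread-safe priority queue exposing the three
// operations the scheduler code below relies on. Not the real
// psudb::LockedPriorityQueue.
template <typename T, typename Container = std::vector<T>,
          typename Compare = std::less<T>>
class LockedPriorityQueueSketch {
public:
    void push(const T &item) {
        std::lock_guard<std::mutex> guard(m_lock);
        m_queue.push(item);
    }

    T pop() {
        std::lock_guard<std::mutex> guard(m_lock);
        T item = m_queue.top();     // assumes the caller has checked size() > 0
        m_queue.pop();
        return item;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> guard(m_lock);
        return m_queue.size();
    }

private:
    mutable std::mutex m_lock;
    std::priority_queue<T, Container, Compare> m_queue;
};

int main() {
    // std::greater turns the max-heap into a min-heap, matching how the
    // scheduler orders tasks by smallest timestamp first.
    LockedPriorityQueueSketch<int, std::vector<int>, std::greater<int>> q;
    q.push(3); q.push(1); q.push(2);
    while (q.size() > 0) std::printf("%d\n", q.pop());   // prints 1 2 3
}
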
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f642714..c8188a9 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
set(namespace "de")
project("Practical Dynamic Extension" VERSION 0.1.0)
-set(debug false)
+set(debug true)
set(tests True)
set(bench false)
diff --git a/external/psudb-common b/external/psudb-common
-Subproject b436420bf4c9a574b3a8e54c1ab46f46e82240a
+Subproject 7005ad856c941d8485843c53a3b08d53ccc3d98
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
index 5e16bdf..c43e930 100644
--- a/include/framework/scheduling/SerialScheduler.h
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -1,7 +1,7 @@
/*
* include/framework/Scheduler.h
*
- * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
* Dong Xie <dongx@psu.edu>
*
* All rights reserved. Published under the Modified BSD License.
@@ -14,6 +14,7 @@
#include <queue>
#include <thread>
#include <condition_variable>
+#include <future>
#include "util/types.h"
#include "framework/interface/Shard.h"
@@ -24,6 +25,8 @@
#include "framework/structure/ExtensionStructure.h"
#include "framework/scheduling/Task.h"
+#include "psu-ds/LockedPriorityQueue.h"
+
namespace de {
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
@@ -73,31 +76,21 @@ public:
std::vector<MergeTask> merges = version->get_merge_tasks(buffer->get_record_count());
/*
- * Schedule the merge tasks (FIXME: currently this just
- * executes them sequentially in a blocking fashion)
+ * Schedule the merge tasks
*/
for (ssize_t i=0; i<merges.size(); i++) {
merges[i].m_timestamp = m_timestamp.fetch_add(1);
- m_merge_queue_lock.lock();
- m_merge_queue.emplace(merges[i]);
- m_merge_queue_lock.unlock();
+ m_task_queue.push(merges[i]);
}
- MergeTask buffer_merge;
- buffer_merge.m_source_level = -1;
- buffer_merge.m_target_level = 0;
- buffer_merge.m_size = buffer->get_record_count() * sizeof(R) * 2;
- buffer_merge.m_timestamp = m_timestamp.fetch_add(1);
- buffer_merge.m_type = TaskType::MERGE;
- m_merge_queue_lock.lock();
- m_merge_queue.emplace(buffer_merge);
- m_merge_queue_lock.unlock();
+ auto t = MergeTask(-1, 0, buffer->get_record_count() * sizeof(R) * 2, m_timestamp.fetch_add(1));
+ m_task_queue.push(t);
m_cv.notify_all();
do {
std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock);
m_merge_cv.wait(merge_cv_lock);
- } while (m_merge_queue.size() > 0);
+ } while (m_task_queue.size() > 0);
assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0);
@@ -128,10 +121,7 @@ private:
}
void schedule_next_task() {
- m_merge_queue_lock.lock();
- auto task = m_merge_queue.top();
- m_merge_queue.pop();
- m_merge_queue_lock.unlock();
+ auto task = m_task_queue.pop();
auto type = std::visit(GetTaskType{}, task);
@@ -145,7 +135,7 @@ private:
default: assert(false);
}
- if (m_merge_queue.size() == 0) {
+ if (m_task_queue.size() == 0) {
m_merge_cv.notify_all();
}
}
@@ -157,15 +147,27 @@ private:
if (!version->validate_tombstone_proportion(task.m_target_level)) {
auto tasks = version->get_merge_tasks(task.m_target_level);
/*
- * Schedule the merge tasks (FIXME: currently this just
- * executes them sequentially in a blocking fashion)
+ * Schedule the merge tasks
*/
- for (ssize_t i=tasks.size()-1; i>=0; i--) {
+ std::promise<void> trigger_prom;
+ tasks[tasks.size() - 1].make_dependent_on(trigger_prom);
+ tasks[tasks.size() - 1].m_timestamp = m_timestamp.fetch_add(1);
+ m_task_queue.push(tasks[tasks.size() - 1]);
+
+ for (ssize_t i=tasks.size()-2; i>=0; i--) {
+ tasks[i].make_dependent_on(tasks[i+1]);
tasks[i].m_timestamp = m_timestamp.fetch_add(1);
- m_merge_queue_lock.lock();
- m_merge_queue.push(tasks[i]);
- m_merge_queue_lock.unlock();
+ m_task_queue.push(tasks[i]);
}
+
+ /*
+ * Block the completion of any task until all have been
+ * scheduled. Probably not strictly necessary, but due to
+ * interface constraints with the way promises are used,
+ * a dummy promise needs to be set up for the first job
+ * anyway. It's easiest to just release it here.
+ */
+ trigger_prom.set_value();
}
}
@@ -181,9 +183,7 @@ private:
*/
for (ssize_t i=tasks.size()-1; i>=0; i--) {
tasks[i].m_timestamp = m_timestamp.fetch_add(1);
- m_merge_queue_lock.lock();
- m_merge_queue.push(tasks[i]);
- m_merge_queue_lock.unlock();
+ m_task_queue.push(tasks[i]);
}
}
}
@@ -193,7 +193,7 @@ private:
std::unique_lock<std::mutex> cv_lock(m_cv_lock);
m_cv.wait(cv_lock);
- while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) {
+ while (m_task_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) {
schedule_next_task();
}
cv_lock.unlock();
@@ -210,8 +210,7 @@ private:
alignas(64) std::atomic<size_t> m_used_threads;
alignas(64) std::atomic<size_t> m_timestamp;
- std::priority_queue<Task, std::vector<Task>, std::greater<Task>> m_merge_queue;
- std::mutex m_merge_queue_lock;
+ psudb::LockedPriorityQueue<Task, std::vector<Task>, std::greater<Task>> m_task_queue;
std::mutex m_cv_lock;
std::condition_variable m_cv;
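
The hunks above chain the rescheduled merge tasks together with make_dependent_on and gate the whole chain on trigger_prom, which is only released once every task has been pushed. Below is a standalone sketch of that promise/future chaining pattern; the stage loop, worker lambdas, and names are illustrative and not part of the framework's API.

#include <cstdio>
#include <future>
#include <thread>
#include <vector>

int main() {
    constexpr std::size_t n = 3;

    // One promise per stage, plus a trigger that gates the whole chain,
    // mirroring trigger_prom in the scheduler hunk above.
    std::promise<void> trigger;
    std::vector<std::promise<void>> stage_done(n);

    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < n; i++) {
        // Stage 0 waits on the trigger; stage i waits on stage i-1.
        std::future<void> gate = (i == 0) ? trigger.get_future()
                                          : stage_done[i - 1].get_future();
        workers.emplace_back(
            [i, &stage_done](std::future<void> g) {
                g.wait();                       // block until the predecessor fires
                std::printf("stage %zu running\n", i);
                stage_done[i].set_value();      // release the successor
            },
            std::move(gate));
    }

    // Nothing completes until every stage has been set up; releasing the
    // trigger here corresponds to the trigger_prom.set_value() call above.
    trigger.set_value();
    for (auto &w : workers) w.join();
}
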
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 9e0655a..3c1b158 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -4,6 +4,7 @@
#pragma once
#include <variant>
+#include <future>
#include "framework/util/Configuration.h"
@@ -14,17 +15,52 @@ enum class TaskType {
QUERY
};
+struct TaskDependency {
+ std::promise<void> prom;
+ std::future<void> fut;
+};
+
struct MergeTask {
level_index m_source_level;
level_index m_target_level;
size_t m_timestamp;
size_t m_size;
TaskType m_type;
+ std::unique_ptr<TaskDependency> m_dep;
+
+ MergeTask() = default;
+
+ MergeTask(level_index source, level_index target, size_t size, size_t timestamp)
+ : m_source_level(source)
+ , m_target_level(target)
+ , m_timestamp(timestamp)
+ , m_size(size)
+ , m_type(TaskType::MERGE)
+ , m_dep(std::make_unique<TaskDependency>()){}
+
+
+ MergeTask(MergeTask &t)
+ : m_source_level(t.m_source_level)
+ , m_target_level(t.m_target_level)
+ , m_timestamp(t.m_timestamp)
+ , m_size(t.m_size)
+ , m_type(TaskType::MERGE)
+ , m_dep(std::move(t.m_dep))
+ {}
+
TaskType get_type() const {
return m_type;
}
+ void make_dependent_on(MergeTask &task) {
+ m_dep->fut = task.m_dep->prom.get_future();
+ }
+
+ void make_dependent_on(TaskDependency *dep) {
+ m_dep->fut = dep->prom.get_future();
+ }
+
friend bool operator<(const MergeTask &self, const MergeTask &other) {
return self.m_timestamp < other.m_timestamp;
}
@@ -39,11 +75,27 @@ struct QueryTask {
size_t m_timestamp;
size_t m_size;
TaskType m_type;
+ std::unique_ptr<TaskDependency> m_dep;
+
+ QueryTask(QueryTask &t)
+ : m_timestamp(t.m_timestamp)
+ , m_size(t.m_size)
+ , m_type(t.m_type)
+ , m_dep(std::move(t.m_dep))
+ {}
TaskType get_type() const {
return m_type;
}
+ void SetDependency(QueryTask &task) {
+ m_dep->fut = task.m_dep->prom.get_future();
+ }
+
+ void SetDependency(TaskDependency *dep) {
+ m_dep->fut = dep->prom.get_future();
+ }
+
friend bool operator<(const QueryTask &self, const QueryTask &other) {
return self.m_timestamp < other.m_timestamp;
}
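
Task.h now attaches a TaskDependency (a promise/future pair) to each task, with make_dependent_on / SetDependency wiring one task's future to another task's promise. The consumer side, whoever waits on the future and later fulfills the promise, is not part of this commit, so the following is only a guess at the intended usage; TaskDependencySketch, run_merge(), and do_merge_work() are hypothetical names, not framework code.

#include <cstdio>
#include <future>

// Mirror of the TaskDependency struct above, renamed to make clear this is
// an illustration rather than the framework type.
struct TaskDependencySketch {
    std::promise<void> prom;
    std::future<void> fut;
};

// Hypothetical stand-in for the real merge routine.
static void do_merge_work() { std::printf("merging...\n"); }

// Sketch of the worker side: wait on the dependency (if one was attached),
// do the work, then release anything that depends on this task.
void run_merge(TaskDependencySketch &dep) {
    if (dep.fut.valid()) {
        dep.fut.wait();
    }
    do_merge_work();
    dep.prom.set_value();
}

int main() {
    TaskDependencySketch upstream, downstream;
    downstream.fut = upstream.prom.get_future();   // downstream depends on upstream

    run_merge(upstream);     // runs immediately: no dependency attached
    run_merge(downstream);   // proceeds only after upstream's promise has fired
}
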