summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h24
-rw-r--r--include/framework/ShardRequirements.h9
-rw-r--r--include/framework/interface/Query.h (renamed from include/framework/QueryInterface.h)0
-rw-r--r--include/framework/interface/Record.h (renamed from include/framework/RecordInterface.h)0
-rw-r--r--include/framework/interface/Scheduler.h31
-rw-r--r--include/framework/interface/Shard.h (renamed from include/framework/ShardInterface.h)2
-rw-r--r--include/framework/scheduling/Scheduler.h (renamed from include/framework/Scheduler.h)15
-rw-r--r--include/framework/scheduling/SerialScheduler.h227
-rw-r--r--include/framework/scheduling/Task.h63
-rw-r--r--include/framework/structure/ExtensionStructure.h (renamed from include/framework/ExtensionStructure.h)29
-rw-r--r--include/framework/structure/InternalLevel.h (renamed from include/framework/InternalLevel.h)8
-rw-r--r--include/framework/structure/MutableBuffer.h (renamed from include/framework/MutableBuffer.h)2
-rw-r--r--include/framework/util/Configuration.h (renamed from include/framework/Configuration.h)0
13 files changed, 365 insertions, 45 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 6965965..3a460aa 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -14,22 +14,24 @@
#include <cstdio>
#include <vector>
-#include "framework/MutableBuffer.h"
-#include "framework/InternalLevel.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
-#include "framework/ExtensionStructure.h"
-
-#include "framework/Configuration.h"
-#include "framework/Scheduler.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/InternalLevel.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Scheduler.h"
+#include "framework/structure/ExtensionStructure.h"
+
+#include "framework/util/Configuration.h"
+#include "framework/scheduling/SerialScheduler.h"
#include "psu-util/timer.h"
#include "psu-ds/Alias.h"
namespace de {
-template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING>
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler<R, S, Q, L>>
class DynamicExtension {
typedef S Shard;
typedef MutableBuffer<R> Buffer;
@@ -219,7 +221,7 @@ public:
}
private:
- Scheduler<R, S, Q, L> m_sched;
+ SCHED m_sched;
std::vector<Buffer *> m_buffers;
std::vector<Structure *> m_versions;
diff --git a/include/framework/ShardRequirements.h b/include/framework/ShardRequirements.h
new file mode 100644
index 0000000..95f7b67
--- /dev/null
+++ b/include/framework/ShardRequirements.h
@@ -0,0 +1,9 @@
+/*
+ *
+ */
+#pragma once
+
+#include "framework/structure/MutableBuffer.h"
+#include "framework/interface/Record.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
diff --git a/include/framework/QueryInterface.h b/include/framework/interface/Query.h
index 46a1ce1..46a1ce1 100644
--- a/include/framework/QueryInterface.h
+++ b/include/framework/interface/Query.h
diff --git a/include/framework/RecordInterface.h b/include/framework/interface/Record.h
index 1ef1984..1ef1984 100644
--- a/include/framework/RecordInterface.h
+++ b/include/framework/interface/Record.h
diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h
new file mode 100644
index 0000000..1445e90
--- /dev/null
+++ b/include/framework/interface/Scheduler.h
@@ -0,0 +1,31 @@
+/*
+ * include/framework/interface/Scheduler.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <vector>
+#include <concepts>
+#include "framework/interface/Record.h"
+#include "util/types.h"
+
+/*
+ * Concept that a scheduler type must satisfy. At present the only
+ * requirement is that the type can be constructed from two size_t
+ * arguments; SerialScheduler's constructor takes them as
+ * (memory_budget, thread_cnt).
+ *
+ * The instance and pointer parameters are not used by any active
+ * requirement yet. They are retained for the merge-scheduling
+ * requirement that is still disabled below.
+ */
+template <typename S>
+concept SchedulerInterface = requires(S sched, size_t count, void *ptr) {
+    { S(count, count) };
+
+    // TODO: enable once the schedule_merge signature is settled:
+    //   { sched.schedule_merge(ptr, ptr) };
+};
diff --git a/include/framework/ShardInterface.h b/include/framework/interface/Shard.h
index 3aa62df..ea58b2a 100644
--- a/include/framework/ShardInterface.h
+++ b/include/framework/interface/Shard.h
@@ -11,7 +11,7 @@
#include <concepts>
#include "util/types.h"
-#include "framework/RecordInterface.h"
+#include "framework/interface/Record.h"
namespace de {
diff --git a/include/framework/Scheduler.h b/include/framework/scheduling/Scheduler.h
index 6055bef..992cbf9 100644
--- a/include/framework/Scheduler.h
+++ b/include/framework/scheduling/Scheduler.h
@@ -16,12 +16,12 @@
#include <condition_variable>
#include "util/types.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
-#include "framework/MutableBuffer.h"
-#include "framework/Configuration.h"
-#include "framework/ExtensionStructure.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/util/Configuration.h"
+#include "framework/structure/ExtensionStructure.h"
namespace de {
@@ -118,6 +118,7 @@ private:
}
}
+
void run_merge(MergeTask task, Structure *version) {
version->merge_levels(task.m_target_level, task.m_source_level);
@@ -160,7 +161,7 @@ private:
std::unique_lock<std::mutex> cv_lock(m_cv_lock);
m_cv.wait(cv_lock);
- while (m_merge_queue.size() > 0 && m_used_threads < m_thread_cnt) {
+ while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) {
schedule_next_task();
}
cv_lock.unlock();
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
new file mode 100644
index 0000000..5e16bdf
--- /dev/null
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -0,0 +1,227 @@
+/*
+ * include/framework/scheduling/SerialScheduler.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <vector>
+#include <memory>
+#include <queue>
+#include <thread>
+#include <condition_variable>
+
+#include "util/types.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/util/Configuration.h"
+#include "framework/structure/ExtensionStructure.h"
+#include "framework/scheduling/Task.h"
+
+namespace de {
+
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
+class SerialScheduler {
+    typedef ExtensionStructure<R, S, Q, L> Structure;
+    typedef MutableBuffer<R> Buffer;
+public:
+    /*
+     * A simple "scheduler" that runs tasks serially, in a FIFO manner
+     * (ordered by the timestamp assigned when a task is enqueued). The
+     * scheduler spins up one background thread that pops and executes
+     * tasks one at a time.
+     *
+     * Memory budget is stated in bytes, with 0 meaning unlimited.
+     * Likewise, 0 threads means unlimited. Both values are stored, but
+     * the scheduler is serial and so the thread count has no effect.
+     *
+     * NOTE(review): m_memory_budget and m_used_memory are stored but are
+     * not consulted by any code in this class, so no budget is enforced.
+     *
+     * Synchronization:
+     *   - m_merge_queue_lock guards m_merge_queue.
+     *   - m_cv / m_cv_lock wake the scheduler thread. It waits on the
+     *     predicate "shutdown requested or queue non-empty", so a
+     *     notification sent before it starts waiting is not lost.
+     *   - m_merge_cv / m_merge_cv_lock wake a caller blocked in
+     *     schedule_merge. It waits on "m_pending_tasks == 0", which
+     *     counts both queued tasks and the task currently executing.
+     *
+     * schedule_merge stores its arguments in pending_version and
+     * pending_buffer, which are shared by every queued task, so only one
+     * schedule_merge call may be in flight at a time.
+     */
+    SerialScheduler(size_t memory_budget, size_t thread_cnt)
+        : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
+        , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX)
+        , pending_buffer(nullptr)
+        , pending_version(nullptr)
+        , m_used_memory(0)
+        , m_used_threads(0)
+        , m_timestamp(0)
+        , m_pending_tasks(0)
+        , m_shutdown(false)
+    {
+        /* Started last so the thread only ever sees initialized members. */
+        m_sched_thrd = std::thread(&SerialScheduler::run_scheduler, this);
+    }
+
+    /*
+     * Requests shutdown and joins the scheduler thread. Tasks still in
+     * the queue are executed before the thread exits.
+     */
+    ~SerialScheduler() {
+        {
+            /*
+             * The flag is set and signalled while holding m_cv_lock so the
+             * scheduler thread is either about to test its wait predicate
+             * (and will see the flag) or already blocked (and will be
+             * woken). Without this, join() below could hang forever.
+             */
+            std::lock_guard<std::mutex> guard(m_cv_lock);
+            m_shutdown.store(true);
+            m_cv.notify_all();
+        }
+
+        m_sched_thrd.join();
+    }
+
+    /*
+     * Enqueues every level merge reported by version->get_merge_tasks()
+     * followed by a merge of the buffer into level 0, then blocks until
+     * all of them (and any follow-up merges they enqueue) have finished.
+     *
+     * Always returns true.
+     */
+    bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) {
+        pending_version = version;
+        pending_buffer = buffer;
+
+        /*
+         * Get list of individual level reconstructions that are necessary
+         * for completing the overall merge
+         */
+        std::vector<MergeTask> merges = version->get_merge_tasks(buffer->get_record_count());
+
+        for (const MergeTask &merge : merges) {
+            enqueue_task(merge);
+        }
+
+        /* source level -1 with target level 0 denotes the buffer merge */
+        MergeTask buffer_merge;
+        buffer_merge.m_source_level = -1;
+        buffer_merge.m_target_level = 0;
+        buffer_merge.m_size = buffer->get_record_count() * sizeof(R) * 2;
+        buffer_merge.m_type = TaskType::MERGE;
+        enqueue_task(buffer_merge);
+
+        {
+            std::lock_guard<std::mutex> guard(m_cv_lock);
+            m_cv.notify_all();
+        }
+
+        {
+            /*
+             * Predicate wait: returns immediately if the scheduler already
+             * finished everything, and ignores spurious wakeups.
+             */
+            std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock);
+            m_merge_cv.wait(merge_cv_lock, [this] { return m_pending_tasks.load() == 0; });
+        }
+
+        assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0);
+
+        return true;
+    }
+
+    /* Placeholder: queries are not scheduled yet. Always returns true. */
+    bool schedule_query() {
+        return true;
+    }
+
+private:
+    /* Returns the next task timestamp and advances the counter. */
+    size_t get_timestamp() {
+        auto ts = m_timestamp.fetch_add(1);
+        return ts;
+    }
+
+    /*
+     * Stamps the task with the next timestamp, counts it as pending and
+     * pushes it onto the queue.
+     */
+    void enqueue_task(MergeTask task) {
+        task.m_timestamp = get_timestamp();
+
+        std::lock_guard<std::mutex> guard(m_merge_queue_lock);
+        m_pending_tasks.fetch_add(1);
+        m_merge_queue.emplace(task);
+    }
+
+    /* Reports whether the task queue is empty, taking the queue lock. */
+    bool merge_queue_empty() {
+        std::lock_guard<std::mutex> guard(m_merge_queue_lock);
+        return m_merge_queue.empty();
+    }
+
+    /*
+     * Executes one merge task against pending_version. A task with
+     * source level -1 and target level 0 is the buffer merge.
+     */
+    void schedule_merge(MergeTask task) {
+        if (task.m_source_level == -1 && task.m_target_level == 0) {
+            run_buffer_merge(pending_buffer, pending_version);
+        } else {
+            run_merge(task, pending_version);
+        }
+    }
+
+
+    /* Placeholder: query tasks are accepted but do nothing. */
+    void schedule_query(QueryTask task) {
+
+    }
+
+    /*
+     * Pops the task with the smallest timestamp, runs it, and wakes the
+     * caller blocked in schedule_merge once no task is queued or running.
+     * Must only be called when the queue is non-empty.
+     */
+    void schedule_next_task() {
+        std::unique_lock<std::mutex> queue_lock(m_merge_queue_lock);
+        Task task = m_merge_queue.top();
+        m_merge_queue.pop();
+        queue_lock.unlock();
+
+        auto type = std::visit(GetTaskType{}, task);
+
+        switch (type) {
+            case TaskType::MERGE:
+                schedule_merge(std::get<MergeTask>(task));
+                break;
+            case TaskType::QUERY:
+                schedule_query(std::get<QueryTask>(task));
+                break;
+            default: assert(false);
+        }
+
+        /*
+         * Follow-up tasks enqueued while this one ran were counted before
+         * this decrement, so the counter only reaches zero when all work
+         * is really done.
+         */
+        if (m_pending_tasks.fetch_sub(1) == 1) {
+            std::lock_guard<std::mutex> guard(m_merge_cv_lock);
+            m_merge_cv.notify_all();
+        }
+    }
+
+
+    /*
+     * Merges task.m_source_level into task.m_target_level. If the target
+     * level then fails validate_tombstone_proportion, the follow-up
+     * merges are enqueued in reverse order.
+     */
+    void run_merge(MergeTask task, Structure *version) {
+        version->merge_levels(task.m_target_level, task.m_source_level);
+
+        if (!version->validate_tombstone_proportion(task.m_target_level)) {
+            /*
+             * NOTE(review): schedule_merge passes a record count to
+             * get_merge_tasks, while a level index is passed here, and
+             * run_buffer_merge uses get_merge_tasks_from_level. Confirm
+             * this call is the intended one.
+             */
+            auto tasks = version->get_merge_tasks(task.m_target_level);
+
+            for (size_t i = tasks.size(); i-- > 0;) {
+                enqueue_task(tasks[i]);
+            }
+        }
+    }
+
+
+    /*
+     * Merges the buffer into the structure. If level 0 then fails
+     * validate_tombstone_proportion, the follow-up merges starting from
+     * level 0 are enqueued in reverse order.
+     */
+    void run_buffer_merge(Buffer *buffer, Structure *version) {
+        version->merge_buffer(buffer);
+        if (!version->validate_tombstone_proportion(0)) {
+            auto tasks = version->get_merge_tasks_from_level(0);
+
+            for (size_t i = tasks.size(); i-- > 0;) {
+                enqueue_task(tasks[i]);
+            }
+        }
+    }
+
+    /*
+     * Body of the scheduler thread: sleep until there is work or a
+     * shutdown request, drain the queue, repeat until shutdown.
+     */
+    void run_scheduler() {
+        std::unique_lock<std::mutex> cv_lock(m_cv_lock);
+        do {
+            m_cv.wait(cv_lock, [this] { return m_shutdown.load() || !merge_queue_empty(); });
+
+            /* Tasks run without m_cv_lock so enqueuers are never blocked on it. */
+            cv_lock.unlock();
+
+            /*
+             * Nothing in this class modifies m_used_threads, so the thread
+             * limit never stops this loop before the queue is drained.
+             */
+            while (!merge_queue_empty() && m_used_threads.load() < m_thread_cnt) {
+                schedule_next_task();
+            }
+
+            cv_lock.lock();
+        } while (!m_shutdown.load());
+    }
+
+    size_t m_memory_budget;
+    size_t m_thread_cnt;
+
+    /* Arguments of the schedule_merge call currently in flight. */
+    Buffer *pending_buffer;
+    Structure *pending_version;
+
+    alignas(64) std::atomic<size_t> m_used_memory;
+    alignas(64) std::atomic<size_t> m_used_threads;
+    alignas(64) std::atomic<size_t> m_timestamp;
+
+    /* Number of tasks that are queued or currently executing. */
+    std::atomic<size_t> m_pending_tasks;
+
+    std::priority_queue<Task, std::vector<Task>, std::greater<Task>> m_merge_queue;
+    std::mutex m_merge_queue_lock;
+
+    std::mutex m_cv_lock;
+    std::condition_variable m_cv;
+
+    std::mutex m_merge_cv_lock;
+    std::condition_variable m_merge_cv;
+
+    std::thread m_sched_thrd;
+
+    std::atomic<bool> m_shutdown;
+};
+
+}
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
new file mode 100644
index 0000000..9e0655a
--- /dev/null
+++ b/include/framework/scheduling/Task.h
@@ -0,0 +1,63 @@
+/*
+ *
+ */
+#pragma once
+
+#include <variant>
+
+#include "framework/util/Configuration.h"
+
+namespace de {
+
+enum class TaskType { // tag stored in each task's m_type and returned by get_type()
+ MERGE, // assigned to MergeTask::m_type where merge tasks are created
+ QUERY // dispatched by SerialScheduler as a QueryTask
+};
+
+/*
+ * Describes one merge to be executed by a scheduler: either a merge of
+ * m_source_level into m_target_level, or (source -1, target 0) a merge
+ * of the buffer into level 0.
+ *
+ * Every member has a default so that a task whose creator does not set
+ * a field never carries an indeterminate value. In particular m_type
+ * defaults to TaskType::MERGE, which keeps type-based dispatch well
+ * defined for creation sites that do not assign it.
+ *
+ * Tasks are ordered by m_timestamp only.
+ */
+struct MergeTask {
+    level_index m_source_level{};
+    level_index m_target_level{};
+    size_t m_timestamp = 0;
+    size_t m_size = 0;
+    TaskType m_type = TaskType::MERGE;
+
+    /* Returns the task's type tag. */
+    TaskType get_type() const {
+        return m_type;
+    }
+
+    friend bool operator<(const MergeTask &self, const MergeTask &other) {
+        return self.m_timestamp < other.m_timestamp;
+    }
+
+    friend bool operator>(const MergeTask &self, const MergeTask &other) {
+        return self.m_timestamp > other.m_timestamp;
+    }
+
+};
+
+/*
+ * Describes one query to be executed by a scheduler.
+ *
+ * Every member has a default so that a default-constructed task never
+ * carries an indeterminate value; m_type defaults to TaskType::QUERY so
+ * the tag agrees with the struct it is stored in.
+ *
+ * Tasks are ordered by m_timestamp only.
+ */
+struct QueryTask {
+    size_t m_timestamp = 0;
+    size_t m_size = 0;
+    TaskType m_type = TaskType::QUERY;
+
+    /* Returns the task's type tag. */
+    TaskType get_type() const {
+        return m_type;
+    }
+
+    friend bool operator<(const QueryTask &self, const QueryTask &other) {
+        return self.m_timestamp < other.m_timestamp;
+    }
+
+    friend bool operator>(const QueryTask &self, const QueryTask &other) {
+        return self.m_timestamp > other.m_timestamp;
+    }
+};
+
+/*
+ * Visitor for std::visit that returns the type tag of whichever task a
+ * Task variant currently holds. The call operators are const so the
+ * visitor can also be used through a const object or reference.
+ */
+struct GetTaskType {
+    TaskType operator()(const MergeTask &t) const { return t.get_type(); }
+    TaskType operator()(const QueryTask &t) const { return t.get_type(); }
+};
+
+/* A schedulable unit of work: either a merge or a query. */
+using Task = std::variant<MergeTask, QueryTask>;
+
+}
diff --git a/include/framework/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 892e63b..920e1c3 100644
--- a/include/framework/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -14,34 +14,20 @@
#include <cstdio>
#include <vector>
-#include "framework/MutableBuffer.h"
-#include "framework/InternalLevel.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/InternalLevel.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
-#include "framework/Configuration.h"
+#include "framework/util/Configuration.h"
+#include "framework/scheduling/Task.h"
#include "psu-util/timer.h"
#include "psu-ds/Alias.h"
namespace de {
-struct MergeTask {
- level_index m_source_level;
- level_index m_target_level;
- size_t m_size;
- size_t m_timestamp;
-
- friend bool operator<(const MergeTask &self, const MergeTask &other) {
- return self.m_timestamp < other.m_timestamp;
- }
-
- friend bool operator>(const MergeTask &self, const MergeTask &other) {
- return self.m_timestamp > other.m_timestamp;
- }
-};
-
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING>
class ExtensionStructure {
typedef S Shard;
@@ -233,6 +219,7 @@ public:
MergeTask task;
task.m_source_level = i - 1;
task.m_target_level = i;
+ task.m_type = TaskType::MERGE;
/*
* The amount of storage required for the merge accounts
diff --git a/include/framework/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 6cdac4e..b9230f4 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -13,10 +13,10 @@
#include <memory>
#include "util/types.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
-#include "framework/MutableBuffer.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/MutableBuffer.h"
namespace de {
template <RecordInterface R, ShardInterface S, QueryInterface Q>
diff --git a/include/framework/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 572b656..9f12175 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -22,7 +22,7 @@
#include "psu-ds/BloomFilter.h"
#include "psu-ds/Alias.h"
#include "psu-util/timer.h"
-#include "framework/RecordInterface.h"
+#include "framework/interface/Record.h"
using psudb::CACHELINE_SIZE;
diff --git a/include/framework/Configuration.h b/include/framework/util/Configuration.h
index eb9b93f..eb9b93f 100644
--- a/include/framework/Configuration.h
+++ b/include/framework/util/Configuration.h