summaryrefslogtreecommitdiffstats
path: root/include/framework/Scheduler.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-09-13 16:22:03 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-09-13 16:22:03 -0400
commiteb8dbaa770a57557d67c817c2839c64f536a6ce4 (patch)
tree77bbbb79fb70f79965e7f6fd75bb5f4799a6f120 /include/framework/Scheduler.h
parent076e104b8672924c3d80cd1da2fdb5ebee1766ac (diff)
downloaddynamic-extension-eb8dbaa770a57557d67c817c2839c64f536a6ce4.tar.gz
Began re-architecting the project for concurrency support
The project is now in a state where it builds, but it probably has a lot of bugs still.
Diffstat (limited to 'include/framework/Scheduler.h')
-rw-r--r--include/framework/Scheduler.h76
1 files changed, 76 insertions, 0 deletions
diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h
new file mode 100644
index 0000000..cd3f430
--- /dev/null
+++ b/include/framework/Scheduler.h
@@ -0,0 +1,76 @@
+/*
+ * include/framework/Scheduler.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <vector>
+#include <memory>
+#include <queue>
+
+#include "util/types.h"
+#include "framework/ShardInterface.h"
+#include "framework/QueryInterface.h"
+#include "framework/RecordInterface.h"
+#include "framework/MutableBuffer.h"
+#include "framework/Configuration.h"
+#include "framework/ExtensionStructure.h"
+
+namespace de {
+
+
+struct MergeTask {
+ level_index m_source_level;
+ level_index m_target_level;
+ size_t m_size;
+ size_t m_timestamp;
+
+ bool operator<(MergeTask &other) {
+ return m_timestamp < other.m_timestamp;
+ }
+};
+
+
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
+class Scheduler {
+ typedef ExtensionStructure<R, S, Q, L> Structure;
+public:
+ /*
+ * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means
+ * unlimited.
+ */
+ Scheduler(size_t memory_budget, size_t thread_cnt)
+ : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
+ , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX)
+ , m_used_memory(0)
+ , m_used_threads(0)
+ {}
+
+ bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) {
+ // FIXME: this is a non-concurrent implementation
+ return version->merge_buffer(buffer);
+ }
+
+private:
+ size_t get_timestamp() {
+ auto ts = m_timestamp.fetch_add(1);
+ return ts;
+ }
+
+ size_t m_memory_budget;
+ size_t m_thread_cnt;
+
+ alignas(64) std::atomic<size_t> m_used_memory;
+ alignas(64) std::atomic<size_t> m_used_threads;
+ alignas(64) std::atomic<size_t> m_timestamp;
+
+ std::priority_queue<MergeTask> m_merge_queue;
+ std::mutex m_merge_queue_lock;
+};
+
+}