summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas B. Rumbaugh <doug@douglasrumbaugh.com>2023-10-31 11:53:08 -0400
committerDouglas B. Rumbaugh <doug@douglasrumbaugh.com>2023-10-31 11:53:08 -0400
commit7163b8db0ee5acc099a228090a4bdee379c1c8af (patch)
tree3697c22f3decee5aa9449ddc9c6f1562f858724b
parent1b8bec5ea882584aba62c92d1ab6ffaf03e7b9b5 (diff)
downloaddynamic-extension-7163b8db0ee5acc099a228090a4bdee379c1c8af.tar.gz
SerialScheduler: added a single-threaded scheduler
Added a new scheduler for ensuring single-threaded operation. Additionally, added a static assert to (at least for now) restrict the use of tagging to this single threaded scheduler.
-rw-r--r--include/framework/DynamicExtension.h2
-rw-r--r--include/framework/scheduling/SerialScheduler.h67
2 files changed, 69 insertions, 0 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index a1f7c2b..3a4a7e1 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -26,6 +26,7 @@
#include "framework/util/Configuration.h"
#include "framework/scheduling/FIFOScheduler.h"
+#include "framework/scheduling/SerialScheduler.h"
#include "framework/scheduling/Epoch.h"
#include "psu-util/timer.h"
@@ -82,6 +83,7 @@ public:
// FIXME: delete tagging will require a lot of extra work to get
// operating "correctly" in a concurrent environment.
if constexpr (D == DeletePolicy::TAGGING) {
+ static_assert(std::same_as<SCHED, SerialScheduler>, "Tagging is only supported in single-threaded operation");
BufView buffers = get_active_epoch()->get_buffer_view();
if (get_active_epoch()->get_structure()->tagged_delete(rec)) {
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
new file mode 100644
index 0000000..9c767e8
--- /dev/null
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -0,0 +1,67 @@
+/*
+ * include/framework/scheduling/SerialScheduler.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ * IMPORTANT: This "scheduler" is a shim implementation for allowing
+ * strictly serial, single-threaded operation of the framework. It should
+ * never be used in multi-threaded contexts. A call to the schedule_job
+ * function will immediately run the job and block on its completion before
+ * returning.
+ *
+ */
+#pragma once
+
+#include <vector>
+#include <memory>
+#include <queue>
+#include <thread>
+#include <condition_variable>
+#include <future>
+
+#include "util/types.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/util/Configuration.h"
+#include "framework/structure/ExtensionStructure.h"
+#include "framework/scheduling/Task.h"
+
+namespace de {
+
+class SerialScheduler {
+public:
+ SerialScheduler(size_t memory_budget, size_t thread_cnt)
+ : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
+ , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX)
+ , m_used_memory(0)
+ , m_used_thrds(0)
+ , m_counter(0)
+ {}
+
+ ~SerialScheduler() = default;
+
+ void schedule_job(std::function<void(void*)> job, size_t size, void *args) {
+ size_t ts = m_counter++;
+ auto t = Task(size, ts, job, args);
+ t();
+ }
+
+ void shutdown() {
+ /* intentionally left blank */
+ }
+
+private:
+ size_t m_memory_budget;
+ size_t m_thrd_cnt;
+
+ size_t m_used_thrds;
+ size_t m_used_memory;
+
+ size_t m_counter;
+};
+
+}