From 7163b8db0ee5acc099a228090a4bdee379c1c8af Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 11:53:08 -0400 Subject: SerialScheduler: added a single-threaded scheduler Added a new scheduler for ensuring single-threaded operation. Additionally, added a static assert to (at least for now) restrict the use of tagging to this single threaded scheduler. --- include/framework/DynamicExtension.h | 2 + include/framework/scheduling/SerialScheduler.h | 67 ++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 include/framework/scheduling/SerialScheduler.h (limited to 'include') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index a1f7c2b..3a4a7e1 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -26,6 +26,7 @@ #include "framework/util/Configuration.h" #include "framework/scheduling/FIFOScheduler.h" +#include "framework/scheduling/SerialScheduler.h" #include "framework/scheduling/Epoch.h" #include "psu-util/timer.h" @@ -82,6 +83,7 @@ public: // FIXME: delete tagging will require a lot of extra work to get // operating "correctly" in a concurrent environment. if constexpr (D == DeletePolicy::TAGGING) { + static_assert(std::same_as, "Tagging is only supported in single-threaded operation"); BufView buffers = get_active_epoch()->get_buffer_view(); if (get_active_epoch()->get_structure()->tagged_delete(rec)) { diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h new file mode 100644 index 0000000..9c767e8 --- /dev/null +++ b/include/framework/scheduling/SerialScheduler.h @@ -0,0 +1,67 @@ +/* + * include/framework/scheduling/SerialScheduler.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * + * All rights reserved. Published under the Modified BSD License. + * + * IMPORTANT: This "scheduler" is a shim implementation for allowing + * strictly serial, single-threaded operation of the framework. It should + * never be used in multi-threaded contexts. A call to the schedule_job + * function will immediately run the job and block on its completion before + * returning. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "util/types.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" +#include "framework/interface/Record.h" +#include "framework/structure/MutableBuffer.h" +#include "framework/util/Configuration.h" +#include "framework/structure/ExtensionStructure.h" +#include "framework/scheduling/Task.h" + +namespace de { + +class SerialScheduler { +public: + SerialScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX) + , m_used_memory(0) + , m_used_thrds(0) + , m_counter(0) + {} + + ~SerialScheduler() = default; + + void schedule_job(std::function job, size_t size, void *args) { + size_t ts = m_counter++; + auto t = Task(size, ts, job, args); + t(); + } + + void shutdown() { + /* intentionally left blank */ + } + +private: + size_t m_memory_budget; + size_t m_thrd_cnt; + + size_t m_used_thrds; + size_t m_used_memory; + + size_t m_counter; +}; + +} -- cgit v1.2.3