summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-10-30 17:15:05 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-10-30 17:15:05 -0400
commitd2279e1b96d352a0af1d425dcaaf93e8a26a8d52 (patch)
tree4e8df98339ff5578deb8f8be46b9f6c3cc34cef4 /include/framework/scheduling
parent8ce1cb0eef7d5631f0f7788804845ddc8296ac6f (diff)
downloaddynamic-extension-d2279e1b96d352a0af1d425dcaaf93e8a26a8d52.tar.gz
General Comment + Consistency updates
Diffstat (limited to 'include/framework/scheduling')
-rw-r--r--include/framework/scheduling/Epoch.h1
-rw-r--r--include/framework/scheduling/FIFOScheduler.h3
-rw-r--r--include/framework/scheduling/Scheduler.h195
-rw-r--r--include/framework/scheduling/Task.h6
4 files changed, 6 insertions, 199 deletions
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 87463bd..03cbb62 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -2,7 +2,6 @@
* include/framework/scheduling/Epoch.h
*
* Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
*
* All rights reserved. Published under the Modified BSD License.
*
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 7ccab26..5425c4f 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -1,8 +1,7 @@
/*
- * include/framework/Scheduler.h
+ * include/framework/scheduling/FIFOScheduler.h
*
* Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
*
* All rights reserved. Published under the Modified BSD License.
*
diff --git a/include/framework/scheduling/Scheduler.h b/include/framework/scheduling/Scheduler.h
deleted file mode 100644
index 992cbf9..0000000
--- a/include/framework/scheduling/Scheduler.h
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * include/framework/Scheduler.h
- *
- * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
- *
- * All rights reserved. Published under the Modified BSD License.
- *
- */
-#pragma once
-
-#include <vector>
-#include <memory>
-#include <queue>
-#include <thread>
-#include <condition_variable>
-
-#include "util/types.h"
-#include "framework/interface/Shard.h"
-#include "framework/interface/Query.h"
-#include "framework/interface/Record.h"
-#include "framework/structure/MutableBuffer.h"
-#include "framework/util/Configuration.h"
-#include "framework/structure/ExtensionStructure.h"
-
-namespace de {
-
-template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
-class Scheduler {
- typedef ExtensionStructure<R, S, Q, L> Structure;
- typedef MutableBuffer<R> Buffer;
-public:
- /*
- * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means
- * unlimited.
- */
- Scheduler(size_t memory_budget, size_t thread_cnt)
- : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
- , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX)
- , m_used_memory(0)
- , m_used_threads(0)
- , m_shutdown(false)
- {
- m_sched_thrd = std::thread(&Scheduler::run_scheduler, this);
- }
-
- ~Scheduler() {
- m_shutdown = true;
-
- m_cv.notify_all();
- m_sched_thrd.join();
- }
-
- bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) {
- /*
- * temporary hack
- */
- pending_version = version;
- pending_buffer = buffer;
-
- /*
- * Get list of individual level reconstructions that are necessary
- * for completing the overall merge
- */
- std::vector<MergeTask> merges = version->get_merge_tasks(buffer->get_record_count());
-
- /*
- * Schedule the merge tasks (FIXME: currently this just
- * executes them sequentially in a blocking fashion)
- */
- for (ssize_t i=0; i<merges.size(); i++) {
- merges[i].m_timestamp = m_timestamp.fetch_add(1);
- m_merge_queue_lock.lock();
- m_merge_queue.push(merges[i]);
- m_merge_queue_lock.unlock();
- }
-
- MergeTask buffer_merge;
- buffer_merge.m_source_level = -1;
- buffer_merge.m_target_level = 0;
- buffer_merge.m_size = buffer->get_record_count() * sizeof(R) * 2;
- buffer_merge.m_timestamp = m_timestamp.fetch_add(1);
- m_merge_queue_lock.lock();
- m_merge_queue.push(buffer_merge);
- m_merge_queue_lock.unlock();
-
- m_cv.notify_all();
- do {
- std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock);
- m_merge_cv.wait(merge_cv_lock);
- } while (m_merge_queue.size() > 0);
-
- assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0);
-
- return true;
- }
-
-private:
- size_t get_timestamp() {
- auto ts = m_timestamp.fetch_add(1);
- return ts;
- }
-
- void schedule_next_task() {
- m_merge_queue_lock.lock();
- auto task = m_merge_queue.top();
- m_merge_queue.pop();
- m_merge_queue_lock.unlock();
-
- if (task.m_source_level == -1 && task.m_target_level == 0) {
- run_buffer_merge(pending_buffer, pending_version);
- } else {
- run_merge(task, pending_version);
- }
-
- if (m_merge_queue.size() == 0) {
- m_merge_cv.notify_all();
- }
- }
-
-
- void run_merge(MergeTask task, Structure *version) {
- version->merge_levels(task.m_target_level, task.m_source_level);
-
- if (!version->validate_tombstone_proportion(task.m_target_level)) {
- auto tasks = version->get_merge_tasks(task.m_target_level);
- /*
- * Schedule the merge tasks (FIXME: currently this just
- * executes them sequentially in a blocking fashion)
- */
- for (ssize_t i=tasks.size()-1; i>=0; i--) {
- tasks[i].m_timestamp = m_timestamp.fetch_add(1);
- m_merge_queue_lock.lock();
- m_merge_queue.push(tasks[i]);
- m_merge_queue_lock.unlock();
- }
- }
- }
-
-
- void run_buffer_merge(Buffer *buffer, Structure *version) {
- version->merge_buffer(buffer);
- if (!version->validate_tombstone_proportion(0)) {
- auto tasks = version->get_merge_tasks_from_level(0);
-
- /*
- * Schedule the merge tasks (FIXME: currently this just
- * executes them sequentially in a blocking fashion)
- */
- for (ssize_t i=tasks.size()-1; i>=0; i--) {
- tasks[i].m_timestamp = m_timestamp.fetch_add(1);
- m_merge_queue_lock.lock();
- m_merge_queue.push(tasks[i]);
- m_merge_queue_lock.unlock();
- }
- }
- }
-
- void run_scheduler() {
- do {
- std::unique_lock<std::mutex> cv_lock(m_cv_lock);
- m_cv.wait(cv_lock);
-
- while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) {
- schedule_next_task();
- }
- cv_lock.unlock();
- } while(!m_shutdown);
- }
-
- size_t m_memory_budget;
- size_t m_thread_cnt;
-
- Buffer *pending_buffer;
- Structure *pending_version;
-
- alignas(64) std::atomic<size_t> m_used_memory;
- alignas(64) std::atomic<size_t> m_used_threads;
- alignas(64) std::atomic<size_t> m_timestamp;
-
- std::priority_queue<MergeTask, std::vector<MergeTask>, std::greater<MergeTask>> m_merge_queue;
- std::mutex m_merge_queue_lock;
-
- std::mutex m_cv_lock;
- std::condition_variable m_cv;
-
- std::mutex m_merge_cv_lock;
- std::condition_variable m_merge_cv;
-
- std::thread m_sched_thrd;
-
- bool m_shutdown;
-};
-
-}
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index d25c7c0..228665f 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -1,9 +1,13 @@
/*
+ * include/framework/scheduling/Task.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
*
*/
#pragma once
-#include <variant>
#include <future>
#include <functional>