From eb8dbaa770a57557d67c817c2839c64f536a6ce4 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 13 Sep 2023 16:22:03 -0400 Subject: Began re-architecting the project for concurrency support The project is now in a state where it builds, but it probably has a lot of bugs still. --- include/framework/Scheduler.h | 76 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 include/framework/Scheduler.h (limited to 'include/framework/Scheduler.h') diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h new file mode 100644 index 0000000..cd3f430 --- /dev/null +++ b/include/framework/Scheduler.h @@ -0,0 +1,76 @@ +/* + * include/framework/Scheduler.h + * + * Copyright (C) 2023 Douglas Rumbaugh + * Dong Xie + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include +#include +#include + +#include "util/types.h" +#include "framework/ShardInterface.h" +#include "framework/QueryInterface.h" +#include "framework/RecordInterface.h" +#include "framework/MutableBuffer.h" +#include "framework/Configuration.h" +#include "framework/ExtensionStructure.h" + +namespace de { + + +struct MergeTask { + level_index m_source_level; + level_index m_target_level; + size_t m_size; + size_t m_timestamp; + + bool operator<(MergeTask &other) { + return m_timestamp < other.m_timestamp; + } +}; + + +template +class Scheduler { + typedef ExtensionStructure Structure; +public: + /* + * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means + * unlimited. + */ + Scheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX) + , m_used_memory(0) + , m_used_threads(0) + {} + + bool schedule_merge(Structure *version, MutableBuffer *buffer) { + // FIXME: this is a non-concurrent implementation + return version->merge_buffer(buffer); + } + +private: + size_t get_timestamp() { + auto ts = m_timestamp.fetch_add(1); + return ts; + } + + size_t m_memory_budget; + size_t m_thread_cnt; + + alignas(64) std::atomic m_used_memory; + alignas(64) std::atomic m_used_threads; + alignas(64) std::atomic m_timestamp; + + std::priority_queue m_merge_queue; + std::mutex m_merge_queue_lock; +}; + +} -- cgit v1.2.3