From d2279e1b96d352a0af1d425dcaaf93e8a26a8d52 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 30 Oct 2023 17:15:05 -0400 Subject: General Comment + Consistency updates --- include/framework/DynamicExtension.h | 46 +++--- include/framework/ShardRequirements.h | 8 + include/framework/interface/Query.h | 34 ++-- include/framework/interface/Record.h | 5 +- include/framework/interface/Scheduler.h | 4 +- include/framework/interface/Shard.h | 20 ++- include/framework/scheduling/Epoch.h | 1 - include/framework/scheduling/FIFOScheduler.h | 3 +- include/framework/scheduling/Scheduler.h | 195 ----------------------- include/framework/scheduling/Task.h | 6 +- include/framework/structure/BufferView.h | 2 +- include/framework/structure/ExtensionStructure.h | 4 +- include/framework/structure/InternalLevel.h | 4 +- include/framework/structure/MutableBuffer.h | 4 +- include/framework/util/Configuration.h | 5 +- 15 files changed, 89 insertions(+), 252 deletions(-) delete mode 100644 include/framework/scheduling/Scheduler.h (limited to 'include/framework') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index f2bbacc..9129060 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -1,7 +1,7 @@ /* * include/framework/DynamicExtension.h * - * Copyright (C) 2023 Douglas Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * Dong Xie * * All rights reserved. Published under the Modified BSD License. @@ -107,8 +107,7 @@ public: } size_t get_record_count() { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); auto t = epoch->get_buffer_view().get_record_count() + epoch->get_structure()->get_record_count(); epoch->end_job(); @@ -116,8 +115,7 @@ public: } size_t get_tombstone_count() { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); auto t = epoch->get_buffer_view().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); epoch->end_job(); @@ -129,8 +127,7 @@ public: } size_t get_memory_usage() { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); auto t= epoch->get_buffer_view().get_memory_usage() + epoch->get_structure()->get_memory_usage(); epoch->end_job(); @@ -138,8 +135,7 @@ public: } size_t get_aux_memory_usage() { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); auto t = epoch->get_buffer_view().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); epoch->end_job(); @@ -151,9 +147,8 @@ public: } Shard *create_static_structure() { - auto epoch = get_active_epoch(); + auto epoch = get_active_epoch_protected(); auto bv = epoch->get_buffer_view(); - epoch->start_job(); auto vers = epoch->get_structure(); std::vector shards; @@ -219,6 +214,11 @@ private: return m_epochs[m_current_epoch.load()]; } + _Epoch *get_active_epoch_protected() { + m_epochs[m_current_epoch.load()]->start_job(); + return m_epochs[m_current_epoch.load()]; + } + void advance_epoch() { size_t new_epoch_num = m_current_epoch.load() + 1; _Epoch *new_epoch = m_epochs[new_epoch_num]; @@ -241,6 +241,12 @@ private: * structure will be a shallow copy of the old one's. */ _Epoch *create_new_epoch() { + /* + * This epoch access is _not_ protected under the assumption that + * only one merge will be able to trigger at a time. If that condition + * is violated, it is possible that this code will clone a retired + * epoch. + */ auto new_epoch = get_active_epoch()->clone(); std::unique_lock m_struct_lock; m_versions.insert(new_epoch->get_structure()); @@ -311,6 +317,8 @@ private: MergeArgs *args = (MergeArgs *) arguments; Structure *vers = args->epoch->get_structure(); + // FIXME: with an improved shard interface, multiple full buffers + // could be merged at once here. Buffer *buff = (Buffer *) args->epoch->get_buffers()[0]; for (ssize_t i=args->merges.size() - 1; i>=0; i--) { @@ -387,24 +395,23 @@ private: } std::future> schedule_query(void *query_parms) { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); QueryArgs *args = new QueryArgs(); args->epoch = epoch; args->query_parms = query_parms; + auto result = args->result_set.get_future(); + m_sched.schedule_job(async_query, 0, args); - return args->result_set.get_future(); + return result; } int internal_append(const R &rec, bool ts) { Buffer *buffer = nullptr; do { - auto epoch = get_active_epoch(); - - while (!(buffer = epoch->get_active_buffer())) - ; + auto epoch = get_active_epoch_protected(); + buffer = epoch->get_active_buffer(); /* if the buffer is full, schedule a merge and add a new empty buffer */ if (buffer->is_full()) { @@ -413,7 +420,8 @@ private: schedule_merge(); buffer = add_empty_buffer(epoch); } - + // FIXME: not exactly the best spot for this + epoch->end_job(); } while(!buffer->append(rec, ts)); /* internal append should always succeed, eventually */ diff --git a/include/framework/ShardRequirements.h b/include/framework/ShardRequirements.h index d2d4ff2..55e7199 100644 --- a/include/framework/ShardRequirements.h +++ b/include/framework/ShardRequirements.h @@ -1,4 +1,12 @@ /* + * include/framework/ShardRequirements.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * + * All rights reserved. Published under the Modified BSD License. + * + * A header file containing the necessary includes for Shard + * development. * */ #pragma once diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h index 9b1d2d6..21cadcb 100644 --- a/include/framework/interface/Query.h +++ b/include/framework/interface/Query.h @@ -1,7 +1,7 @@ /* - * include/framework/QueryInterface.h + * include/framework/interface/Query.h * - * Copyright (C) 2023 Douglas Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * * All rights reserved. Published under the Modified BSD License. * @@ -10,25 +10,29 @@ #include #include + #include "util/types.h" +// FIXME: The interface is not completely specified yet, as it is pending +// determining a good way to handle additional template arguments +// to get the Shard and Record types into play template concept QueryInterface = requires(Q q, void *p, std::vector &s) { -/* - {q.get_query_state(p, p)} -> std::convertible_to; - {q.get_buffer_query_state(p, p)}; - {q.query(p, p)}; - {q.buffer_query(p, p)}; - {q.merge()}; - {q.delete_query_state(p)}; -*/ - {Q::EARLY_ABORT} -> std::convertible_to; - {Q::SKIP_DELETE_FILTER} -> std::convertible_to; - //{Q::get_query_state(p, p)} -> std::convertible_to; - //{Q::get_buffer_query_state(p, p)} -> std::convertible_to; + + /* + {Q::get_query_state(p, p)} -> std::convertible_to; + {Q::get_buffer_query_state(p, p)} -> std::convertible_to; + */ {Q::process_query_states(p, s, s)}; + /* + {Q::query(s, p, p)} -> std::convertible_to>>; + {Q::buffer_query(p, p)} -> std::convertible_to>>; + {Q::merge(rv, p)} -> std::convertible_to>; + */ {Q::delete_query_state(std::declval())} -> std::same_as; - {Q::delete_buffer_query_state(p)}; + {Q::delete_buffer_query_state(std::declval())} -> std::same_as; + {Q::EARLY_ABORT} -> std::convertible_to; + {Q::SKIP_DELETE_FILTER} -> std::convertible_to; }; diff --git a/include/framework/interface/Record.h b/include/framework/interface/Record.h index 1ef1984..bf495df 100644 --- a/include/framework/interface/Record.h +++ b/include/framework/interface/Record.h @@ -1,11 +1,12 @@ /* - * include/framework/RecordInterface.h + * include/framework/interface/Record.h * * Copyright (C) 2023 Douglas Rumbaugh - * Dong Xie * * All rights reserved. Published under the Modified BSD License. * + * FIXME: the record implementations could probably be broken out into + * different files, leaving only the interface here */ #pragma once diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h index e8ffd08..63581d2 100644 --- a/include/framework/interface/Scheduler.h +++ b/include/framework/interface/Scheduler.h @@ -1,7 +1,7 @@ /* - * include/framework/QueryInterface.h + * include/framework/interface/Scheduler.h * - * Copyright (C) 2023 Douglas Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * * All rights reserved. Published under the Modified BSD License. * diff --git a/include/framework/interface/Shard.h b/include/framework/interface/Shard.h index ea58b2a..d3a6cf8 100644 --- a/include/framework/interface/Shard.h +++ b/include/framework/interface/Shard.h @@ -1,7 +1,7 @@ /* - * include/framework/ShardInterface.h + * include/framework/interface/Shard.h * - * Copyright (C) 2023 Douglas Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * * All rights reserved. Published under the Modified BSD License. * @@ -15,12 +15,22 @@ namespace de { -//template typename S, typename R> +// FIXME: The interface is not completely specified yet, as it is pending +// determining a good way to handle additional template arguments +// to get the Record type into play template -concept ShardInterface = requires(S s, void *p, bool b) { - //{s.point_lookup(r, b) } -> std::same_as; +concept ShardInterface = requires(S s, S **spp, void *p, bool b, size_t i) { + {S(spp, i)}; + /* + {S(mutable buffer)} + {s.point_lookup(r, b) } -> std::convertible_to + */ + {s.get_data()} -> std::convertible_to; + {s.get_record_count()} -> std::convertible_to; + {s.get_tombstone_count()} -> std::convertible_to; {s.get_memory_usage()} -> std::convertible_to; + {s.get_aux_memory_usage()} -> std::convertible_to; }; } diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 87463bd..03cbb62 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -2,7 +2,6 @@ * include/framework/scheduling/Epoch.h * * Copyright (C) 2023 Douglas B. Rumbaugh - * Dong Xie * * All rights reserved. Published under the Modified BSD License. * diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 7ccab26..5425c4f 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -1,8 +1,7 @@ /* - * include/framework/Scheduler.h + * include/framework/scheduling/FIFOScheduler.h * * Copyright (C) 2023 Douglas B. Rumbaugh - * Dong Xie * * All rights reserved. Published under the Modified BSD License. * diff --git a/include/framework/scheduling/Scheduler.h b/include/framework/scheduling/Scheduler.h deleted file mode 100644 index 992cbf9..0000000 --- a/include/framework/scheduling/Scheduler.h +++ /dev/null @@ -1,195 +0,0 @@ -/* - * include/framework/Scheduler.h - * - * Copyright (C) 2023 Douglas Rumbaugh - * Dong Xie - * - * All rights reserved. Published under the Modified BSD License. - * - */ -#pragma once - -#include -#include -#include -#include -#include - -#include "util/types.h" -#include "framework/interface/Shard.h" -#include "framework/interface/Query.h" -#include "framework/interface/Record.h" -#include "framework/structure/MutableBuffer.h" -#include "framework/util/Configuration.h" -#include "framework/structure/ExtensionStructure.h" - -namespace de { - -template -class Scheduler { - typedef ExtensionStructure Structure; - typedef MutableBuffer Buffer; -public: - /* - * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means - * unlimited. - */ - Scheduler(size_t memory_budget, size_t thread_cnt) - : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX) - , m_used_memory(0) - , m_used_threads(0) - , m_shutdown(false) - { - m_sched_thrd = std::thread(&Scheduler::run_scheduler, this); - } - - ~Scheduler() { - m_shutdown = true; - - m_cv.notify_all(); - m_sched_thrd.join(); - } - - bool schedule_merge(Structure *version, MutableBuffer *buffer) { - /* - * temporary hack - */ - pending_version = version; - pending_buffer = buffer; - - /* - * Get list of individual level reconstructions that are necessary - * for completing the overall merge - */ - std::vector merges = version->get_merge_tasks(buffer->get_record_count()); - - /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) - */ - for (ssize_t i=0; iget_record_count() * sizeof(R) * 2; - buffer_merge.m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(buffer_merge); - m_merge_queue_lock.unlock(); - - m_cv.notify_all(); - do { - std::unique_lock merge_cv_lock(m_merge_cv_lock); - m_merge_cv.wait(merge_cv_lock); - } while (m_merge_queue.size() > 0); - - assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0); - - return true; - } - -private: - size_t get_timestamp() { - auto ts = m_timestamp.fetch_add(1); - return ts; - } - - void schedule_next_task() { - m_merge_queue_lock.lock(); - auto task = m_merge_queue.top(); - m_merge_queue.pop(); - m_merge_queue_lock.unlock(); - - if (task.m_source_level == -1 && task.m_target_level == 0) { - run_buffer_merge(pending_buffer, pending_version); - } else { - run_merge(task, pending_version); - } - - if (m_merge_queue.size() == 0) { - m_merge_cv.notify_all(); - } - } - - - void run_merge(MergeTask task, Structure *version) { - version->merge_levels(task.m_target_level, task.m_source_level); - - if (!version->validate_tombstone_proportion(task.m_target_level)) { - auto tasks = version->get_merge_tasks(task.m_target_level); - /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) - */ - for (ssize_t i=tasks.size()-1; i>=0; i--) { - tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(tasks[i]); - m_merge_queue_lock.unlock(); - } - } - } - - - void run_buffer_merge(Buffer *buffer, Structure *version) { - version->merge_buffer(buffer); - if (!version->validate_tombstone_proportion(0)) { - auto tasks = version->get_merge_tasks_from_level(0); - - /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) - */ - for (ssize_t i=tasks.size()-1; i>=0; i--) { - tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_merge_queue_lock.lock(); - m_merge_queue.push(tasks[i]); - m_merge_queue_lock.unlock(); - } - } - } - - void run_scheduler() { - do { - std::unique_lock cv_lock(m_cv_lock); - m_cv.wait(cv_lock); - - while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { - schedule_next_task(); - } - cv_lock.unlock(); - } while(!m_shutdown); - } - - size_t m_memory_budget; - size_t m_thread_cnt; - - Buffer *pending_buffer; - Structure *pending_version; - - alignas(64) std::atomic m_used_memory; - alignas(64) std::atomic m_used_threads; - alignas(64) std::atomic m_timestamp; - - std::priority_queue, std::greater> m_merge_queue; - std::mutex m_merge_queue_lock; - - std::mutex m_cv_lock; - std::condition_variable m_cv; - - std::mutex m_merge_cv_lock; - std::condition_variable m_merge_cv; - - std::thread m_sched_thrd; - - bool m_shutdown; -}; - -} diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index d25c7c0..228665f 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -1,9 +1,13 @@ /* + * include/framework/scheduling/Task.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * + * All rights reserved. Published under the Modified BSD License. * */ #pragma once -#include #include #include diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index 8dff2ef..ccd3dac 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -1,7 +1,7 @@ /* * include/framework/structure/BufferView.h * - * Copyright (C) 2023 Douglas Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * * All rights reserved. Published under the Modified BSD License. * diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index de965ae..1f365ae 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -1,7 +1,7 @@ /* - * include/framework/ExtensionStructure.h + * include/framework/structure/ExtensionStructure.h * - * Copyright (C) 2023 Douglas Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * Dong Xie * * All rights reserved. Published under the Modified BSD License. diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index 342a2c7..7a7b98c 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -1,7 +1,7 @@ /* - * include/framework/InternalLevel.h + * include/framework/structure/InternalLevel.h * - * Copyright (C) 2023 Douglas Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * Dong Xie * * All rights reserved. Published under the Modified BSD License. diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 974dc28..e0a6962 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -1,7 +1,7 @@ /* - * include/framework/MutableBuffer.h + * include/framework/structure/MutableBuffer.h * - * Copyright (C) 2023 Douglas Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * Dong Xie * * All rights reserved. Published under the Modified BSD License. diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index 9d8248f..ec4ec3a 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -1,8 +1,7 @@ /* - * include/framework/DynamicExtension.h + * include/framework/util/Configuration.h * - * Copyright (C) 2023 Douglas Rumbaugh - * Dong Xie + * Copyright (C) 2023 Douglas B. Rumbaugh * * All rights reserved. Published under the Modified BSD License. * -- cgit v1.2.3