From 7ecfb22c32b7986ed1a2439c1abbeed298e4153a Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 20 Oct 2023 17:00:42 -0400 Subject: Initial pass w/ new scheduler setup currently there's a race condition of some type to sort out. --- include/framework/DynamicExtension.h | 184 +++++++++++++--------- include/framework/interface/Scheduler.h | 17 +- include/framework/scheduling/SerialScheduler.h | 191 ++++------------------- include/framework/scheduling/Task.h | 112 ++++--------- include/framework/structure/ExtensionStructure.h | 21 ++- include/framework/structure/MutableBuffer.h | 11 ++ include/framework/util/Configuration.h | 1 + 7 files changed, 196 insertions(+), 341 deletions(-) (limited to 'include/framework') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 3a460aa..fc7922c 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -31,7 +31,8 @@ namespace de { -template > +template class DynamicExtension { typedef S Shard; typedef MutableBuffer Buffer; @@ -83,68 +84,8 @@ public: return internal_append(rec, true); } - std::vector query(void *parms) { - auto buffer = get_buffer(); - auto vers = get_active_version(); - - // Get the buffer query state - auto buffer_state = Q::get_buffer_query_state(buffer, parms); - - // Get the shard query states - std::vector> shards; - std::vector states; - - for (auto &level : vers->get_levels()) { - level->get_query_states(shards, states, parms); - } - - Q::process_query_states(parms, states, buffer_state); - - std::vector>> query_results(shards.size() + 1); - - // Execute the query for the buffer - auto buffer_results = Q::buffer_query(buffer, buffer_state, parms); - query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers)); - if constexpr (Q::EARLY_ABORT) { - if (query_results[0].size() > 0) { - auto result = Q::merge(query_results, parms); - for (size_t i=0; i 0) { - auto result = Q::merge(query_results, parms); - for (size_t i=0; i> query(void *parms) { + return schedule_query(get_active_version(), get_buffer(), parms); } size_t get_record_count() { @@ -239,6 +180,112 @@ private: return m_versions[0]; } + static void merge(void *arguments) { + MergeArgs *args = (MergeArgs *) arguments; + + Structure *vers = (Structure *) args->version; + Buffer *buff = (Buffer *) args->buffer; + + for (ssize_t i=args->merges.size() - 1; i>=0; i--) { + vers->merge_levels(args->merges[i].second, args->merges[i].first); + } + + vers->merge_buffer(buff); + + args->result.set_value(true); + delete args; + } + + static void async_query(void *arguments) { + QueryArgs *args = (QueryArgs *) arguments; + + auto buffer = (Buffer *) args->buffer; + auto vers = (Structure *) args->version; + void *parms = args->query_parms; + + // Get the buffer query state + auto buffer_state = Q::get_buffer_query_state(buffer, parms); + + // Get the shard query states + std::vector> shards; + std::vector states; + + for (auto &level : vers->get_levels()) { + level->get_query_states(shards, states, parms); + } + + Q::process_query_states(parms, states, buffer_state); + + std::vector>> query_results(shards.size() + 1); + + // Execute the query for the buffer + auto buffer_results = Q::buffer_query(buffer, buffer_state, parms); + query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers)); + if constexpr (Q::EARLY_ABORT) { + if (query_results[0].size() > 0) { + auto result = Q::merge(query_results, parms); + for (size_t i=0; i 0) { + auto result = 
Q::merge(query_results, parms); + for (size_t i=0; iresult_set.set_value(std::move(result)); + delete args; + } + + std::future schedule_merge(Structure *version, Buffer *buffer) { + MergeArgs *args = new MergeArgs(); + args->merges = version->get_merge_tasks(buffer->get_record_count()); + args->buffer = buffer; + args->version = version; + + m_sched.schedule_job(merge, 0, args); + + return args->result.get_future(); + } + + std::future> schedule_query(Structure *version, Buffer *buffer, void *query_parms) { + QueryArgs *args = new QueryArgs(); + args->buffer = buffer; + args->version = version; + args->buffer = query_parms; + + m_sched.schedule_job(async_query, 0, args); + + return args->result_set.get_future(); + } + int internal_append(const R &rec, bool ts) { Buffer *buffer; while (!(buffer = get_buffer())) @@ -246,13 +293,15 @@ private: if (buffer->is_full()) { auto vers = get_active_version(); - m_sched.schedule_merge(vers, buffer); + auto res = schedule_merge(vers, buffer); + res.get(); } + return buffer->append(rec, ts); } - std::vector> filter_deletes(std::vector> &records, ShardID shid, Buffer *buffer, Structure *vers) { + static std::vector> filter_deletes(std::vector> &records, ShardID shid, Buffer *buffer, Structure *vers) { if constexpr (!Q::SKIP_DELETE_FILTER) { return records; } @@ -303,10 +352,5 @@ private: return processed_records; } }; - -template -static void de_merge_callback(DynamicExtension extension, ExtensionStructure new_version) { - -} } diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h index 1445e90..e8ffd08 100644 --- a/include/framework/interface/Scheduler.h +++ b/include/framework/interface/Scheduler.h @@ -12,20 +12,11 @@ #include #include "framework/interface/Record.h" #include "util/types.h" +#include "framework/scheduling/Task.h" template -concept SchedulerInterface = requires(S s, size_t i, void *vp) { +concept SchedulerInterface = requires(S s, size_t i, void *vp, de::Job j) { {S(i, i)}; -// {s.schedule_merge(vp, vp)}; - -/* - {q.get_query_state(p, p)} -> std::convertible_to; - {q.get_buffer_query_state(p, p)}; - {q.query(p, p)}; - {q.buffer_query(p, p)}; - {q.merge()}; - {q.delete_query_state(p)}; -*/ - //{Q::get_query_state(p, p)} -> std::convertible_to; - //{Q::get_buffer_query_state(p, p)} -> std::convertible_to; + {s.schedule_job(j, i, vp)} -> std::convertible_to; + {s.shutdown()}; }; diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index c43e930..5d6e5c2 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -29,198 +29,67 @@ namespace de { -template class SerialScheduler { - typedef ExtensionStructure Structure; - typedef MutableBuffer Buffer; public: - /* - * A simple "scheduler" that runs tasks serially, in a FIFO manner. Incoming concurrent - * requests will wait for their turn, and only one task will be active in the system at - * a time. The scheduler will spin up a second thread for running itself, but all tasks - * will be single-threaded. - * - * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means - * unlimited. - * - * Note that the SerialScheduler object is non-concurrent, and so will ignore the - * thread_cnt argument. It will obey the memory_budget, however a failure due to - * memory constraints will be irrecoverable, as there is no way to free up memory - * or block particular tasks until memory becomes available. 
- */ - SerialScheduler(size_t memory_budget, size_t thread_cnt) - : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX) - , m_used_memory(0) - , m_used_threads(0) - , m_shutdown(false) - { - m_sched_thrd = std::thread(&SerialScheduler::run_scheduler, this); + SerialScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX) + , m_used_memory(0) + , m_used_thrds(0) + , m_shutdown(false) + { + m_sched_thrd = std::thread(&SerialScheduler::run, this); } ~SerialScheduler() { - m_shutdown = true; + shutdown(); m_cv.notify_all(); m_sched_thrd.join(); } - bool schedule_merge(Structure *version, MutableBuffer *buffer) { - pending_version = version; - pending_buffer = buffer; - - /* - * Get list of individual level reconstructions that are necessary - * for completing the overall merge - */ - std::vector merges = version->get_merge_tasks(buffer->get_record_count()); - - /* - * Schedule the merge tasks - */ - for (ssize_t i=0; iget_record_count() * sizeof(R) * 2, m_timestamp.fetch_add(1)); - m_task_queue.push(t); - - m_cv.notify_all(); - do { - std::unique_lock merge_cv_lock(m_merge_cv_lock); - m_merge_cv.wait(merge_cv_lock); - } while (m_task_queue.size() > 0); - - assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0); - - return true; + void schedule_job(std::function job, size_t size, void *args) { + size_t ts = m_counter.fetch_add(1); + m_task_queue.push(Task(size, ts, job, args)); } - bool schedule_query() { - return true; + void shutdown() { + m_shutdown = true; } private: - size_t get_timestamp() { - auto ts = m_timestamp.fetch_add(1); - return ts; - } - - void schedule_merge(MergeTask task) { - if (task.m_source_level == -1 && task.m_target_level == 0) { - run_buffer_merge(pending_buffer, pending_version); - } else { - run_merge(task, pending_version); - } - } - - - void schedule_query(QueryTask task) { - - } - - void schedule_next_task() { - auto task = m_task_queue.pop(); - - auto type = std::visit(GetTaskType{}, task); - - switch (type) { - case TaskType::MERGE: - schedule_merge(std::get(task)); - break; - case TaskType::QUERY: - schedule_query(std::get(task)); - break; - default: assert(false); - } - - if (m_task_queue.size() == 0) { - m_merge_cv.notify_all(); - } - } + psudb::LockedPriorityQueue m_task_queue; + size_t m_memory_budget; + size_t m_thrd_cnt; - void run_merge(MergeTask task, Structure *version) { - version->merge_levels(task.m_target_level, task.m_source_level); - - if (!version->validate_tombstone_proportion(task.m_target_level)) { - auto tasks = version->get_merge_tasks(task.m_target_level); - /* - * Schedule the merge tasks - */ - std::promise trigger_prom; - tasks[tasks.size() - 1].make_dependent_on(trigger_prom); - tasks[tasks.size() - 1].m_timestamp = m_timestamp.fetch_add(1); - m_task_queue.push(tasks[tasks.size() - 1]); - - for (ssize_t i=tasks.size()-2; i>=0; i--) { - tasks[i].make_dependent_on(tasks[i+1]); - tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_task_queue.push(tasks[i]); - } + bool m_shutdown; - /* - * Block the completion of any task until all have been - * scheduled. Probably not strictly necessary, but due to - * interface constraints with the way promises are used, - * a dummy promise needs to be set up for the first job - * anyway. It's easiest to just release it here. 
- */ - trigger_prom.set_value(); - } - } + std::atomic m_counter; + std::mutex m_cv_lock; + std::condition_variable m_cv; + std::thread m_sched_thrd; - void run_buffer_merge(Buffer *buffer, Structure *version) { - version->merge_buffer(buffer); - if (!version->validate_tombstone_proportion(0)) { - auto tasks = version->get_merge_tasks_from_level(0); + std::atomic m_used_thrds; + std::atomic m_used_memory; - /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) - */ - for (ssize_t i=tasks.size()-1; i>=0; i--) { - tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_task_queue.push(tasks[i]); - } - } + void schedule_next() { + auto t = m_task_queue.pop(); + t(); } - void run_scheduler() { + void run() { do { std::unique_lock cv_lock(m_cv_lock); m_cv.wait(cv_lock); - while (m_task_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { - schedule_next_task(); + while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) { + schedule_next(); } cv_lock.unlock(); } while(!m_shutdown); } - - size_t m_memory_budget; - size_t m_thread_cnt; - - Buffer *pending_buffer; - Structure *pending_version; - - alignas(64) std::atomic m_used_memory; - alignas(64) std::atomic m_used_threads; - alignas(64) std::atomic m_timestamp; - - psudb::LockedPriorityQueue, std::greater> m_task_queue; - - std::mutex m_cv_lock; - std::condition_variable m_cv; - - std::mutex m_merge_cv_lock; - std::condition_variable m_merge_cv; - - std::thread m_sched_thrd; - - bool m_shutdown; }; } diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 3c1b158..518159d 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -5,111 +5,53 @@ #include #include +#include #include "framework/util/Configuration.h" namespace de { -enum class TaskType { - MERGE, - QUERY +struct MergeArgs { + void *version; + void *buffer; + std::vector merges; + std::promise result; }; -struct TaskDependency { - std::promise prom; - std::future fut; +template +struct QueryArgs { + void *version; + void *buffer; + std::promise> result_set; + void *query_parms; }; -struct MergeTask { - level_index m_source_level; - level_index m_target_level; - size_t m_timestamp; - size_t m_size; - TaskType m_type; - std::unique_ptr m_dep; - - MergeTask() = default; - - MergeTask(level_index source, level_index target, size_t size, size_t timestamp) - : m_source_level(source) - , m_target_level(target) - , m_timestamp(timestamp) - , m_size(size) - , m_type(TaskType::MERGE) - , m_dep(std::make_unique()){} - +typedef std::function Job; - MergeTask(MergeTask &t) - : m_source_level(t.m_source_level) - , m_target_level(t.m_target_level) - , m_timestamp(t.m_timestamp) - , m_size(t.m_size) - , m_type(TaskType::MERGE) - , m_dep(std::move(t.m_dep)) +struct Task { + Task(size_t size, size_t ts, Job job, void *args) + : m_job(job) + , m_size(size) + , m_timestamp(ts) + , m_args(args) {} - - TaskType get_type() const { - return m_type; - } - - void make_dependent_on(MergeTask &task) { - m_dep->fut = task.m_dep->prom.get_future(); - } - - void make_dependent_on(TaskDependency *dep) { - m_dep->fut = dep->prom.get_future(); - } - - friend bool operator<(const MergeTask &self, const MergeTask &other) { - return self.m_timestamp < other.m_timestamp; - } - - friend bool operator>(const MergeTask &self, const MergeTask &other) { - return self.m_timestamp > other.m_timestamp; - } - -}; - -struct QueryTask { - size_t m_timestamp; + Job m_job; size_t m_size; 
- TaskType m_type; - std::unique_ptr m_dep; - - QueryTask(QueryTask &t) - : m_timestamp(t.m_timestamp) - , m_size(t.m_size) - , m_type(t.m_type) - , m_dep(std::move(t.m_dep)) - {} - - TaskType get_type() const { - return m_type; - } - - void SetDependency(QueryTask &task) { - m_dep->fut = task.m_dep->prom.get_future(); - } - - void SetDependency(TaskDependency *dep) { - m_dep->fut = dep->prom.get_future(); - } + size_t m_timestamp; + void *m_args; - friend bool operator<(const QueryTask &self, const QueryTask &other) { + friend bool operator<(const Task &self, const Task &other) { return self.m_timestamp < other.m_timestamp; } - friend bool operator>(const QueryTask &self, const QueryTask &other) { + friend bool operator>(const Task &self, const Task &other) { return self.m_timestamp > other.m_timestamp; } -}; -struct GetTaskType { - TaskType operator()(const MergeTask &t) { return t.get_type(); } - TaskType operator()(const QueryTask &t) { return t.get_type(); } + void operator()() { + m_job(m_args); + } }; -typedef std::variant Task; - } diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 920e1c3..8344518 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -93,7 +93,10 @@ public: inline bool merge_buffer(Buffer *buffer) { assert(can_merge_with(0, buffer->get_record_count())); + buffer->start_merge(); merge_buffer_into_l0(buffer); + buffer->finish_merge(); + buffer->truncate(); return true; @@ -216,10 +219,7 @@ public: } for (level_index i=merge_base_level; i>0; i--) { - MergeTask task; - task.m_source_level = i - 1; - task.m_target_level = i; - task.m_type = TaskType::MERGE; + MergeTask task = {i-1, i}; /* * The amount of storage required for the merge accounts @@ -237,7 +237,7 @@ public: reccnt += m_levels[i]->get_record_count(); } } - task.m_size = 2* reccnt * sizeof(R); + //task.m_size = 2* reccnt * sizeof(R); merges.push_back(task); } @@ -249,7 +249,7 @@ public: /* * */ - std::vector get_merge_tasks_from_level(size_t source_level) { + std::vector get_merge_tasks_from_level(level_index source_level) { std::vector merges; level_index merge_base_level = find_mergable_level(source_level); @@ -258,10 +258,7 @@ public: } for (level_index i=merge_base_level; i>source_level; i--) { - MergeTask task; - task.m_source_level = i - 1; - task.m_target_level = i; - + MergeTask task = {i - 1, i}; /* * The amount of storage required for the merge accounts * for the cost of storing the new records, along with the @@ -278,12 +275,12 @@ public: reccnt += m_levels[i]->get_record_count(); } } - task.m_size = 2* reccnt * sizeof(R); +// task.m_size = 2* reccnt * sizeof(R); merges.push_back(task); } - return std::move(merges); + return merges; } /* diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 9f12175..804ca5e 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -90,12 +90,23 @@ public: } bool truncate() { + + while (active_merge() || m_refcnt.load() > 0) + ; + + m_merge_lock.lock(); + + while (m_refcnt > 0) + ; + m_tombstonecnt.store(0); m_reccnt.store(0); m_weight.store(0); m_max_weight.store(0); if (m_tombstone_filter) m_tombstone_filter->clear(); + m_merge_lock.unlock(); + return true; } diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index eb9b93f..9d8248f 100644 --- a/include/framework/util/Configuration.h 
+++ b/include/framework/util/Configuration.h
@@ -50,5 +50,6 @@ enum class DeletePolicy {
 };
 
 typedef ssize_t level_index;
+typedef std::pair<level_index, level_index> MergeTask;
 
 }
-- 
cgit v1.2.3
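
A note on the scheduling pattern this commit introduces: DynamicExtension no longer runs merges and queries inline. It heap-allocates an argument struct (MergeArgs/QueryArgs) carrying a std::promise, wraps the job function and its arguments in a timestamp-ordered Task, hands the Task to the scheduler's worker thread, and blocks on the matching future. Below is a minimal, self-contained sketch of that pattern, not the framework's code: it substitutes a std::priority_queue guarded by a mutex for psudb::LockedPriorityQueue, and the Toy*-prefixed names and the toy_merge job are illustrative stand-ins.

// Hedged sketch of the schedule_job / promise-future hand-off used by this
// commit. Names prefixed with Toy* are invented for illustration and are not
// part of the framework.
#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

using Job = std::function<void(void*)>;

struct Task {
    size_t m_size;       // estimated memory footprint of the job
    size_t m_timestamp;  // priority: lower timestamps run first
    Job    m_job;
    void  *m_args;
    void operator()() { m_job(m_args); }
};

// Orders the priority queue so the task with the smallest timestamp is on top.
struct TaskOrder {
    bool operator()(const Task &a, const Task &b) const {
        return a.m_timestamp > b.m_timestamp;
    }
};

struct ToyArgs {                  // stands in for MergeArgs / QueryArgs
    int payload;
    std::promise<bool> result;    // the caller blocks on the matching future
};

class ToySerialScheduler {        // simplified stand-in for SerialScheduler
public:
    ToySerialScheduler()
        : m_counter(0), m_shutdown(false),
          m_thrd(&ToySerialScheduler::run, this) {}

    ~ToySerialScheduler() {
        { std::lock_guard<std::mutex> lk(m_lock); m_shutdown = true; }
        m_cv.notify_all();
        m_thrd.join();
    }

    void schedule_job(Job job, size_t size, void *args) {
        size_t ts = m_counter.fetch_add(1);
        { std::lock_guard<std::mutex> lk(m_lock);
          m_queue.push(Task{size, ts, job, args}); }
        m_cv.notify_all();        // wake the scheduler thread
    }

private:
    void run() {
        std::unique_lock<std::mutex> lk(m_lock);
        while (!m_shutdown || !m_queue.empty()) {
            m_cv.wait(lk, [&]{ return m_shutdown || !m_queue.empty(); });
            while (!m_queue.empty()) {
                Task t = m_queue.top();
                m_queue.pop();
                lk.unlock();
                t();              // run the job outside the lock
                lk.lock();
            }
        }
    }

    std::priority_queue<Task, std::vector<Task>, TaskOrder> m_queue;
    std::atomic<size_t> m_counter;
    std::mutex m_lock;
    std::condition_variable m_cv;
    bool m_shutdown;
    std::thread m_thrd;
};

static void toy_merge(void *arguments) {   // stands in for the merge job
    auto *args = (ToyArgs *) arguments;
    std::printf("merging payload %d\n", args->payload);
    args->result.set_value(true);          // unblock the waiting caller
    delete args;                           // the job owns and frees its arguments
}

int main() {
    ToySerialScheduler sched;
    auto *args = new ToyArgs{42, {}};
    // Take the future before handing args off: the job deletes args when done.
    std::future<bool> done = args->result.get_future();
    sched.schedule_job(toy_merge, 0, args);
    done.get();                            // wait for completion, as internal_append does
}

One ordering detail worth noting in the sketch: the future is pulled from the promise before the job is handed to the scheduler, because the job frees its argument struct as soon as it completes, and the worker thread is only guaranteed to see a queued task if it is woken after the push.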