From 7ecfb22c32b7986ed1a2439c1abbeed298e4153a Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 20 Oct 2023 17:00:42 -0400 Subject: Initial pass w/ new scheduler setup currently there's a race condition of some type to sort out. --- include/framework/DynamicExtension.h | 184 ++++++++++++++++++++++------------- 1 file changed, 114 insertions(+), 70 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 3a460aa..fc7922c 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -31,7 +31,8 @@ namespace de { -template > +template class DynamicExtension { typedef S Shard; typedef MutableBuffer Buffer; @@ -83,68 +84,8 @@ public: return internal_append(rec, true); } - std::vector query(void *parms) { - auto buffer = get_buffer(); - auto vers = get_active_version(); - - // Get the buffer query state - auto buffer_state = Q::get_buffer_query_state(buffer, parms); - - // Get the shard query states - std::vector> shards; - std::vector states; - - for (auto &level : vers->get_levels()) { - level->get_query_states(shards, states, parms); - } - - Q::process_query_states(parms, states, buffer_state); - - std::vector>> query_results(shards.size() + 1); - - // Execute the query for the buffer - auto buffer_results = Q::buffer_query(buffer, buffer_state, parms); - query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers)); - if constexpr (Q::EARLY_ABORT) { - if (query_results[0].size() > 0) { - auto result = Q::merge(query_results, parms); - for (size_t i=0; i 0) { - auto result = Q::merge(query_results, parms); - for (size_t i=0; i> query(void *parms) { + return schedule_query(get_active_version(), get_buffer(), parms); } size_t get_record_count() { @@ -239,6 +180,112 @@ private: return m_versions[0]; } + static void merge(void *arguments) { + MergeArgs *args = (MergeArgs *) arguments; + + Structure *vers = (Structure *) args->version; + Buffer *buff = (Buffer *) args->buffer; + + for (ssize_t i=args->merges.size() - 1; i>=0; i--) { + vers->merge_levels(args->merges[i].second, args->merges[i].first); + } + + vers->merge_buffer(buff); + + args->result.set_value(true); + delete args; + } + + static void async_query(void *arguments) { + QueryArgs *args = (QueryArgs *) arguments; + + auto buffer = (Buffer *) args->buffer; + auto vers = (Structure *) args->version; + void *parms = args->query_parms; + + // Get the buffer query state + auto buffer_state = Q::get_buffer_query_state(buffer, parms); + + // Get the shard query states + std::vector> shards; + std::vector states; + + for (auto &level : vers->get_levels()) { + level->get_query_states(shards, states, parms); + } + + Q::process_query_states(parms, states, buffer_state); + + std::vector>> query_results(shards.size() + 1); + + // Execute the query for the buffer + auto buffer_results = Q::buffer_query(buffer, buffer_state, parms); + query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers)); + if constexpr (Q::EARLY_ABORT) { + if (query_results[0].size() > 0) { + auto result = Q::merge(query_results, parms); + for (size_t i=0; i 0) { + auto result = Q::merge(query_results, parms); + for (size_t i=0; iresult_set.set_value(std::move(result)); + delete args; + } + + std::future schedule_merge(Structure *version, Buffer *buffer) { + MergeArgs *args = new MergeArgs(); + args->merges = version->get_merge_tasks(buffer->get_record_count()); + args->buffer = buffer; + args->version = version; + + m_sched.schedule_job(merge, 0, args); + + return args->result.get_future(); + } + + std::future> schedule_query(Structure *version, Buffer *buffer, void *query_parms) { + QueryArgs *args = new QueryArgs(); + args->buffer = buffer; + args->version = version; + args->buffer = query_parms; + + m_sched.schedule_job(async_query, 0, args); + + return args->result_set.get_future(); + } + int internal_append(const R &rec, bool ts) { Buffer *buffer; while (!(buffer = get_buffer())) @@ -246,13 +293,15 @@ private: if (buffer->is_full()) { auto vers = get_active_version(); - m_sched.schedule_merge(vers, buffer); + auto res = schedule_merge(vers, buffer); + res.get(); } + return buffer->append(rec, ts); } - std::vector> filter_deletes(std::vector> &records, ShardID shid, Buffer *buffer, Structure *vers) { + static std::vector> filter_deletes(std::vector> &records, ShardID shid, Buffer *buffer, Structure *vers) { if constexpr (!Q::SKIP_DELETE_FILTER) { return records; } @@ -303,10 +352,5 @@ private: return processed_records; } }; - -template -static void de_merge_callback(DynamicExtension extension, ExtensionStructure new_version) { - -} } -- cgit v1.2.3