diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-10-20 17:00:42 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-10-20 17:00:42 -0400 |
| commit | 7ecfb22c32b7986ed1a2439c1abbeed298e4153a (patch) | |
| tree | d7207b5755ce987068620b71f9b4af9a52982c0d /include/framework/DynamicExtension.h | |
| parent | 1a47cbd7978dcad7ed0b2f2af3f933137eedbfa3 (diff) | |
| download | dynamic-extension-7ecfb22c32b7986ed1a2439c1abbeed298e4153a.tar.gz | |
Initial pass w/ new scheduler setup
currently there's a race condition of some type to sort out.
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 184 |
1 files changed, 114 insertions, 70 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 3a460aa..fc7922c 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -31,7 +31,8 @@ namespace de { -template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler<R, S, Q, L>> +template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, + DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler> class DynamicExtension { typedef S Shard; typedef MutableBuffer<R> Buffer; @@ -83,68 +84,8 @@ public: return internal_append(rec, true); } - std::vector<R> query(void *parms) { - auto buffer = get_buffer(); - auto vers = get_active_version(); - - // Get the buffer query state - auto buffer_state = Q::get_buffer_query_state(buffer, parms); - - // Get the shard query states - std::vector<std::pair<ShardID, Shard*>> shards; - std::vector<void*> states; - - for (auto &level : vers->get_levels()) { - level->get_query_states(shards, states, parms); - } - - Q::process_query_states(parms, states, buffer_state); - - std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1); - - // Execute the query for the buffer - auto buffer_results = Q::buffer_query(buffer, buffer_state, parms); - query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers)); - if constexpr (Q::EARLY_ABORT) { - if (query_results[0].size() > 0) { - auto result = Q::merge(query_results, parms); - for (size_t i=0; i<states.size(); i++) { - Q::delete_query_state(states[i]); - } - - Q::delete_buffer_query_state(buffer_state); - return result; - } - } - - // Execute the query for each shard - for (size_t i=0; i<shards.size(); i++) { - auto shard_results = Q::query(shards[i].second, states[i], parms); - query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer, vers)); - if constexpr (Q::EARLY_ABORT) { - if (query_results[i].size() > 0) { - auto result = Q::merge(query_results, parms); - for (size_t i=0; i<states.size(); i++) { - Q::delete_query_state(states[i]); - } - - Q::delete_buffer_query_state(buffer_state); - - return result; - } - } - } - - // Merge the results together - auto result = Q::merge(query_results, parms); - - for (size_t i=0; i<states.size(); i++) { - Q::delete_query_state(states[i]); - } - - Q::delete_buffer_query_state(buffer_state); - - return result; + std::future<std::vector<R>> query(void *parms) { + return schedule_query(get_active_version(), get_buffer(), parms); } size_t get_record_count() { @@ -239,6 +180,112 @@ private: return m_versions[0]; } + static void merge(void *arguments) { + MergeArgs *args = (MergeArgs *) arguments; + + Structure *vers = (Structure *) args->version; + Buffer *buff = (Buffer *) args->buffer; + + for (ssize_t i=args->merges.size() - 1; i>=0; i--) { + vers->merge_levels(args->merges[i].second, args->merges[i].first); + } + + vers->merge_buffer(buff); + + args->result.set_value(true); + delete args; + } + + static void async_query(void *arguments) { + QueryArgs<R> *args = (QueryArgs<R> *) arguments; + + auto buffer = (Buffer *) args->buffer; + auto vers = (Structure *) args->version; + void *parms = args->query_parms; + + // Get the buffer query state + auto buffer_state = Q::get_buffer_query_state(buffer, parms); + + // Get the shard query states + std::vector<std::pair<ShardID, Shard*>> shards; + std::vector<void*> states; + + for (auto &level : vers->get_levels()) { + level->get_query_states(shards, states, parms); + } + + Q::process_query_states(parms, states, buffer_state); + + std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1); + + // Execute the query for the buffer + auto buffer_results = Q::buffer_query(buffer, buffer_state, parms); + query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers)); + if constexpr (Q::EARLY_ABORT) { + if (query_results[0].size() > 0) { + auto result = Q::merge(query_results, parms); + for (size_t i=0; i<states.size(); i++) { + Q::delete_query_state(states[i]); + } + + Q::delete_buffer_query_state(buffer_state); + return result; + } + } + + // Execute the query for each shard + for (size_t i=0; i<shards.size(); i++) { + auto shard_results = Q::query(shards[i].second, states[i], parms); + query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer, vers)); + if constexpr (Q::EARLY_ABORT) { + if (query_results[i].size() > 0) { + auto result = Q::merge(query_results, parms); + for (size_t i=0; i<states.size(); i++) { + Q::delete_query_state(states[i]); + } + + Q::delete_buffer_query_state(buffer_state); + + return result; + } + } + } + + // Merge the results together + auto result = Q::merge(query_results, parms); + + for (size_t i=0; i<states.size(); i++) { + Q::delete_query_state(states[i]); + } + + Q::delete_buffer_query_state(buffer_state); + + args->result_set.set_value(std::move(result)); + delete args; + } + + std::future<bool> schedule_merge(Structure *version, Buffer *buffer) { + MergeArgs *args = new MergeArgs(); + args->merges = version->get_merge_tasks(buffer->get_record_count()); + args->buffer = buffer; + args->version = version; + + m_sched.schedule_job(merge, 0, args); + + return args->result.get_future(); + } + + std::future<std::vector<R>> schedule_query(Structure *version, Buffer *buffer, void *query_parms) { + QueryArgs<R> *args = new QueryArgs<R>(); + args->buffer = buffer; + args->version = version; + args->buffer = query_parms; + + m_sched.schedule_job(async_query, 0, args); + + return args->result_set.get_future(); + } + int internal_append(const R &rec, bool ts) { Buffer *buffer; while (!(buffer = get_buffer())) @@ -246,13 +293,15 @@ private: if (buffer->is_full()) { auto vers = get_active_version(); - m_sched.schedule_merge(vers, buffer); + auto res = schedule_merge(vers, buffer); + res.get(); } + return buffer->append(rec, ts); } - std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer, Structure *vers) { + static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer, Structure *vers) { if constexpr (!Q::SKIP_DELETE_FILTER) { return records; } @@ -303,10 +352,5 @@ private: return processed_records; } }; - -template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING> -static void de_merge_callback(DynamicExtension<R, S, Q, L, D> extension, ExtensionStructure<R, S, Q> new_version) { - -} } |