summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-10-20 17:00:42 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-10-20 17:00:42 -0400
commit7ecfb22c32b7986ed1a2439c1abbeed298e4153a (patch)
treed7207b5755ce987068620b71f9b4af9a52982c0d /include/framework/DynamicExtension.h
parent1a47cbd7978dcad7ed0b2f2af3f933137eedbfa3 (diff)
downloaddynamic-extension-7ecfb22c32b7986ed1a2439c1abbeed298e4153a.tar.gz
Initial pass w/ new scheduler setup
currently there's a race condition of some type to sort out.
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h184
1 files changed, 114 insertions, 70 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 3a460aa..fc7922c 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -31,7 +31,8 @@
namespace de {
-template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler<R, S, Q, L>>
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING,
+ DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler>
class DynamicExtension {
typedef S Shard;
typedef MutableBuffer<R> Buffer;
@@ -83,68 +84,8 @@ public:
return internal_append(rec, true);
}
- std::vector<R> query(void *parms) {
- auto buffer = get_buffer();
- auto vers = get_active_version();
-
- // Get the buffer query state
- auto buffer_state = Q::get_buffer_query_state(buffer, parms);
-
- // Get the shard query states
- std::vector<std::pair<ShardID, Shard*>> shards;
- std::vector<void*> states;
-
- for (auto &level : vers->get_levels()) {
- level->get_query_states(shards, states, parms);
- }
-
- Q::process_query_states(parms, states, buffer_state);
-
- std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
-
- // Execute the query for the buffer
- auto buffer_results = Q::buffer_query(buffer, buffer_state, parms);
- query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers));
- if constexpr (Q::EARLY_ABORT) {
- if (query_results[0].size() > 0) {
- auto result = Q::merge(query_results, parms);
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(states[i]);
- }
-
- Q::delete_buffer_query_state(buffer_state);
- return result;
- }
- }
-
- // Execute the query for each shard
- for (size_t i=0; i<shards.size(); i++) {
- auto shard_results = Q::query(shards[i].second, states[i], parms);
- query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer, vers));
- if constexpr (Q::EARLY_ABORT) {
- if (query_results[i].size() > 0) {
- auto result = Q::merge(query_results, parms);
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(states[i]);
- }
-
- Q::delete_buffer_query_state(buffer_state);
-
- return result;
- }
- }
- }
-
- // Merge the results together
- auto result = Q::merge(query_results, parms);
-
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(states[i]);
- }
-
- Q::delete_buffer_query_state(buffer_state);
-
- return result;
+ std::future<std::vector<R>> query(void *parms) {
+ return schedule_query(get_active_version(), get_buffer(), parms);
}
size_t get_record_count() {
@@ -239,6 +180,112 @@ private:
return m_versions[0];
}
+ static void merge(void *arguments) {
+ MergeArgs *args = (MergeArgs *) arguments;
+
+ Structure *vers = (Structure *) args->version;
+ Buffer *buff = (Buffer *) args->buffer;
+
+ for (ssize_t i=args->merges.size() - 1; i>=0; i--) {
+ vers->merge_levels(args->merges[i].second, args->merges[i].first);
+ }
+
+ vers->merge_buffer(buff);
+
+ args->result.set_value(true);
+ delete args;
+ }
+
+ static void async_query(void *arguments) {
+ QueryArgs<R> *args = (QueryArgs<R> *) arguments;
+
+ auto buffer = (Buffer *) args->buffer;
+ auto vers = (Structure *) args->version;
+ void *parms = args->query_parms;
+
+ // Get the buffer query state
+ auto buffer_state = Q::get_buffer_query_state(buffer, parms);
+
+ // Get the shard query states
+ std::vector<std::pair<ShardID, Shard*>> shards;
+ std::vector<void*> states;
+
+ for (auto &level : vers->get_levels()) {
+ level->get_query_states(shards, states, parms);
+ }
+
+ Q::process_query_states(parms, states, buffer_state);
+
+ std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
+
+ // Execute the query for the buffer
+ auto buffer_results = Q::buffer_query(buffer, buffer_state, parms);
+ query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers));
+ if constexpr (Q::EARLY_ABORT) {
+ if (query_results[0].size() > 0) {
+ auto result = Q::merge(query_results, parms);
+ for (size_t i=0; i<states.size(); i++) {
+ Q::delete_query_state(states[i]);
+ }
+
+ Q::delete_buffer_query_state(buffer_state);
+ return result;
+ }
+ }
+
+ // Execute the query for each shard
+ for (size_t i=0; i<shards.size(); i++) {
+ auto shard_results = Q::query(shards[i].second, states[i], parms);
+ query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer, vers));
+ if constexpr (Q::EARLY_ABORT) {
+ if (query_results[i].size() > 0) {
+ auto result = Q::merge(query_results, parms);
+ for (size_t i=0; i<states.size(); i++) {
+ Q::delete_query_state(states[i]);
+ }
+
+ Q::delete_buffer_query_state(buffer_state);
+
+ return result;
+ }
+ }
+ }
+
+ // Merge the results together
+ auto result = Q::merge(query_results, parms);
+
+ for (size_t i=0; i<states.size(); i++) {
+ Q::delete_query_state(states[i]);
+ }
+
+ Q::delete_buffer_query_state(buffer_state);
+
+ args->result_set.set_value(std::move(result));
+ delete args;
+ }
+
+ std::future<bool> schedule_merge(Structure *version, Buffer *buffer) {
+ MergeArgs *args = new MergeArgs();
+ args->merges = version->get_merge_tasks(buffer->get_record_count());
+ args->buffer = buffer;
+ args->version = version;
+
+ m_sched.schedule_job(merge, 0, args);
+
+ return args->result.get_future();
+ }
+
+ std::future<std::vector<R>> schedule_query(Structure *version, Buffer *buffer, void *query_parms) {
+ QueryArgs<R> *args = new QueryArgs<R>();
+ args->buffer = buffer;
+ args->version = version;
+ args->buffer = query_parms;
+
+ m_sched.schedule_job(async_query, 0, args);
+
+ return args->result_set.get_future();
+ }
+
int internal_append(const R &rec, bool ts) {
Buffer *buffer;
while (!(buffer = get_buffer()))
@@ -246,13 +293,15 @@ private:
if (buffer->is_full()) {
auto vers = get_active_version();
- m_sched.schedule_merge(vers, buffer);
+ auto res = schedule_merge(vers, buffer);
+ res.get();
}
+
return buffer->append(rec, ts);
}
- std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer, Structure *vers) {
+ static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer, Structure *vers) {
if constexpr (!Q::SKIP_DELETE_FILTER) {
return records;
}
@@ -303,10 +352,5 @@ private:
return processed_records;
}
};
-
-template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING>
-static void de_merge_callback(DynamicExtension<R, S, Q, L, D> extension, ExtensionStructure<R, S, Q> new_version) {
-
-}
}