summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-07-27 18:21:26 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-07-27 18:21:26 -0400
commitd6e08e9d8d3ac9b356ac50cee22b41f828160247 (patch)
tree2c1f62f6385072e1a824e557693634acc3136cdd /include/framework/DynamicExtension.h
parentf462921cbc52688fb478c6ac86a891e596fd6053 (diff)
downloaddynamic-extension-d6e08e9d8d3ac9b356ac50cee22b41f828160247.tar.gz
Expanded query interface
Query interface now enables skipping of delete processing and stopping query processing when first match is found.
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h41
1 files changed, 34 insertions, 7 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index bd09e1f..5c903b9 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -121,16 +121,39 @@ public:
Q::process_query_states(parms, states, buffer_state);
- std::vector<std::vector<R>> query_results(shards.size() + 1);
+ std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
// Execute the query for the buffer
auto buffer_results = Q::buffer_query(buffer, buffer_state, parms);
- query_results[0] = filter_deletes(buffer_results, {-1, -1}, buffer);
+ query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer));
+ if constexpr (Q::EARLY_ABORT) {
+ if (query_results[0].size() > 0) {
+ auto result = Q::merge(query_results, parms);
+ for (size_t i=0; i<states.size(); i++) {
+ Q::delete_query_state(states[i]);
+ }
+
+ Q::delete_buffer_query_state(buffer_state);
+ return result;
+ }
+ }
// Execute the query for each shard
for (size_t i=0; i<shards.size(); i++) {
auto shard_results = Q::query(shards[i].second, states[i], parms);
- query_results[i+1] = filter_deletes(shard_results, shards[i].first, buffer);
+ query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer));
+ if constexpr (Q::EARLY_ABORT) {
+ if (query_results[i].size() > 0) {
+ auto result = Q::merge(query_results, parms);
+ for (size_t i=0; i<states.size(); i++) {
+ Q::delete_query_state(states[i]);
+ }
+
+ Q::delete_buffer_query_state(buffer_state);
+
+ return result;
+ }
+ }
}
// Merge the results together
@@ -264,8 +287,12 @@ private:
return buffer->append(rec, ts);
}
- std::vector<R> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer) {
- std::vector<R> processed_records;
+ std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer) {
+ if constexpr (!Q::SKIP_DELETE_FILTER) {
+ return records;
+ }
+
+ std::vector<Wrapped<R>> processed_records;
processed_records.reserve(records.size());
// For delete tagging, we just need to check the delete bit on each
@@ -276,7 +303,7 @@ private:
continue;
}
- processed_records.emplace_back(rec.rec);
+ processed_records.emplace_back(rec);
}
return processed_records;
@@ -305,7 +332,7 @@ private:
}
}
- processed_records.emplace_back(rec.rec);
+ processed_records.emplace_back(rec);
}
return processed_records;