summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h41
-rw-r--r--include/framework/QueryInterface.h10
2 files changed, 43 insertions, 8 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index bd09e1f..5c903b9 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -121,16 +121,39 @@ public:
Q::process_query_states(parms, states, buffer_state);
- std::vector<std::vector<R>> query_results(shards.size() + 1);
+ std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
// Execute the query for the buffer
auto buffer_results = Q::buffer_query(buffer, buffer_state, parms);
- query_results[0] = filter_deletes(buffer_results, {-1, -1}, buffer);
+ query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer));
+ if constexpr (Q::EARLY_ABORT) {
+ if (query_results[0].size() > 0) {
+ auto result = Q::merge(query_results, parms);
+ for (size_t i=0; i<states.size(); i++) {
+ Q::delete_query_state(states[i]);
+ }
+
+ Q::delete_buffer_query_state(buffer_state);
+ return result;
+ }
+ }
// Execute the query for each shard
for (size_t i=0; i<shards.size(); i++) {
auto shard_results = Q::query(shards[i].second, states[i], parms);
- query_results[i+1] = filter_deletes(shard_results, shards[i].first, buffer);
+ query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer));
+ if constexpr (Q::EARLY_ABORT) {
+ if (query_results[i].size() > 0) {
+ auto result = Q::merge(query_results, parms);
+ for (size_t i=0; i<states.size(); i++) {
+ Q::delete_query_state(states[i]);
+ }
+
+ Q::delete_buffer_query_state(buffer_state);
+
+ return result;
+ }
+ }
}
// Merge the results together
@@ -264,8 +287,12 @@ private:
return buffer->append(rec, ts);
}
- std::vector<R> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer) {
- std::vector<R> processed_records;
+ std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer) {
+ if constexpr (!Q::SKIP_DELETE_FILTER) {
+ return records;
+ }
+
+ std::vector<Wrapped<R>> processed_records;
processed_records.reserve(records.size());
// For delete tagging, we just need to check the delete bit on each
@@ -276,7 +303,7 @@ private:
continue;
}
- processed_records.emplace_back(rec.rec);
+ processed_records.emplace_back(rec);
}
return processed_records;
@@ -305,7 +332,7 @@ private:
}
}
- processed_records.emplace_back(rec.rec);
+ processed_records.emplace_back(rec);
}
return processed_records;
diff --git a/include/framework/QueryInterface.h b/include/framework/QueryInterface.h
index 886bdc8..46a1ce1 100644
--- a/include/framework/QueryInterface.h
+++ b/include/framework/QueryInterface.h
@@ -8,11 +8,12 @@
*/
#pragma once
+#include <vector>
#include <concepts>
#include "util/types.h"
template <typename Q>
-concept QueryInterface = requires(Q q, void *p) {
+concept QueryInterface = requires(Q q, void *p, std::vector<void*> &s) {
/*
{q.get_query_state(p, p)} -> std::convertible_to<void*>;
@@ -22,6 +23,13 @@ concept QueryInterface = requires(Q q, void *p) {
{q.merge()};
{q.delete_query_state(p)};
*/
+ {Q::EARLY_ABORT} -> std::convertible_to<bool>;
+ {Q::SKIP_DELETE_FILTER} -> std::convertible_to<bool>;
+ //{Q::get_query_state(p, p)} -> std::convertible_to<void*>;
+ //{Q::get_buffer_query_state(p, p)} -> std::convertible_to<void*>;
+ {Q::process_query_states(p, s, p)};
{Q::delete_query_state(std::declval<void*>())} -> std::same_as<void>;
+ {Q::delete_buffer_query_state(p)};
+
};