diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-07-27 18:21:26 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-07-27 18:21:26 -0400 |
| commit | d6e08e9d8d3ac9b356ac50cee22b41f828160247 (patch) | |
| tree | 2c1f62f6385072e1a824e557693634acc3136cdd /include/framework/DynamicExtension.h | |
| parent | f462921cbc52688fb478c6ac86a891e596fd6053 (diff) | |
| download | dynamic-extension-d6e08e9d8d3ac9b356ac50cee22b41f828160247.tar.gz | |
Expanded query interface
Query interface now enables skipping of delete processing and stopping
query processing when first match is found.
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 41 |
1 files changed, 34 insertions, 7 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index bd09e1f..5c903b9 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -121,16 +121,39 @@ public: Q::process_query_states(parms, states, buffer_state); - std::vector<std::vector<R>> query_results(shards.size() + 1); + std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1); // Execute the query for the buffer auto buffer_results = Q::buffer_query(buffer, buffer_state, parms); - query_results[0] = filter_deletes(buffer_results, {-1, -1}, buffer); + query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer)); + if constexpr (Q::EARLY_ABORT) { + if (query_results[0].size() > 0) { + auto result = Q::merge(query_results, parms); + for (size_t i=0; i<states.size(); i++) { + Q::delete_query_state(states[i]); + } + + Q::delete_buffer_query_state(buffer_state); + return result; + } + } // Execute the query for each shard for (size_t i=0; i<shards.size(); i++) { auto shard_results = Q::query(shards[i].second, states[i], parms); - query_results[i+1] = filter_deletes(shard_results, shards[i].first, buffer); + query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer)); + if constexpr (Q::EARLY_ABORT) { + if (query_results[i].size() > 0) { + auto result = Q::merge(query_results, parms); + for (size_t i=0; i<states.size(); i++) { + Q::delete_query_state(states[i]); + } + + Q::delete_buffer_query_state(buffer_state); + + return result; + } + } } // Merge the results together @@ -264,8 +287,12 @@ private: return buffer->append(rec, ts); } - std::vector<R> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer) { - std::vector<R> processed_records; + std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer) { + if constexpr (!Q::SKIP_DELETE_FILTER) { + return records; + } + + std::vector<Wrapped<R>> processed_records; processed_records.reserve(records.size()); // For delete tagging, we just need to check the delete bit on each @@ -276,7 +303,7 @@ private: continue; } - processed_records.emplace_back(rec.rec); + processed_records.emplace_back(rec); } return processed_records; @@ -305,7 +332,7 @@ private: } } - processed_records.emplace_back(rec.rec); + processed_records.emplace_back(rec); } return processed_records; |