summaryrefslogtreecommitdiffstats
path: root/include/query/rangequery.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/query/rangequery.h')
-rw-r--r--include/query/rangequery.h283
1 files changed, 146 insertions, 137 deletions
diff --git a/include/query/rangequery.h b/include/query/rangequery.h
index e0690e6..e7be39c 100644
--- a/include/query/rangequery.h
+++ b/include/query/rangequery.h
@@ -1,177 +1,186 @@
/*
* include/query/rangequery.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
- * A query class for single dimensional range queries. This query requires
+ * A query class for single dimensional range queries. This query requires
* that the shard support get_lower_bound(key) and get_record_at(index).
*/
#pragma once
#include "framework/QueryRequirements.h"
+#include "framework/interface/Record.h"
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
-namespace de { namespace rq {
+namespace de {
+namespace rq {
-template <RecordInterface R>
-struct Parms {
+template <ShardInterface S> class Query {
+ typedef typename S::RECORD R;
+
+public:
+ struct Parameters {
decltype(R::key) lower_bound;
decltype(R::key) upper_bound;
-};
+ };
-template <RecordInterface R>
-struct State {
+ struct LocalQuery {
size_t start_idx;
size_t stop_idx;
-};
+ Parameters global_parms;
+ };
-template <RecordInterface R>
-struct BufferState {
+ struct LocalQueryBuffer {
BufferView<R> *buffer;
-
- BufferState(BufferView<R> *buffer)
- : buffer(buffer) {}
-};
-
-template <RecordInterface R, ShardInterface<R> S>
-class Query {
-public:
- constexpr static bool EARLY_ABORT=false;
- constexpr static bool SKIP_DELETE_FILTER=true;
-
- static void *get_query_state(S *shard, void *parms) {
- auto res = new State<R>();
- auto p = (Parms<R> *) parms;
-
- res->start_idx = shard->get_lower_bound(p->lower_bound);
- res->stop_idx = shard->get_record_count();
-
- return res;
+ Parameters global_parms;
+ };
+
+ typedef Wrapped<R> LocalResultType;
+ typedef R ResultType;
+
+ constexpr static bool EARLY_ABORT = false;
+ constexpr static bool SKIP_DELETE_FILTER = true;
+
+ static LocalQuery *local_preproc(S *shard, Parameters *parms) {
+ auto query = new LocalQuery();
+
+ query->start_idx = shard->get_lower_bound(parms->lower_bound);
+ query->stop_idx = shard->get_record_count();
+ query->global_parms = *parms;
+
+ return query;
+ }
+
+ static LocalQueryBuffer *local_preproc_buffer(BufferView<R> *buffer,
+ Parameters *parms) {
+ auto query = new LocalQueryBuffer();
+ query->buffer = buffer;
+ query->global_parms = *parms;
+
+ return query;
+ }
+
+ static void distribute_query(Parameters *parms,
+ std::vector<LocalQuery *> const &local_queries,
+ LocalQueryBuffer *buffer_query) {
+ return;
+ }
+
+ static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
+ std::vector<LocalResultType> result;
+
+ /*
+ * if the returned index is one past the end of the
+ * records for the PGM, then there are not records
+ * in the index falling into the specified range.
+ */
+ if (query->start_idx == shard->get_record_count()) {
+ return result;
}
- static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) {
- auto res = new BufferState<R>(buffer);
+ auto ptr = shard->get_record_at(query->start_idx);
- return res;
+ /*
+ * roll the pointer forward to the first record that is
+ * greater than or equal to the lower bound.
+ */
+ while (ptr < shard->get_data() + query->stop_idx &&
+ ptr->rec.key < query->global_parms.lower_bound) {
+ ptr++;
}
- static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void* buffer_state) {
- return;
+ while (ptr < shard->get_data() + query->stop_idx &&
+ ptr->rec.key <= query->global_parms.upper_bound) {
+ result.emplace_back(*ptr);
+ ptr++;
}
- static std::vector<Wrapped<R>> query(S *shard, void *q_state, void *parms) {
- std::vector<Wrapped<R>> records;
- auto p = (Parms<R> *) parms;
- auto s = (State<R> *) q_state;
-
- /*
- * if the returned index is one past the end of the
- * records for the PGM, then there are not records
- * in the index falling into the specified range.
- */
- if (s->start_idx == shard->get_record_count()) {
- return records;
- }
-
- auto ptr = shard->get_record_at(s->start_idx);
-
- /*
- * roll the pointer forward to the first record that is
- * greater than or equal to the lower bound.
- */
- while(ptr < shard->get_data() + s->stop_idx && ptr->rec.key < p->lower_bound) {
- ptr++;
- }
-
- while (ptr < shard->get_data() + s->stop_idx && ptr->rec.key <= p->upper_bound) {
- records.emplace_back(*ptr);
- ptr++;
- }
-
- return records;
- }
+ return result;
+ }
- static std::vector<Wrapped<R>> buffer_query(void *state, void *parms) {
- auto p = (Parms<R> *) parms;
- auto s = (BufferState<R> *) state;
+ static std::vector<LocalResultType>
+ local_query_buffer(LocalQueryBuffer *query) {
- std::vector<Wrapped<R>> records;
- for (size_t i=0; i<s->buffer->get_record_count(); i++) {
- auto rec = s->buffer->get(i);
- if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
- records.emplace_back(*rec);
- }
- }
-
- return records;
+ std::vector<LocalResultType> result;
+ for (size_t i = 0; i < query->buffer->get_record_count(); i++) {
+ auto rec = query->buffer->get(i);
+ if (rec->rec.key >= query->global_parms.lower_bound &&
+ rec->rec.key <= query->global_parms.upper_bound) {
+ result.emplace_back(*rec);
+ }
}
- static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
- std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(results.size());
-
- psudb::PriorityQueue<Wrapped<R>> pq(results.size());
- size_t total = 0;
- size_t tmp_n = results.size();
-
-
- for (size_t i = 0; i < tmp_n; ++i)
- if (results[i].size() > 0){
- auto base = results[i].data();
- cursors.emplace_back(Cursor<Wrapped<R>>{base, base + results[i].size(), 0, results[i].size()});
- assert(i == cursors.size() - 1);
- total += results[i].size();
- pq.push(cursors[i].ptr, tmp_n - i - 1);
- } else {
- cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
- }
-
- if (total == 0) {
- return std::vector<R>();
- }
-
- output.reserve(total);
-
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : psudb::queue_record<Wrapped<R>>{nullptr, 0};
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[tmp_n - now.version - 1];
- auto& cursor2 = cursors[tmp_n - next.version - 1];
- if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[tmp_n - now.version - 1];
- if (!now.data->is_tombstone()) output.push_back(cursor.ptr->rec);
-
- pq.pop();
-
- if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
-
- return output;
+ return result;
+ }
+
+ static void
+ combine(std::vector<std::vector<LocalResultType>> const &local_results,
+ Parameters *parms, std::vector<ResultType> &output) {
+ std::vector<Cursor<LocalResultType>> cursors;
+ cursors.reserve(local_results.size());
+
+ psudb::PriorityQueue<LocalResultType> pq(local_results.size());
+ size_t total = 0;
+ size_t tmp_n = local_results.size();
+
+ for (size_t i = 0; i < tmp_n; ++i)
+ if (local_results[i].size() > 0) {
+ auto base = local_results[i].data();
+ cursors.emplace_back(Cursor<LocalResultType>{
+ base, base + local_results[i].size(), 0, local_results[i].size()});
+ assert(i == cursors.size() - 1);
+ total += local_results[i].size();
+ pq.push(cursors[i].ptr, tmp_n - i - 1);
+ } else {
+ cursors.emplace_back(Cursor<LocalResultType>{nullptr, nullptr, 0, 0});
+ }
+
+ if (total == 0) {
+ return;
}
- static void delete_query_state(void *state) {
- auto s = (State<R> *) state;
- delete s;
+ output.reserve(total);
+
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next = pq.size() > 1
+ ? pq.peek(1)
+ : psudb::queue_record<LocalResultType>{nullptr, 0};
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop();
+ pq.pop();
+ auto &cursor1 = cursors[tmp_n - now.version - 1];
+ auto &cursor2 = cursors[tmp_n - next.version - 1];
+ if (advance_cursor<LocalResultType>(cursor1))
+ pq.push(cursor1.ptr, now.version);
+ if (advance_cursor<LocalResultType>(cursor2))
+ pq.push(cursor2.ptr, next.version);
+ } else {
+ auto &cursor = cursors[tmp_n - now.version - 1];
+ if (!now.data->is_tombstone())
+ output.push_back(cursor.ptr->rec);
+
+ pq.pop();
+
+ if (advance_cursor<LocalResultType>(cursor))
+ pq.push(cursor.ptr, now.version);
+ }
}
- static void delete_buffer_query_state(void *state) {
- auto s = (BufferState<R> *) state;
- delete s;
- }
+ return;
+ }
- static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
- return false;
- }
+ static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ std::vector<LocalQuery *> const &local_queries,
+ LocalQueryBuffer *buffer_query) {
+ return false;
+ }
};
-}}
+} // namespace rq
+} // namespace de