summaryrefslogtreecommitdiffstats
path: root/include/query
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-01-12 14:10:11 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-01-12 14:10:11 -0500
commitaac0bb661af8fae38d3ce08d6078cb4d9dfcb575 (patch)
tree347e0ce7f7e15f2610039f02b75d47cedf810cd6 /include/query
parentc4514c2e62a711189cf3c914297885d97fb51a09 (diff)
downloaddynamic-extension-aac0bb661af8fae38d3ce08d6078cb4d9dfcb575.tar.gz
Initial integration of new buffering scheme into framework
It isn't working right now (lotsa test failures), but we're to the debugging phase now.
Diffstat (limited to 'include/query')
-rw-r--r--include/query/rangequery.h34
1 files changed, 20 insertions, 14 deletions
diff --git a/include/query/rangequery.h b/include/query/rangequery.h
index 16dcd86..ad5b767 100644
--- a/include/query/rangequery.h
+++ b/include/query/rangequery.h
@@ -12,7 +12,7 @@
#include "framework/interface/Record.h"
#include "framework/interface/Shard.h"
-#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/BufferView.h"
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
@@ -32,7 +32,10 @@ struct State {
template <RecordInterface R>
struct BufferState {
- size_t cutoff;
+ BufferView<R> buffer;
+
+ BufferState(BufferView<R> buffer)
+ : buffer(std::move(buffer)) {}
};
template <ShardInterface S, RecordInterface R>
@@ -51,14 +54,13 @@ public:
return res;
}
- static void* get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) {
- auto res = new BufferState<R>();
- res->cutoff = buffer->get_record_count();
+ static void* get_buffer_query_state(BufferView<R> buffer, void *parms) {
+ auto res = new BufferState<R>(std::move(buffer));
return res;
}
- static void process_query_states(void *query_parms, std::vector<void*> &shard_states, std::vector<void*> &buffer_states) {
+ static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void* buffer_states) {
return;
}
@@ -67,17 +69,21 @@ public:
auto p = (Parms<R> *) parms;
auto s = (State<R> *) q_state;
- // if the returned index is one past the end of the
- // records for the PGM, then there are not records
- // in the index falling into the specified range.
+ /*
+ * if the returned index is one past the end of the
+ * records for the PGM, then there are not records
+ * in the index falling into the specified range.
+ */
if (s->start_idx == shard->get_record_count()) {
return records;
}
auto ptr = shard->get_record_at(s->start_idx);
- // roll the pointer forward to the first record that is
- // greater than or equal to the lower bound.
+ /*
+ * roll the pointer forward to the first record that is
+ * greater than or equal to the lower bound.
+ */
while(ptr->rec.key < p->lower_bound) {
ptr++;
}
@@ -90,13 +96,13 @@ public:
return records;
}
- static std::vector<Wrapped<R>> buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) {
+ static std::vector<Wrapped<R>> buffer_query(void *state, void *parms) {
auto p = (Parms<R> *) parms;
auto s = (BufferState<R> *) state;
std::vector<Wrapped<R>> records;
- for (size_t i=0; i<s->cutoff; i++) {
- auto rec = buffer->get_data() + i;
+ for (size_t i=0; i<s->buffer.get_record_count(); i++) {
+ auto rec = s->buffer.get(i);
if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
records.emplace_back(*rec);
}