From 0b723322a611de83872dd83b55d2e10e8886a283 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Thu, 2 Nov 2023 08:01:45 -0400 Subject: started refactoring queries interface --- include/query/rangequery.h | 161 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 161 insertions(+) create mode 100644 include/query/rangequery.h (limited to 'include/query/rangequery.h') diff --git a/include/query/rangequery.h b/include/query/rangequery.h new file mode 100644 index 0000000..f9a34d9 --- /dev/null +++ b/include/query/rangequery.h @@ -0,0 +1,161 @@ +/* + * include/query/rangequery.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +namespace de { namespace rq { + +template +struct Parms { + decltype(R::key) lower_bound; + decltype(R::key) upper_bound; +}; + +template +struct State { + size_t start_idx; + size_t stop_idx; +}; + +template +struct BufferState { + size_t cutoff; +}; + +template +class Query { +public: + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=true; + + static void *get_query_state(S *shard, void *parms) { + auto res = new State(); + auto p = (Parms *) parms; + + res->start_idx = shard->get_lower_bound(p->lower_bound); + res->stop_idx = shard->get_record_count(); + + return res; + } + + static void* get_buffer_query_state(MutableBuffer *buffer, void *parms) { + auto res = new BufferState(); + res->cutoff = buffer->get_record_count(); + + return res; + } + + static void process_query_states(void *query_parms, std::vector &shard_states, std::vector &buffer_states) { + return; + } + + static std::vector> query(S *shard, void *q_state, void *parms) { + std::vector> records; + auto p = (Parms *) parms; + auto s = (State *) q_state; + + // if the returned index is one past the end of the + // records for the PGM, then there are not records + // in the index falling into the specified range. + if (s->start_idx == shard->get_record_count()) { + return records; + } + + auto ptr = shard->get_record_at(s->start_idx); + + // roll the pointer forward to the first record that is + // greater than or equal to the lower bound. + while(ptr->rec.key < p->lower_bound) { + ptr++; + } + + while (ptr->rec.key <= p->upper_bound && ptr < shard->m_data + s->stop_idx) { + records.emplace_back(*ptr); + ptr++; + } + + return records; + } + + static std::vector> buffer_query(MutableBuffer *buffer, void *state, void *parms) { + auto p = (Parms *) parms; + auto s = (BufferState *) state; + + std::vector> records; + for (size_t i=0; icutoff; i++) { + auto rec = buffer->get_data() + i; + if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { + records.emplace_back(*rec); + } + } + + return records; + } + + static std::vector merge(std::vector>> &results, void *parms) { + std::vector>> cursors; + cursors.reserve(results.size()); + + PriorityQueue> pq(results.size()); + size_t total = 0; + size_t tmp_n = results.size(); + + + for (size_t i = 0; i < tmp_n; ++i) + if (results[i].size() > 0){ + auto base = results[i].data(); + cursors.emplace_back(Cursor{base, base + results[i].size(), 0, results[i].size()}); + assert(i == cursors.size() - 1); + total += results[i].size(); + pq.push(cursors[i].ptr, tmp_n - i - 1); + } else { + cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); + } + + if (total == 0) { + return std::vector(); + } + + std::vector output; + output.reserve(total); + + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[tmp_n - now.version - 1]; + auto& cursor2 = cursors[tmp_n - next.version - 1]; + if (advance_cursor>(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor>(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[tmp_n - now.version - 1]; + if (!now.data->is_tombstone()) output.push_back(cursor.ptr->rec); + pq.pop(); + + if (advance_cursor>(cursor)) pq.push(cursor.ptr, now.version); + } + } + + return output; + } + + static void delete_query_state(void *state) { + auto s = (State *) state; + delete s; + } + + static void delete_buffer_query_state(void *state) { + auto s = (BufferState *) state; + delete s; + } +}; + +}} -- cgit v1.2.3 From e02742b07540dd5a9bcbb44dae14856bf10955ed Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 15:18:53 -0500 Subject: Refactoring progress --- include/query/rangequery.h | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) (limited to 'include/query/rangequery.h') diff --git a/include/query/rangequery.h b/include/query/rangequery.h index f9a34d9..b9ac9db 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -8,6 +8,12 @@ */ #pragma once +#include "framework/interface/Record.h" +#include "framework/interface/Shard.h" +#include "framework/structure/MutableBuffer.h" +#include "psu-ds/PriorityQueue.h" +#include "util/Cursor.h" + namespace de { namespace rq { template @@ -27,7 +33,7 @@ struct BufferState { size_t cutoff; }; -template +template class Query { public: constexpr static bool EARLY_ABORT=false; @@ -74,7 +80,7 @@ public: ptr++; } - while (ptr->rec.key <= p->upper_bound && ptr < shard->m_data + s->stop_idx) { + while (ptr->rec.key <= p->upper_bound && ptr < shard->get_data() + s->stop_idx) { records.emplace_back(*ptr); ptr++; } @@ -101,7 +107,7 @@ public: std::vector>> cursors; cursors.reserve(results.size()); - PriorityQueue> pq(results.size()); + psudb::PriorityQueue> pq(results.size()); size_t total = 0; size_t tmp_n = results.size(); @@ -126,7 +132,7 @@ public: while (pq.size()) { auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + auto next = pq.size() > 1 ? pq.peek(1) : psudb::queue_record>{nullptr, 0}; if (!now.data->is_tombstone() && next.data != nullptr && now.data->rec == next.data->rec && next.data->is_tombstone()) { -- cgit v1.2.3