/* * include/query/rangequery.h * * Copyright (C) 2023 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * * A query class for single dimensional range queries. This query requires * that the shard support get_lower_bound(key) and get_record_at(index). */ #pragma once #include "framework/interface/Record.h" #include "framework/interface/Shard.h" #include "framework/structure/BufferView.h" #include "psu-ds/PriorityQueue.h" #include "util/Cursor.h" namespace de { namespace rq { template struct Parms { decltype(R::key) lower_bound; decltype(R::key) upper_bound; }; template struct State { size_t start_idx; size_t stop_idx; }; template struct BufferState { BufferView *buffer; BufferState(BufferView *buffer) : buffer(buffer) {} }; template class Query { public: constexpr static bool EARLY_ABORT=false; constexpr static bool SKIP_DELETE_FILTER=true; static void *get_query_state(S *shard, void *parms) { auto res = new State(); auto p = (Parms *) parms; res->start_idx = shard->get_lower_bound(p->lower_bound); res->stop_idx = shard->get_record_count(); return res; } static void* get_buffer_query_state(BufferView *buffer, void *parms) { auto res = new BufferState(buffer); return res; } static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_states) { return; } static std::vector> query(S *shard, void *q_state, void *parms) { std::vector> records; auto p = (Parms *) parms; auto s = (State *) q_state; /* * if the returned index is one past the end of the * records for the PGM, then there are not records * in the index falling into the specified range. */ if (s->start_idx == shard->get_record_count()) { return records; } auto ptr = shard->get_record_at(s->start_idx); /* * roll the pointer forward to the first record that is * greater than or equal to the lower bound. */ while(ptr < shard->get_data() + s->stop_idx && ptr->rec.key < p->lower_bound) { ptr++; } while (ptr < shard->get_data() + s->stop_idx && ptr->rec.key <= p->upper_bound) { records.emplace_back(*ptr); ptr++; } return records; } static std::vector> buffer_query(void *state, void *parms) { auto p = (Parms *) parms; auto s = (BufferState *) state; std::vector> records; for (size_t i=0; ibuffer->get_record_count(); i++) { auto rec = s->buffer->get(i); if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { records.emplace_back(*rec); } } return records; } static std::vector merge(std::vector>> &results, void *parms) { std::vector>> cursors; cursors.reserve(results.size()); psudb::PriorityQueue> pq(results.size()); size_t total = 0; size_t tmp_n = results.size(); for (size_t i = 0; i < tmp_n; ++i) if (results[i].size() > 0){ auto base = results[i].data(); cursors.emplace_back(Cursor{base, base + results[i].size(), 0, results[i].size()}); assert(i == cursors.size() - 1); total += results[i].size(); pq.push(cursors[i].ptr, tmp_n - i - 1); } else { cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); } if (total == 0) { return std::vector(); } std::vector output; output.reserve(total); while (pq.size()) { auto now = pq.peek(); auto next = pq.size() > 1 ? pq.peek(1) : psudb::queue_record>{nullptr, 0}; if (!now.data->is_tombstone() && next.data != nullptr && now.data->rec == next.data->rec && next.data->is_tombstone()) { pq.pop(); pq.pop(); auto& cursor1 = cursors[tmp_n - now.version - 1]; auto& cursor2 = cursors[tmp_n - next.version - 1]; if (advance_cursor>(cursor1)) pq.push(cursor1.ptr, now.version); if (advance_cursor>(cursor2)) pq.push(cursor2.ptr, next.version); } else { auto& cursor = cursors[tmp_n - now.version - 1]; if (!now.data->is_tombstone()) output.push_back(cursor.ptr->rec); pq.pop(); if (advance_cursor>(cursor)) pq.push(cursor.ptr, now.version); } } return output; } static void delete_query_state(void *state) { auto s = (State *) state; delete s; } static void delete_buffer_query_state(void *state) { auto s = (BufferState *) state; delete s; } }; }}