From 9fe305c7d28e993e55c55427f377ae7e3251ea4f Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Fri, 6 Dec 2024 13:13:51 -0500 Subject: Interface update (#5) * Query Interface Adjustments/Refactoring Began the process of adjusting the query interface (and also the shard interface, to a lesser degree) to better accommodate the user. In particular the following changes have been made, 1. The number of necessary template arguments for the query type has been drastically reduced, while also removing the void pointers and manual delete functions from the interface. This was accomplished by requiring many of the sub-types associated with a query (parameters, etc.) to be nested inside the main query class, and by forcing the SHARD type to expose its associated record type. 2. User-defined query return types are now supported. Queries no longer are required to return strictly sets of records. Instead, the query now has LocalResultType and ResultType template parameters (which can be defaulted using a typedef in the Query type itself), allowing much more flexibility. Note that, at least for the short term, the LocalResultType must still expose the same is_deleted/is_tombstone interface as a Wrapped used to, as this is currently needed for delete filtering. A better approach to this is, hopefully, forthcoming. 3. Updated the ISAMTree.h shard and rangequery.h query to use the new interfaces, and adjusted the associated unit tests as well. 4. Dropped the unnecessary "get_data()" function from the ShardInterface concept. 5. Dropped the need to specify a record type in the ShardInterface concept. This is now handled using a required Shard::RECORD member of the Shard class itself, which should expose the name of the record type. * Updates to framework to support new Query/Shard interfaces Pretty extensive adjustments to the framework, particularly to the templates themselves, along with some type-renaming work, to support the new query and shard interfaces. Adjusted the external query interface to take an rvalue reference, rather than a pointer, to the query parameters. * Removed framework-level delete filtering This was causing some issues with the new query interface, and should probably be reworked anyway, so I'm temporarily (TM) removing the feature. * Updated benchmarks + remaining code for new interface --- include/query/rangequery.h | 283 +++++++++++++++++++++++---------------------- 1 file changed, 146 insertions(+), 137 deletions(-) (limited to 'include/query/rangequery.h') diff --git a/include/query/rangequery.h b/include/query/rangequery.h index e0690e6..e7be39c 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -1,177 +1,186 @@ /* * include/query/rangequery.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * - * A query class for single dimensional range queries. This query requires + * A query class for single dimensional range queries. This query requires * that the shard support get_lower_bound(key) and get_record_at(index). */ #pragma once #include "framework/QueryRequirements.h" +#include "framework/interface/Record.h" #include "psu-ds/PriorityQueue.h" #include "util/Cursor.h" -namespace de { namespace rq { +namespace de { +namespace rq { -template -struct Parms { +template class Query { + typedef typename S::RECORD R; + +public: + struct Parameters { decltype(R::key) lower_bound; decltype(R::key) upper_bound; -}; + }; -template -struct State { + struct LocalQuery { size_t start_idx; size_t stop_idx; -}; + Parameters global_parms; + }; -template -struct BufferState { + struct LocalQueryBuffer { BufferView *buffer; - - BufferState(BufferView *buffer) - : buffer(buffer) {} -}; - -template S> -class Query { -public: - constexpr static bool EARLY_ABORT=false; - constexpr static bool SKIP_DELETE_FILTER=true; - - static void *get_query_state(S *shard, void *parms) { - auto res = new State(); - auto p = (Parms *) parms; - - res->start_idx = shard->get_lower_bound(p->lower_bound); - res->stop_idx = shard->get_record_count(); - - return res; + Parameters global_parms; + }; + + typedef Wrapped LocalResultType; + typedef R ResultType; + + constexpr static bool EARLY_ABORT = false; + constexpr static bool SKIP_DELETE_FILTER = true; + + static LocalQuery *local_preproc(S *shard, Parameters *parms) { + auto query = new LocalQuery(); + + query->start_idx = shard->get_lower_bound(parms->lower_bound); + query->stop_idx = shard->get_record_count(); + query->global_parms = *parms; + + return query; + } + + static LocalQueryBuffer *local_preproc_buffer(BufferView *buffer, + Parameters *parms) { + auto query = new LocalQueryBuffer(); + query->buffer = buffer; + query->global_parms = *parms; + + return query; + } + + static void distribute_query(Parameters *parms, + std::vector const &local_queries, + LocalQueryBuffer *buffer_query) { + return; + } + + static std::vector local_query(S *shard, LocalQuery *query) { + std::vector result; + + /* + * if the returned index is one past the end of the + * records for the PGM, then there are not records + * in the index falling into the specified range. + */ + if (query->start_idx == shard->get_record_count()) { + return result; } - static void* get_buffer_query_state(BufferView *buffer, void *parms) { - auto res = new BufferState(buffer); + auto ptr = shard->get_record_at(query->start_idx); - return res; + /* + * roll the pointer forward to the first record that is + * greater than or equal to the lower bound. + */ + while (ptr < shard->get_data() + query->stop_idx && + ptr->rec.key < query->global_parms.lower_bound) { + ptr++; } - static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_state) { - return; + while (ptr < shard->get_data() + query->stop_idx && + ptr->rec.key <= query->global_parms.upper_bound) { + result.emplace_back(*ptr); + ptr++; } - static std::vector> query(S *shard, void *q_state, void *parms) { - std::vector> records; - auto p = (Parms *) parms; - auto s = (State *) q_state; - - /* - * if the returned index is one past the end of the - * records for the PGM, then there are not records - * in the index falling into the specified range. - */ - if (s->start_idx == shard->get_record_count()) { - return records; - } - - auto ptr = shard->get_record_at(s->start_idx); - - /* - * roll the pointer forward to the first record that is - * greater than or equal to the lower bound. - */ - while(ptr < shard->get_data() + s->stop_idx && ptr->rec.key < p->lower_bound) { - ptr++; - } - - while (ptr < shard->get_data() + s->stop_idx && ptr->rec.key <= p->upper_bound) { - records.emplace_back(*ptr); - ptr++; - } - - return records; - } + return result; + } - static std::vector> buffer_query(void *state, void *parms) { - auto p = (Parms *) parms; - auto s = (BufferState *) state; + static std::vector + local_query_buffer(LocalQueryBuffer *query) { - std::vector> records; - for (size_t i=0; ibuffer->get_record_count(); i++) { - auto rec = s->buffer->get(i); - if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { - records.emplace_back(*rec); - } - } - - return records; + std::vector result; + for (size_t i = 0; i < query->buffer->get_record_count(); i++) { + auto rec = query->buffer->get(i); + if (rec->rec.key >= query->global_parms.lower_bound && + rec->rec.key <= query->global_parms.upper_bound) { + result.emplace_back(*rec); + } } - static std::vector merge(std::vector>> &results, void *parms, std::vector &output) { - std::vector>> cursors; - cursors.reserve(results.size()); - - psudb::PriorityQueue> pq(results.size()); - size_t total = 0; - size_t tmp_n = results.size(); - - - for (size_t i = 0; i < tmp_n; ++i) - if (results[i].size() > 0){ - auto base = results[i].data(); - cursors.emplace_back(Cursor>{base, base + results[i].size(), 0, results[i].size()}); - assert(i == cursors.size() - 1); - total += results[i].size(); - pq.push(cursors[i].ptr, tmp_n - i - 1); - } else { - cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); - } - - if (total == 0) { - return std::vector(); - } - - output.reserve(total); - - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : psudb::queue_record>{nullptr, 0}; - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[tmp_n - now.version - 1]; - auto& cursor2 = cursors[tmp_n - next.version - 1]; - if (advance_cursor>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor>(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[tmp_n - now.version - 1]; - if (!now.data->is_tombstone()) output.push_back(cursor.ptr->rec); - - pq.pop(); - - if (advance_cursor>(cursor)) pq.push(cursor.ptr, now.version); - } - } - - return output; + return result; + } + + static void + combine(std::vector> const &local_results, + Parameters *parms, std::vector &output) { + std::vector> cursors; + cursors.reserve(local_results.size()); + + psudb::PriorityQueue pq(local_results.size()); + size_t total = 0; + size_t tmp_n = local_results.size(); + + for (size_t i = 0; i < tmp_n; ++i) + if (local_results[i].size() > 0) { + auto base = local_results[i].data(); + cursors.emplace_back(Cursor{ + base, base + local_results[i].size(), 0, local_results[i].size()}); + assert(i == cursors.size() - 1); + total += local_results[i].size(); + pq.push(cursors[i].ptr, tmp_n - i - 1); + } else { + cursors.emplace_back(Cursor{nullptr, nullptr, 0, 0}); + } + + if (total == 0) { + return; } - static void delete_query_state(void *state) { - auto s = (State *) state; - delete s; + output.reserve(total); + + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 + ? pq.peek(1) + : psudb::queue_record{nullptr, 0}; + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); + pq.pop(); + auto &cursor1 = cursors[tmp_n - now.version - 1]; + auto &cursor2 = cursors[tmp_n - next.version - 1]; + if (advance_cursor(cursor1)) + pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) + pq.push(cursor2.ptr, next.version); + } else { + auto &cursor = cursors[tmp_n - now.version - 1]; + if (!now.data->is_tombstone()) + output.push_back(cursor.ptr->rec); + + pq.pop(); + + if (advance_cursor(cursor)) + pq.push(cursor.ptr, now.version); + } } - static void delete_buffer_query_state(void *state) { - auto s = (BufferState *) state; - delete s; - } + return; + } - static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { - return false; - } + static bool repeat(Parameters *parms, std::vector &output, + std::vector const &local_queries, + LocalQueryBuffer *buffer_query) { + return false; + } }; -}} +} // namespace rq +} // namespace de -- cgit v1.2.3