From 0b723322a611de83872dd83b55d2e10e8886a283 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Thu, 2 Nov 2023 08:01:45 -0400 Subject: started refactoring queries interface --- include/query/rangequery.h | 161 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 161 insertions(+) create mode 100644 include/query/rangequery.h (limited to 'include/query/rangequery.h') diff --git a/include/query/rangequery.h b/include/query/rangequery.h new file mode 100644 index 0000000..f9a34d9 --- /dev/null +++ b/include/query/rangequery.h @@ -0,0 +1,161 @@ +/* + * include/query/rangequery.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +namespace de { namespace rq { + +template +struct Parms { + decltype(R::key) lower_bound; + decltype(R::key) upper_bound; +}; + +template +struct State { + size_t start_idx; + size_t stop_idx; +}; + +template +struct BufferState { + size_t cutoff; +}; + +template +class Query { +public: + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=true; + + static void *get_query_state(S *shard, void *parms) { + auto res = new State(); + auto p = (Parms *) parms; + + res->start_idx = shard->get_lower_bound(p->lower_bound); + res->stop_idx = shard->get_record_count(); + + return res; + } + + static void* get_buffer_query_state(MutableBuffer *buffer, void *parms) { + auto res = new BufferState(); + res->cutoff = buffer->get_record_count(); + + return res; + } + + static void process_query_states(void *query_parms, std::vector &shard_states, std::vector &buffer_states) { + return; + } + + static std::vector> query(S *shard, void *q_state, void *parms) { + std::vector> records; + auto p = (Parms *) parms; + auto s = (State *) q_state; + + // if the returned index is one past the end of the + // records for the PGM, then there are not records + // in the index falling into the specified range. + if (s->start_idx == shard->get_record_count()) { + return records; + } + + auto ptr = shard->get_record_at(s->start_idx); + + // roll the pointer forward to the first record that is + // greater than or equal to the lower bound. + while(ptr->rec.key < p->lower_bound) { + ptr++; + } + + while (ptr->rec.key <= p->upper_bound && ptr < shard->m_data + s->stop_idx) { + records.emplace_back(*ptr); + ptr++; + } + + return records; + } + + static std::vector> buffer_query(MutableBuffer *buffer, void *state, void *parms) { + auto p = (Parms *) parms; + auto s = (BufferState *) state; + + std::vector> records; + for (size_t i=0; icutoff; i++) { + auto rec = buffer->get_data() + i; + if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { + records.emplace_back(*rec); + } + } + + return records; + } + + static std::vector merge(std::vector>> &results, void *parms) { + std::vector>> cursors; + cursors.reserve(results.size()); + + PriorityQueue> pq(results.size()); + size_t total = 0; + size_t tmp_n = results.size(); + + + for (size_t i = 0; i < tmp_n; ++i) + if (results[i].size() > 0){ + auto base = results[i].data(); + cursors.emplace_back(Cursor{base, base + results[i].size(), 0, results[i].size()}); + assert(i == cursors.size() - 1); + total += results[i].size(); + pq.push(cursors[i].ptr, tmp_n - i - 1); + } else { + cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); + } + + if (total == 0) { + return std::vector(); + } + + std::vector output; + output.reserve(total); + + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[tmp_n - now.version - 1]; + auto& cursor2 = cursors[tmp_n - next.version - 1]; + if (advance_cursor>(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor>(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[tmp_n - now.version - 1]; + if (!now.data->is_tombstone()) output.push_back(cursor.ptr->rec); + pq.pop(); + + if (advance_cursor>(cursor)) pq.push(cursor.ptr, now.version); + } + } + + return output; + } + + static void delete_query_state(void *state) { + auto s = (State *) state; + delete s; + } + + static void delete_buffer_query_state(void *state) { + auto s = (BufferState *) state; + delete s; + } +}; + +}} -- cgit v1.2.3 From e02742b07540dd5a9bcbb44dae14856bf10955ed Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 15:18:53 -0500 Subject: Refactoring progress --- include/query/rangequery.h | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) (limited to 'include/query/rangequery.h') diff --git a/include/query/rangequery.h b/include/query/rangequery.h index f9a34d9..b9ac9db 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -8,6 +8,12 @@ */ #pragma once +#include "framework/interface/Record.h" +#include "framework/interface/Shard.h" +#include "framework/structure/MutableBuffer.h" +#include "psu-ds/PriorityQueue.h" +#include "util/Cursor.h" + namespace de { namespace rq { template @@ -27,7 +33,7 @@ struct BufferState { size_t cutoff; }; -template +template class Query { public: constexpr static bool EARLY_ABORT=false; @@ -74,7 +80,7 @@ public: ptr++; } - while (ptr->rec.key <= p->upper_bound && ptr < shard->m_data + s->stop_idx) { + while (ptr->rec.key <= p->upper_bound && ptr < shard->get_data() + s->stop_idx) { records.emplace_back(*ptr); ptr++; } @@ -101,7 +107,7 @@ public: std::vector>> cursors; cursors.reserve(results.size()); - PriorityQueue> pq(results.size()); + psudb::PriorityQueue> pq(results.size()); size_t total = 0; size_t tmp_n = results.size(); @@ -126,7 +132,7 @@ public: while (pq.size()) { auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + auto next = pq.size() > 1 ? pq.peek(1) : psudb::queue_record>{nullptr, 0}; if (!now.data->is_tombstone() && next.data != nullptr && now.data->rec == next.data->rec && next.data->is_tombstone()) { -- cgit v1.2.3 From 357cab549c2ed33970562b84ff6f83923742343d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 7 Nov 2023 15:34:24 -0500 Subject: Comment and License updates --- include/query/rangequery.h | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'include/query/rangequery.h') diff --git a/include/query/rangequery.h b/include/query/rangequery.h index b9ac9db..16dcd86 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -1,10 +1,12 @@ /* * include/query/rangequery.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * - * All rights reserved. Published under the Modified BSD License. + * Distributed under the Modified BSD License. * + * A query class for single dimensional range queries. This query requires + * that the shard support get_lower_bound(key) and get_record_at(index). */ #pragma once -- cgit v1.2.3 From aac0bb661af8fae38d3ce08d6078cb4d9dfcb575 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 12 Jan 2024 14:10:11 -0500 Subject: Initial integration of new buffering scheme into framework It isn't working right now (lotsa test failures), but we're to the debugging phase now. --- include/query/rangequery.h | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) (limited to 'include/query/rangequery.h') diff --git a/include/query/rangequery.h b/include/query/rangequery.h index 16dcd86..ad5b767 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -12,7 +12,7 @@ #include "framework/interface/Record.h" #include "framework/interface/Shard.h" -#include "framework/structure/MutableBuffer.h" +#include "framework/structure/BufferView.h" #include "psu-ds/PriorityQueue.h" #include "util/Cursor.h" @@ -32,7 +32,10 @@ struct State { template struct BufferState { - size_t cutoff; + BufferView buffer; + + BufferState(BufferView buffer) + : buffer(std::move(buffer)) {} }; template @@ -51,14 +54,13 @@ public: return res; } - static void* get_buffer_query_state(MutableBuffer *buffer, void *parms) { - auto res = new BufferState(); - res->cutoff = buffer->get_record_count(); + static void* get_buffer_query_state(BufferView buffer, void *parms) { + auto res = new BufferState(std::move(buffer)); return res; } - static void process_query_states(void *query_parms, std::vector &shard_states, std::vector &buffer_states) { + static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_states) { return; } @@ -67,17 +69,21 @@ public: auto p = (Parms *) parms; auto s = (State *) q_state; - // if the returned index is one past the end of the - // records for the PGM, then there are not records - // in the index falling into the specified range. + /* + * if the returned index is one past the end of the + * records for the PGM, then there are not records + * in the index falling into the specified range. + */ if (s->start_idx == shard->get_record_count()) { return records; } auto ptr = shard->get_record_at(s->start_idx); - // roll the pointer forward to the first record that is - // greater than or equal to the lower bound. + /* + * roll the pointer forward to the first record that is + * greater than or equal to the lower bound. + */ while(ptr->rec.key < p->lower_bound) { ptr++; } @@ -90,13 +96,13 @@ public: return records; } - static std::vector> buffer_query(MutableBuffer *buffer, void *state, void *parms) { + static std::vector> buffer_query(void *state, void *parms) { auto p = (Parms *) parms; auto s = (BufferState *) state; std::vector> records; - for (size_t i=0; icutoff; i++) { - auto rec = buffer->get_data() + i; + for (size_t i=0; ibuffer.get_record_count(); i++) { + auto rec = s->buffer.get(i); if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { records.emplace_back(*rec); } -- cgit v1.2.3 From 138c793b0a58577713d98c98bb140cf1d9c79bee Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 17 Jan 2024 18:22:00 -0500 Subject: Multiple concurrency bug fixes A poorly organized commit with fixes for a variety of bugs that were causing missing records. The core problems all appear to be fixed, though there is an outstanding problem with tombstones not being completely canceled. A very small number are appearing in the wrong order during the static structure test. --- include/query/rangequery.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'include/query/rangequery.h') diff --git a/include/query/rangequery.h b/include/query/rangequery.h index ad5b767..c44f5d7 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -84,11 +84,11 @@ public: * roll the pointer forward to the first record that is * greater than or equal to the lower bound. */ - while(ptr->rec.key < p->lower_bound) { + while(ptr < shard->get_data() + s->stop_idx && ptr->rec.key < p->lower_bound) { ptr++; } - while (ptr->rec.key <= p->upper_bound && ptr < shard->get_data() + s->stop_idx) { + while (ptr < shard->get_data() + s->stop_idx && ptr->rec.key <= p->upper_bound) { records.emplace_back(*ptr); ptr++; } @@ -152,6 +152,7 @@ public: } else { auto& cursor = cursors[tmp_n - now.version - 1]; if (!now.data->is_tombstone()) output.push_back(cursor.ptr->rec); + pq.pop(); if (advance_cursor>(cursor)) pq.push(cursor.ptr, now.version); -- cgit v1.2.3 From db4806d9dd9757273a14e6c3ea92e5a087239145 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 5 Feb 2024 15:17:25 -0500 Subject: Set up tombstone deletes properly --- include/query/rangequery.h | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'include/query/rangequery.h') diff --git a/include/query/rangequery.h b/include/query/rangequery.h index c44f5d7..1a42265 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -32,10 +32,10 @@ struct State { template struct BufferState { - BufferView buffer; + BufferView *buffer; - BufferState(BufferView buffer) - : buffer(std::move(buffer)) {} + BufferState(BufferView *buffer) + : buffer(buffer) {} }; template @@ -54,8 +54,8 @@ public: return res; } - static void* get_buffer_query_state(BufferView buffer, void *parms) { - auto res = new BufferState(std::move(buffer)); + static void* get_buffer_query_state(BufferView *buffer, void *parms) { + auto res = new BufferState(buffer); return res; } @@ -101,8 +101,8 @@ public: auto s = (BufferState *) state; std::vector> records; - for (size_t i=0; ibuffer.get_record_count(); i++) { - auto rec = s->buffer.get(i); + for (size_t i=0; ibuffer->get_record_count(); i++) { + auto rec = s->buffer->get(i); if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { records.emplace_back(*rec); } -- cgit v1.2.3 From 10b4425e842d10b7fbfa85978969ed4591d6b98e Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 7 Feb 2024 10:56:52 -0500 Subject: Fully implemented Query concept and adjusted queries to use it --- include/query/rangequery.h | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'include/query/rangequery.h') diff --git a/include/query/rangequery.h b/include/query/rangequery.h index 1a42265..c3985fa 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -10,9 +10,7 @@ */ #pragma once -#include "framework/interface/Record.h" -#include "framework/interface/Shard.h" -#include "framework/structure/BufferView.h" +#include "framework/QueryRequirements.h" #include "psu-ds/PriorityQueue.h" #include "util/Cursor.h" @@ -60,7 +58,7 @@ public: return res; } - static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_states) { + static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_state) { return; } -- cgit v1.2.3 From 2c5d549b3618b9ea72e6eece4cb4f3da5a6811a8 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 7 Feb 2024 13:42:34 -0500 Subject: Fully realized shard concept interface --- include/query/rangequery.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include/query/rangequery.h') diff --git a/include/query/rangequery.h b/include/query/rangequery.h index c3985fa..24b38ec 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -36,7 +36,7 @@ struct BufferState { : buffer(buffer) {} }; -template +template S> class Query { public: constexpr static bool EARLY_ABORT=false; -- cgit v1.2.3