From 10b4425e842d10b7fbfa85978969ed4591d6b98e Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 7 Feb 2024 10:56:52 -0500 Subject: Fully implemented Query concept and adjusted queries to use it --- include/framework/DynamicExtension.h | 2 +- include/framework/interface/Query.h | 25 ++++++++++-------------- include/framework/scheduling/Epoch.h | 2 +- include/framework/scheduling/Task.h | 4 ++-- include/framework/structure/ExtensionStructure.h | 2 +- include/framework/structure/InternalLevel.h | 4 ++-- include/query/irs.h | 4 ++-- include/query/rangecount.h | 8 ++------ include/query/rangequery.h | 6 ++---- include/query/wirs.h | 19 +++++++++--------- include/query/wss.h | 20 ++++++++++--------- 11 files changed, 44 insertions(+), 52 deletions(-) (limited to 'include') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 3e9d0fb..5c021f2 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -31,7 +31,7 @@ namespace de { -template Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler> class DynamicExtension { typedef S Shard; diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h index ca742c3..8cf9660 100644 --- a/include/framework/interface/Query.h +++ b/include/framework/interface/Query.h @@ -8,31 +8,26 @@ */ #pragma once -#include +#include "framework/QueryRequirements.h" #include -#include "util/types.h" - +namespace de{ // FIXME: The interface is not completely specified yet, as it is pending // determining a good way to handle additional template arguments // to get the Shard and Record types into play -template -concept QueryInterface = requires(Q q, void *p, std::vector &s) { - - /* - {Q::get_query_state(p, p)} -> std::convertible_to; - {Q::get_buffer_query_state(p, p)} -> std::convertible_to; - */ +template +concept QueryInterface = requires(void *p, S *sh, std::vector &s, std::vector>> &rv, BufferView *bv) { + {Q::get_query_state(sh, p)} -> std::convertible_to; + {Q::get_buffer_query_state(bv, p)} -> std::convertible_to; {Q::process_query_states(p, s, p)}; - /* - {Q::query(s, p, p)} -> std::convertible_to>>; + {Q::query(sh, p, p)} -> std::convertible_to>>; {Q::buffer_query(p, p)} -> std::convertible_to>>; {Q::merge(rv, p)} -> std::convertible_to>; - */ - {Q::delete_query_state(std::declval())} -> std::same_as; - {Q::delete_buffer_query_state(std::declval())} -> std::same_as; + {Q::delete_query_state(p)} -> std::same_as; + {Q::delete_buffer_query_state(p)} -> std::same_as; {Q::EARLY_ABORT} -> std::convertible_to; {Q::SKIP_DELETE_FILTER} -> std::convertible_to; }; +} diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 7b533b6..48b7742 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -18,7 +18,7 @@ namespace de { -template +template Q, LayoutPolicy L> class Epoch { private: typedef MutableBuffer Buffer; diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 6f6b913..ba0001d 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -18,7 +18,7 @@ namespace de { -template +template Q, LayoutPolicy L> struct ReconstructionArgs { Epoch *epoch; std::vector merges; @@ -27,7 +27,7 @@ struct ReconstructionArgs { void *extension; }; -template +template Q, LayoutPolicy L> struct QueryArgs { std::promise> result_set; void *query_parms; diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index ae566cb..0b8263e 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -22,7 +22,7 @@ namespace de { -template +template Q, LayoutPolicy L=LayoutPolicy::TEIRING> class ExtensionStructure { typedef S Shard; typedef BufferView BuffView; diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index e9874e0..0fd5275 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -19,12 +19,12 @@ #include "framework/structure/BufferView.h" namespace de { -template +template Q> class InternalLevel; -template +template Q> class InternalLevel { typedef S Shard; typedef BufferView BuffView; diff --git a/include/query/irs.h b/include/query/irs.h index 7eea14b..bef75bf 100644 --- a/include/query/irs.h +++ b/include/query/irs.h @@ -90,9 +90,9 @@ public: return res; } - static void process_query_states(void *query_parms, std::vector &shard_states, void *buff_state) { + static void process_query_states(void *query_parms, std::vector &shard_states, void *buffer_state) { auto p = (Parms *) query_parms; - auto bs = (buff_state) ? (BufferState *) buff_state : nullptr; + auto bs = (buffer_state) ? (BufferState *) buffer_state : nullptr; std::vector shard_sample_sizes(shard_states.size()+1, 0); size_t buffer_sz = 0; diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 70d57d8..a09ad64 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -11,11 +11,7 @@ */ #pragma once -#include "framework/interface/Record.h" -#include "framework/interface/Shard.h" -#include "framework/structure/BufferView.h" -#include "psu-ds/PriorityQueue.h" -#include "util/Cursor.h" +#include "framework/QueryRequirements.h" namespace de { namespace rc { @@ -61,7 +57,7 @@ public: return res; } - static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_states) { + static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_state) { return; } diff --git a/include/query/rangequery.h b/include/query/rangequery.h index 1a42265..c3985fa 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -10,9 +10,7 @@ */ #pragma once -#include "framework/interface/Record.h" -#include "framework/interface/Shard.h" -#include "framework/structure/BufferView.h" +#include "framework/QueryRequirements.h" #include "psu-ds/PriorityQueue.h" #include "util/Cursor.h" @@ -60,7 +58,7 @@ public: return res; } - static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_states) { + static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_state) { return; } diff --git a/include/query/wirs.h b/include/query/wirs.h index 9b3d2ad..07c5292 100644 --- a/include/query/wirs.h +++ b/include/query/wirs.h @@ -12,9 +12,7 @@ */ #pragma once -#include "framework/interface/Record.h" -#include "framework/interface/Shard.h" -#include "framework/structure/MutableBuffer.h" +#include "framework/QueryRequirements.h" #include "psu-ds/Alias.h" namespace de { namespace wirs { @@ -52,6 +50,7 @@ struct BufferState { decltype(R::weight) max_weight; size_t sample_size; decltype(R::weight) total_weight; + BufferView *buffer; ~BufferState() { delete alias; @@ -83,7 +82,7 @@ public: return res; } - static void* get_buffer_query_state(MutableBuffer *buffer, void *parms) { + static void* get_buffer_query_state(BufferView *buffer, void *parms) { BufferState *state = new BufferState(); auto parameters = (Parms*) parms; @@ -92,16 +91,17 @@ public: state->max_weight = buffer->get_max_weight(); state->total_weight = buffer->get_total_weight(); state->sample_size = 0; + state->buffer = buffer; return state; } std::vector weights; - state->cutoff = buffer->get_record_count() - 1; + state->buffer = buffer; decltype(R::weight) total_weight = 0; - for (size_t i = 0; i <= state->cutoff; i++) { - auto rec = buffer->get_data() + i; + for (size_t i = 0; i <= buffer->get_record_count(); i++) { + auto rec = buffer->get(i); if (rec->rec.key >= parameters->lower_bound && rec->rec.key <= parameters->upper_bound && !rec->is_tombstone() && !rec->is_deleted()) { weights.push_back(rec->rec.weight); @@ -190,9 +190,10 @@ public: return result_set; } - static std::vector> buffer_query(MutableBuffer *buffer, void *state, void *parms) { + static std::vector> buffer_query(void *state, void *parms) { auto st = (BufferState *) state; auto p = (Parms *) parms; + auto buffer = st->buffer; std::vector> result; result.reserve(st->sample_size); @@ -200,7 +201,7 @@ public: if constexpr (Rejection) { for (size_t i=0; isample_size; i++) { auto idx = gsl_rng_uniform_int(p->rng, st->cutoff); - auto rec = buffer->get_data() + idx; + auto rec = buffer->get(idx); auto test = gsl_rng_uniform(p->rng) * st->max_weight; diff --git a/include/query/wss.h b/include/query/wss.h index 4c8861e..9f192ee 100644 --- a/include/query/wss.h +++ b/include/query/wss.h @@ -11,9 +11,8 @@ */ #pragma once -#include "framework/interface/Record.h" -#include "framework/interface/Shard.h" -#include "framework/structure/MutableBuffer.h" +#include "framework/QueryRequirements.h" +#include "psu-ds/Alias.h" namespace de { namespace wss { @@ -40,6 +39,7 @@ struct BufferState { psudb::Alias *alias; decltype(R::weight) max_weight; decltype(R::weight) total_weight; + BufferView *buffer; ~BufferState() { delete alias; @@ -60,23 +60,24 @@ public: return res; } - static void* get_buffer_query_state(MutableBuffer *buffer, void *parms) { + static void* get_buffer_query_state(BufferState *buffer, void *parms) { BufferState *state = new BufferState(); auto parameters = (Parms*) parms; if constexpr (Rejection) { state->cutoff = buffer->get_record_count() - 1; state->max_weight = buffer->get_max_weight(); state->total_weight = buffer->get_total_weight(); + state->buffer = buffer; return state; } std::vector weights; - state->cutoff = buffer->get_record_count() - 1; double total_weight = 0.0; + state->buffer = buffer; - for (size_t i = 0; i <= state->cutoff; i++) { - auto rec = buffer->get_data() + i; + for (size_t i = 0; i <= buffer->get_record_count(); i++) { + auto rec = buffer->get_data(i); weights.push_back(rec->rec.weight); total_weight += rec->rec.weight; } @@ -152,9 +153,10 @@ public: return result_set; } - static std::vector> buffer_query(MutableBuffer *buffer, void *state, void *parms) { + static std::vector> buffer_query(void *state, void *parms) { auto st = (BufferState *) state; auto p = (Parms *) parms; + auto buffer = st->buffer; std::vector> result; result.reserve(st->sample_size); @@ -162,7 +164,7 @@ public: if constexpr (Rejection) { for (size_t i=0; isample_size; i++) { auto idx = gsl_rng_uniform_int(p->rng, st->cutoff); - auto rec = buffer->get_data() + idx; + auto rec = buffer->get(idx); auto test = gsl_rng_uniform(p->rng) * st->max_weight; -- cgit v1.2.3