From 9e1c1b1b930031896851b1ed4a15152508327d73 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 7 Nov 2023 13:35:54 -0500 Subject: Converted WIRS to the new interface --- include/query/wirs.h | 240 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 240 insertions(+) create mode 100644 include/query/wirs.h (limited to 'include/query/wirs.h') diff --git a/include/query/wirs.h b/include/query/wirs.h new file mode 100644 index 0000000..1113b1d --- /dev/null +++ b/include/query/wirs.h @@ -0,0 +1,240 @@ +/* + * include/query/wirs.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include "framework/interface/Record.h" +#include "framework/interface/Shard.h" +#include "framework/structure/MutableBuffer.h" +#include "psu-ds/Alias.h" + +namespace de { namespace wirs { + +template +struct Parms { + decltype(R::key) lower_bound; + decltype(R::key) upper_bound; + size_t sample_size; + gsl_rng *rng; +}; + +template +struct State { + decltype(R::weight) total_weight; + std::vector nodes; + psudb::Alias* top_level_alias; + size_t sample_size; + + State() { + total_weight = 0; + top_level_alias = nullptr; + } + + ~State() { + if (top_level_alias) delete top_level_alias; + } +}; + +template +struct BufferState { + size_t cutoff; + psudb::Alias* alias; + std::vector> records; + decltype(R::weight) max_weight; + size_t sample_size; + decltype(R::weight) total_weight; + + ~BufferState() { + delete alias; + } +}; + +template +class Query { +public: + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=false; + + static void *get_query_state(S *shard, void *parms) { + auto res = new State(); + decltype(R::key) lower_key = ((Parms *) parms)->lower_bound; + decltype(R::key) upper_key = ((Parms *) parms)->upper_bound; + + std::vector weights; + res->total_weight = shard->find_covering_nodes(lower_key, upper_key, res->nodes, weights); + + std::vector normalized_weights; + for (auto weight : weights) { + normalized_weights.emplace_back(weight / res->total_weight); + } + + res->top_level_alias = new psudb::Alias(normalized_weights); + res->sample_size = 0; + + return res; + } + + static void* get_buffer_query_state(MutableBuffer *buffer, void *parms) { + BufferState *state = new BufferState(); + auto parameters = (Parms*) parms; + + if constexpr (Rejection) { + state->cutoff = buffer->get_record_count() - 1; + state->max_weight = buffer->get_max_weight(); + state->total_weight = buffer->get_total_weight(); + state->sample_size = 0; + return state; + } + + std::vector weights; + + state->cutoff = buffer->get_record_count() - 1; + decltype(R::weight) total_weight = 0; + + for (size_t i = 0; i <= state->cutoff; i++) { + auto rec = buffer->get_data() + i; + + if (rec->rec.key >= parameters->lower_bound && rec->rec.key <= parameters->upper_bound && !rec->is_tombstone() && !rec->is_deleted()) { + weights.push_back(rec->rec.weight); + state->records.push_back(*rec); + total_weight += rec->rec.weight; + } + } + + std::vector normalized_weights; + for (size_t i = 0; i < weights.size(); i++) { + normalized_weights.push_back(weights[i] / total_weight); + } + + state->total_weight = total_weight; + state->alias = new psudb::Alias(normalized_weights); + state->sample_size = 0; + + return state; + } + + static void process_query_states(void *query_parms, std::vector &shard_states, std::vector &buffer_states) { + auto p = (Parms *) query_parms; + + std::vector shard_sample_sizes(shard_states.size()+buffer_states.size(), 0); + size_t buffer_sz = 0; + + std::vector weights; + + decltype(R::weight) total_weight = 0; + for (auto &s : buffer_states) { + auto bs = (BufferState *) s; + total_weight += bs->total_weight; + weights.push_back(bs->total_weight); + } + + for (auto &s : shard_states) { + auto state = (State *) s; + total_weight += state->total_weight; + weights.push_back(state->total_weight); + } + + std::vector normalized_weights; + for (auto w : weights) { + normalized_weights.push_back((double) w / (double) total_weight); + } + + auto shard_alias = psudb::Alias(normalized_weights); + for (size_t i=0; isample_size; i++) { + auto idx = shard_alias.get(p->rng); + + if (idx < buffer_states.size()) { + auto state = (BufferState *) buffer_states[idx]; + state->sample_size++; + } else { + auto state = (State *) shard_states[idx - buffer_states.size()]; + state->sample_size++; + } + } + } + + static std::vector> query(S *shard, void *q_state, void *parms) { + auto lower_key = ((Parms *) parms)->lower_bound; + auto upper_key = ((Parms *) parms)->upper_bound; + auto rng = ((Parms *) parms)->rng; + + auto state = (State *) q_state; + auto sample_size = state->sample_size; + + std::vector> result_set; + + if (sample_size == 0) { + return result_set; + } + size_t cnt = 0; + size_t attempts = 0; + + for (size_t i=0; iget_weighted_sample(lower_key, upper_key, + state->nodes[state->top_level_alias->get(rng)], + rng); + if (rec) { + result_set.emplace_back(*rec); + } + } + + return result_set; + } + + static std::vector> buffer_query(MutableBuffer *buffer, void *state, void *parms) { + auto st = (BufferState *) state; + auto p = (Parms *) parms; + + std::vector> result; + result.reserve(st->sample_size); + + if constexpr (Rejection) { + for (size_t i=0; isample_size; i++) { + auto idx = gsl_rng_uniform_int(p->rng, st->cutoff); + auto rec = buffer->get_data() + idx; + + auto test = gsl_rng_uniform(p->rng) * st->max_weight; + + if (test <= rec->rec.weight && rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { + result.emplace_back(*rec); + } + } + return result; + } + + for (size_t i=0; isample_size; i++) { + auto idx = st->alias->get(p->rng); + result.emplace_back(st->records[idx]); + } + + return result; + } + + static std::vector merge(std::vector>> &results, void *parms) { + std::vector output; + + for (size_t i=0; i *) state; + delete s; + } + + static void delete_buffer_query_state(void *state) { + auto s = (BufferState *) state; + delete s; + } +}; +}} -- cgit v1.2.3 From 357cab549c2ed33970562b84ff6f83923742343d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 7 Nov 2023 15:34:24 -0500 Subject: Comment and License updates --- include/query/wirs.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'include/query/wirs.h') diff --git a/include/query/wirs.h b/include/query/wirs.h index 1113b1d..9b3d2ad 100644 --- a/include/query/wirs.h +++ b/include/query/wirs.h @@ -3,7 +3,11 @@ * * Copyright (C) 2023 Douglas B. Rumbaugh * - * All rights reserved. Published under the Modified BSD License. + * Distributed under the Modified BSD License. + * + * A query class for weighted independent range sampling. This + * class is tightly coupled with include/shard/AugBTree.h, and + * so is probably of limited general utility. * */ #pragma once -- cgit v1.2.3 From 10b4425e842d10b7fbfa85978969ed4591d6b98e Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 7 Feb 2024 10:56:52 -0500 Subject: Fully implemented Query concept and adjusted queries to use it --- include/query/wirs.h | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) (limited to 'include/query/wirs.h') diff --git a/include/query/wirs.h b/include/query/wirs.h index 9b3d2ad..07c5292 100644 --- a/include/query/wirs.h +++ b/include/query/wirs.h @@ -12,9 +12,7 @@ */ #pragma once -#include "framework/interface/Record.h" -#include "framework/interface/Shard.h" -#include "framework/structure/MutableBuffer.h" +#include "framework/QueryRequirements.h" #include "psu-ds/Alias.h" namespace de { namespace wirs { @@ -52,6 +50,7 @@ struct BufferState { decltype(R::weight) max_weight; size_t sample_size; decltype(R::weight) total_weight; + BufferView *buffer; ~BufferState() { delete alias; @@ -83,7 +82,7 @@ public: return res; } - static void* get_buffer_query_state(MutableBuffer *buffer, void *parms) { + static void* get_buffer_query_state(BufferView *buffer, void *parms) { BufferState *state = new BufferState(); auto parameters = (Parms*) parms; @@ -92,16 +91,17 @@ public: state->max_weight = buffer->get_max_weight(); state->total_weight = buffer->get_total_weight(); state->sample_size = 0; + state->buffer = buffer; return state; } std::vector weights; - state->cutoff = buffer->get_record_count() - 1; + state->buffer = buffer; decltype(R::weight) total_weight = 0; - for (size_t i = 0; i <= state->cutoff; i++) { - auto rec = buffer->get_data() + i; + for (size_t i = 0; i <= buffer->get_record_count(); i++) { + auto rec = buffer->get(i); if (rec->rec.key >= parameters->lower_bound && rec->rec.key <= parameters->upper_bound && !rec->is_tombstone() && !rec->is_deleted()) { weights.push_back(rec->rec.weight); @@ -190,9 +190,10 @@ public: return result_set; } - static std::vector> buffer_query(MutableBuffer *buffer, void *state, void *parms) { + static std::vector> buffer_query(void *state, void *parms) { auto st = (BufferState *) state; auto p = (Parms *) parms; + auto buffer = st->buffer; std::vector> result; result.reserve(st->sample_size); @@ -200,7 +201,7 @@ public: if constexpr (Rejection) { for (size_t i=0; isample_size; i++) { auto idx = gsl_rng_uniform_int(p->rng, st->cutoff); - auto rec = buffer->get_data() + idx; + auto rec = buffer->get(idx); auto test = gsl_rng_uniform(p->rng) * st->max_weight; -- cgit v1.2.3 From 2c5d549b3618b9ea72e6eece4cb4f3da5a6811a8 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 7 Feb 2024 13:42:34 -0500 Subject: Fully realized shard concept interface --- include/query/wirs.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include/query/wirs.h') diff --git a/include/query/wirs.h b/include/query/wirs.h index 07c5292..4fac7e7 100644 --- a/include/query/wirs.h +++ b/include/query/wirs.h @@ -57,7 +57,7 @@ struct BufferState { } }; -template +template S, bool Rejection=true> class Query { public: constexpr static bool EARLY_ABORT=false; -- cgit v1.2.3 From 402fc269c0aaa671d84a6d15918735ad4b90e6b2 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 9 Feb 2024 12:30:21 -0500 Subject: Comment updates/fixes --- include/query/wirs.h | 1 - 1 file changed, 1 deletion(-) (limited to 'include/query/wirs.h') diff --git a/include/query/wirs.h b/include/query/wirs.h index 4fac7e7..ae82194 100644 --- a/include/query/wirs.h +++ b/include/query/wirs.h @@ -8,7 +8,6 @@ * A query class for weighted independent range sampling. This * class is tightly coupled with include/shard/AugBTree.h, and * so is probably of limited general utility. - * */ #pragma once -- cgit v1.2.3