From e02742b07540dd5a9bcbb44dae14856bf10955ed Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 15:18:53 -0500 Subject: Refactoring progress --- include/query/wss.h | 204 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 204 insertions(+) create mode 100644 include/query/wss.h (limited to 'include/query/wss.h') diff --git a/include/query/wss.h b/include/query/wss.h new file mode 100644 index 0000000..b8a5d54 --- /dev/null +++ b/include/query/wss.h @@ -0,0 +1,204 @@ +/* + * include/query/rangequery.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include "framework/interface/Record.h" +#include "framework/interface/Shard.h" +#include "framework/structure/MutableBuffer.h" + +namespace de { namespace wss { + +template +struct Parms { + size_t sample_size; + gsl_rng *rng; +}; + +template +struct State { + decltype(R::weight) total_weight; + size_t sample_size; + + State() { + total_weight = 0; + } +}; + +template +struct BufferState { + size_t cutoff; + size_t sample_size; + psudb::Alias *alias; + decltype(R::weight) max_weight; + decltype(R::weight) total_weight; + + ~BufferState() { + delete alias; + } +}; + +template +class Query { +public: + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=false; + + static void *get_query_state(S *shard, void *parms) { + auto res = new State(); + res->total_weight = shard->get_total_weight(); + res->sample_size = 0; + + return res; + } + + static void* get_buffer_query_state(MutableBuffer *buffer, void *parms) { + BufferState *state = new BufferState(); + auto parameters = (Parms*) parms; + if constexpr (Rejection) { + state->cutoff = buffer->get_record_count() - 1; + state->max_weight = buffer->get_max_weight(); + state->total_weight = buffer->get_total_weight(); + return state; + } + + std::vector weights; + + state->cutoff = buffer->get_record_count() - 1; + double total_weight = 0.0; + + for (size_t i = 0; i <= state->cutoff; i++) { + auto rec = buffer->get_data() + i; + weights.push_back(rec->rec.weight); + total_weight += rec->rec.weight; + } + + for (size_t i = 0; i < weights.size(); i++) { + weights[i] = weights[i] / total_weight; + } + + state->alias = new psudb::Alias(weights); + state->total_weight = total_weight; + + return state; + } + + static void process_query_states(void *query_parms, std::vector &shard_states, std::vector &buffer_states) { + auto p = (Parms *) query_parms; + auto bs = (BufferState *) buffer_states[0]; + + std::vector shard_sample_sizes(shard_states.size()+1, 0); + size_t buffer_sz = 0; + + std::vector weights; + weights.push_back(bs->total_weight); + + decltype(R::weight) total_weight = 0; + for (auto &s : shard_states) { + auto state = (State *) s; + total_weight += state->total_weight; + weights.push_back(state->total_weight); + } + + std::vector normalized_weights; + for (auto w : weights) { + normalized_weights.push_back((double) w / (double) total_weight); + } + + auto shard_alias = psudb::Alias(normalized_weights); + for (size_t i=0; isample_size; i++) { + auto idx = shard_alias.get(p->rng); + if (idx == 0) { + buffer_sz++; + } else { + shard_sample_sizes[idx - 1]++; + } + } + + + bs->sample_size = buffer_sz; + for (size_t i=0; i *) shard_states[i]; + state->sample_size = shard_sample_sizes[i+1]; + } + } + + static std::vector> query(S *shard, void *q_state, void *parms) { + auto rng = ((Parms *) parms)->rng; + + auto state = (State *) q_state; + auto sample_size = state->sample_size; + + std::vector> result_set; + + if (sample_size == 0) { + return result_set; + } + size_t attempts = 0; + do { + attempts++; + size_t idx = shard->m_alias->get(rng); + result_set.emplace_back(*shard->get_record_at(idx)); + } while (attempts < sample_size); + + return result_set; + } + + static std::vector> buffer_query(MutableBuffer *buffer, void *state, void *parms) { + auto st = (BufferState *) state; + auto p = (Parms *) parms; + + std::vector> result; + result.reserve(st->sample_size); + + if constexpr (Rejection) { + for (size_t i=0; isample_size; i++) { + auto idx = gsl_rng_uniform_int(p->rng, st->cutoff); + auto rec = buffer->get_data() + idx; + + auto test = gsl_rng_uniform(p->rng) * st->max_weight; + + if (test <= rec->rec.weight) { + result.emplace_back(*rec); + } + } + return result; + } + + for (size_t i=0; isample_size; i++) { + auto idx = st->alias->get(p->rng); + result.emplace_back(*(buffer->get_data() + idx)); + } + + return result; + } + + static std::vector merge(std::vector>> &results, void *parms) { + std::vector output; + + for (size_t i=0; i *) state; + delete s; + } + + static void delete_buffer_query_state(void *state) { + auto s = (BufferState *) state; + delete s; + } +}; + +}} -- cgit v1.2.3