diff options
Diffstat (limited to 'include/query/rangecount.h')
| -rw-r--r-- | include/query/rangecount.h | 259 |
1 files changed, 129 insertions, 130 deletions
diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 5b95cdd..68d304d 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -5,169 +5,168 @@ * * Distributed under the Modified BSD License. * - * A query class for single dimensional range count queries. This query - * requires that the shard support get_lower_bound(key) and + * A query class for single dimensional range count queries. This query + * requires that the shard support get_lower_bound(key) and * get_record_at(index). */ #pragma once #include "framework/QueryRequirements.h" -namespace de { namespace rc { +namespace de { +namespace rc { -template <RecordInterface R> -struct Parms { +template <ShardInterface S, bool FORCE_SCAN = true> class Query { + typedef typename S::RECORD R; + +public: + struct Parameters { decltype(R::key) lower_bound; decltype(R::key) upper_bound; -}; + }; -template <RecordInterface R> -struct State { + struct LocalQuery { size_t start_idx; size_t stop_idx; -}; + Parameters global_parms; + }; -template <RecordInterface R> -struct BufferState { + struct LocalQueryBuffer { BufferView<R> *buffer; - - BufferState(BufferView<R> *buffer) - : buffer(buffer) {} -}; - -template <KVPInterface R, ShardInterface<R> S, bool FORCE_SCAN=false> -class Query { -public: - constexpr static bool EARLY_ABORT=false; - constexpr static bool SKIP_DELETE_FILTER=true; - - static void *get_query_state(S *shard, void *parms) { - return nullptr; - } - - static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) { - auto res = new BufferState<R>(buffer); - - return res; + Parameters global_parms; + }; + + struct LocalResultType { + size_t record_count; + size_t tombstone_count; + + bool is_deleted() {return false;} + bool is_tombstone() {return false;} + }; + + typedef size_t ResultType; + constexpr static bool EARLY_ABORT = false; + constexpr static bool SKIP_DELETE_FILTER = true; + + static LocalQuery *local_preproc(S *shard, Parameters *parms) { + auto query = new LocalQuery(); + + query->start_idx = shard->get_lower_bound(parms->lower_bound); + query->stop_idx = shard->get_record_count(); + query->global_parms.lower_bound = parms->lower_bound; + query->global_parms.upper_bound = parms->upper_bound; + + return query; + } + + static LocalQueryBuffer *local_preproc_buffer(BufferView<R> *buffer, + Parameters *parms) { + auto query = new LocalQueryBuffer(); + query->buffer = buffer; + query->global_parms.lower_bound = parms->lower_bound; + query->global_parms.upper_bound = parms->upper_bound; + + return query; + } + + static void distribute_query(Parameters *parms, + std::vector<LocalQuery *> const &local_queries, + LocalQueryBuffer *buffer_query) { + return; + } + + static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { + std::vector<LocalResultType> result; + + /* + * if the returned index is one past the end of the + * records for the PGM, then there are not records + * in the index falling into the specified range. + */ + if (query->start_idx == shard->get_record_count()) { + return result; } - static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void* buffer_state) { - return; + auto ptr = shard->get_record_at(query->start_idx); + size_t reccnt = 0; + size_t tscnt = 0; + + /* + * roll the pointer forward to the first record that is + * greater than or equal to the lower bound. + */ + while (ptr < shard->get_data() + query->stop_idx && + ptr->rec.key < query->global_parms.lower_bound) { + ptr++; } - static std::vector<Wrapped<R>> query(S *shard, void *q_state, void *parms) { - std::vector<Wrapped<R>> records; - auto p = (Parms<R> *) parms; - auto s = (State<R> *) q_state; - - size_t reccnt = 0; - size_t tscnt = 0; - - Wrapped<R> res; - res.rec.key= 0; // records - res.rec.value = 0; // tombstones - records.emplace_back(res); - - - auto start_idx = shard->get_lower_bound(p->lower_bound); - auto stop_idx = shard->get_lower_bound(p->upper_bound); + while (ptr < shard->get_data() + query->stop_idx && + ptr->rec.key <= query->global_parms.upper_bound) { - /* - * if the returned index is one past the end of the - * records for the PGM, then there are not records - * in the index falling into the specified range. - */ - if (start_idx == shard->get_record_count()) { - return records; - } - - - /* - * roll the pointer forward to the first record that is - * greater than or equal to the lower bound. - */ - auto recs = shard->get_data(); - while(start_idx < stop_idx && recs[start_idx].rec.key < p->lower_bound) { - start_idx++; - } - - while (stop_idx < shard->get_record_count() && recs[stop_idx].rec.key <= p->upper_bound) { - stop_idx++; - } - size_t idx = start_idx; - size_t ts_cnt = 0; + if (!ptr->is_deleted()) { + reccnt++; - while (idx < stop_idx) { - ts_cnt += recs[idx].is_tombstone() * 2 + recs[idx].is_deleted(); - idx++; + if (ptr->is_tombstone()) { + tscnt++; } + } - records[0].rec.key = idx - start_idx; - records[0].rec.value = ts_cnt; - - return records; + ptr++; } - static std::vector<Wrapped<R>> buffer_query(void *state, void *parms) { - auto p = (Parms<R> *) parms; - auto s = (BufferState<R> *) state; - - std::vector<Wrapped<R>> records; - - Wrapped<R> res; - res.rec.key= 0; // records - res.rec.value = 0; // tombstones - records.emplace_back(res); - - size_t stop_idx; - if constexpr (FORCE_SCAN) { - stop_idx = s->buffer->get_capacity() / 2; - } else { - stop_idx = s->buffer->get_record_count(); - } - - for (size_t i=0; i<s->buffer->get_record_count(); i++) { - auto rec = s->buffer->get(i); - - if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound - && !rec->is_deleted()) { - if (rec->is_tombstone()) { - records[0].rec.value++; - } else { - records[0].rec.key++; - } - } + result.push_back({reccnt, tscnt}); + return result; + } + + static std::vector<LocalResultType> + local_query_buffer(LocalQueryBuffer *query) { + + std::vector<LocalResultType> result; + size_t reccnt = 0; + size_t tscnt = 0; + for (size_t i = 0; i < query->buffer->get_record_count(); i++) { + auto rec = query->buffer->get(i); + if (rec->rec.key >= query->global_parms.lower_bound && + rec->rec.key <= query->global_parms.upper_bound) { + if (!rec->is_deleted()) { + reccnt++; + if (rec->is_tombstone()) { + tscnt++; + } } - - return records; + } } - static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) { - R res; - res.key = 0; - res.value = 0; - output.emplace_back(res); + result.push_back({reccnt, tscnt}); - for (size_t i=0; i<results.size(); i++) { - output[0].key += results[i][0].rec.key; // records - output[0].value += results[i][0].rec.value; // tombstones - } + return result; + } - output[0].key -= output[0].value; - return output; - } + static void + combine(std::vector<std::vector<LocalResultType>> const &local_results, + Parameters *parms, std::vector<ResultType> &output) { + size_t reccnt = 0; + size_t tscnt = 0; - static void delete_query_state(void *state) { + for (auto &local_result : local_results) { + reccnt += local_result[0].record_count; + tscnt += local_result[0].tombstone_count; } - static void delete_buffer_query_state(void *state) { - auto s = (BufferState<R> *) state; - delete s; + /* if more tombstones than results, clamp the output at 0 */ + if (tscnt > reccnt) { + tscnt = reccnt; } - static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) { - return false; - } + output.push_back({reccnt - tscnt}); + } + + static bool repeat(Parameters *parms, std::vector<ResultType> &output, + std::vector<LocalQuery *> const &local_queries, + LocalQueryBuffer *buffer_query) { + return false; + } }; -}} +} // namespace rc +} // namespace de |