summaryrefslogtreecommitdiffstats
path: root/include/query/rangecount.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/query/rangecount.h')
-rw-r--r--include/query/rangecount.h259
1 files changed, 129 insertions, 130 deletions
diff --git a/include/query/rangecount.h b/include/query/rangecount.h
index 5b95cdd..68d304d 100644
--- a/include/query/rangecount.h
+++ b/include/query/rangecount.h
@@ -5,169 +5,168 @@
*
* Distributed under the Modified BSD License.
*
- * A query class for single dimensional range count queries. This query
- * requires that the shard support get_lower_bound(key) and
+ * A query class for single dimensional range count queries. This query
+ * requires that the shard support get_lower_bound(key) and
* get_record_at(index).
*/
#pragma once
#include "framework/QueryRequirements.h"
-namespace de { namespace rc {
+namespace de {
+namespace rc {
-template <RecordInterface R>
-struct Parms {
+template <ShardInterface S, bool FORCE_SCAN = true> class Query {
+ typedef typename S::RECORD R;
+
+public:
+ struct Parameters {
decltype(R::key) lower_bound;
decltype(R::key) upper_bound;
-};
+ };
-template <RecordInterface R>
-struct State {
+ struct LocalQuery {
size_t start_idx;
size_t stop_idx;
-};
+ Parameters global_parms;
+ };
-template <RecordInterface R>
-struct BufferState {
+ struct LocalQueryBuffer {
BufferView<R> *buffer;
-
- BufferState(BufferView<R> *buffer)
- : buffer(buffer) {}
-};
-
-template <KVPInterface R, ShardInterface<R> S, bool FORCE_SCAN=false>
-class Query {
-public:
- constexpr static bool EARLY_ABORT=false;
- constexpr static bool SKIP_DELETE_FILTER=true;
-
- static void *get_query_state(S *shard, void *parms) {
- return nullptr;
- }
-
- static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) {
- auto res = new BufferState<R>(buffer);
-
- return res;
+ Parameters global_parms;
+ };
+
+ struct LocalResultType {
+ size_t record_count;
+ size_t tombstone_count;
+
+ bool is_deleted() {return false;}
+ bool is_tombstone() {return false;}
+ };
+
+ typedef size_t ResultType;
+ constexpr static bool EARLY_ABORT = false;
+ constexpr static bool SKIP_DELETE_FILTER = true;
+
+ static LocalQuery *local_preproc(S *shard, Parameters *parms) {
+ auto query = new LocalQuery();
+
+ query->start_idx = shard->get_lower_bound(parms->lower_bound);
+ query->stop_idx = shard->get_record_count();
+ query->global_parms.lower_bound = parms->lower_bound;
+ query->global_parms.upper_bound = parms->upper_bound;
+
+ return query;
+ }
+
+ static LocalQueryBuffer *local_preproc_buffer(BufferView<R> *buffer,
+ Parameters *parms) {
+ auto query = new LocalQueryBuffer();
+ query->buffer = buffer;
+ query->global_parms.lower_bound = parms->lower_bound;
+ query->global_parms.upper_bound = parms->upper_bound;
+
+ return query;
+ }
+
+ static void distribute_query(Parameters *parms,
+ std::vector<LocalQuery *> const &local_queries,
+ LocalQueryBuffer *buffer_query) {
+ return;
+ }
+
+ static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
+ std::vector<LocalResultType> result;
+
+ /*
+ * if the returned index is one past the end of the
+ * records for the PGM, then there are not records
+ * in the index falling into the specified range.
+ */
+ if (query->start_idx == shard->get_record_count()) {
+ return result;
}
- static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void* buffer_state) {
- return;
+ auto ptr = shard->get_record_at(query->start_idx);
+ size_t reccnt = 0;
+ size_t tscnt = 0;
+
+ /*
+ * roll the pointer forward to the first record that is
+ * greater than or equal to the lower bound.
+ */
+ while (ptr < shard->get_data() + query->stop_idx &&
+ ptr->rec.key < query->global_parms.lower_bound) {
+ ptr++;
}
- static std::vector<Wrapped<R>> query(S *shard, void *q_state, void *parms) {
- std::vector<Wrapped<R>> records;
- auto p = (Parms<R> *) parms;
- auto s = (State<R> *) q_state;
-
- size_t reccnt = 0;
- size_t tscnt = 0;
-
- Wrapped<R> res;
- res.rec.key= 0; // records
- res.rec.value = 0; // tombstones
- records.emplace_back(res);
-
-
- auto start_idx = shard->get_lower_bound(p->lower_bound);
- auto stop_idx = shard->get_lower_bound(p->upper_bound);
+ while (ptr < shard->get_data() + query->stop_idx &&
+ ptr->rec.key <= query->global_parms.upper_bound) {
- /*
- * if the returned index is one past the end of the
- * records for the PGM, then there are not records
- * in the index falling into the specified range.
- */
- if (start_idx == shard->get_record_count()) {
- return records;
- }
-
-
- /*
- * roll the pointer forward to the first record that is
- * greater than or equal to the lower bound.
- */
- auto recs = shard->get_data();
- while(start_idx < stop_idx && recs[start_idx].rec.key < p->lower_bound) {
- start_idx++;
- }
-
- while (stop_idx < shard->get_record_count() && recs[stop_idx].rec.key <= p->upper_bound) {
- stop_idx++;
- }
- size_t idx = start_idx;
- size_t ts_cnt = 0;
+ if (!ptr->is_deleted()) {
+ reccnt++;
- while (idx < stop_idx) {
- ts_cnt += recs[idx].is_tombstone() * 2 + recs[idx].is_deleted();
- idx++;
+ if (ptr->is_tombstone()) {
+ tscnt++;
}
+ }
- records[0].rec.key = idx - start_idx;
- records[0].rec.value = ts_cnt;
-
- return records;
+ ptr++;
}
- static std::vector<Wrapped<R>> buffer_query(void *state, void *parms) {
- auto p = (Parms<R> *) parms;
- auto s = (BufferState<R> *) state;
-
- std::vector<Wrapped<R>> records;
-
- Wrapped<R> res;
- res.rec.key= 0; // records
- res.rec.value = 0; // tombstones
- records.emplace_back(res);
-
- size_t stop_idx;
- if constexpr (FORCE_SCAN) {
- stop_idx = s->buffer->get_capacity() / 2;
- } else {
- stop_idx = s->buffer->get_record_count();
- }
-
- for (size_t i=0; i<s->buffer->get_record_count(); i++) {
- auto rec = s->buffer->get(i);
-
- if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound
- && !rec->is_deleted()) {
- if (rec->is_tombstone()) {
- records[0].rec.value++;
- } else {
- records[0].rec.key++;
- }
- }
+ result.push_back({reccnt, tscnt});
+ return result;
+ }
+
+ static std::vector<LocalResultType>
+ local_query_buffer(LocalQueryBuffer *query) {
+
+ std::vector<LocalResultType> result;
+ size_t reccnt = 0;
+ size_t tscnt = 0;
+ for (size_t i = 0; i < query->buffer->get_record_count(); i++) {
+ auto rec = query->buffer->get(i);
+ if (rec->rec.key >= query->global_parms.lower_bound &&
+ rec->rec.key <= query->global_parms.upper_bound) {
+ if (!rec->is_deleted()) {
+ reccnt++;
+ if (rec->is_tombstone()) {
+ tscnt++;
+ }
}
-
- return records;
+ }
}
- static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
- R res;
- res.key = 0;
- res.value = 0;
- output.emplace_back(res);
+ result.push_back({reccnt, tscnt});
- for (size_t i=0; i<results.size(); i++) {
- output[0].key += results[i][0].rec.key; // records
- output[0].value += results[i][0].rec.value; // tombstones
- }
+ return result;
+ }
- output[0].key -= output[0].value;
- return output;
- }
+ static void
+ combine(std::vector<std::vector<LocalResultType>> const &local_results,
+ Parameters *parms, std::vector<ResultType> &output) {
+ size_t reccnt = 0;
+ size_t tscnt = 0;
- static void delete_query_state(void *state) {
+ for (auto &local_result : local_results) {
+ reccnt += local_result[0].record_count;
+ tscnt += local_result[0].tombstone_count;
}
- static void delete_buffer_query_state(void *state) {
- auto s = (BufferState<R> *) state;
- delete s;
+ /* if more tombstones than results, clamp the output at 0 */
+ if (tscnt > reccnt) {
+ tscnt = reccnt;
}
- static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
- return false;
- }
+ output.push_back({reccnt - tscnt});
+ }
+
+ static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ std::vector<LocalQuery *> const &local_queries,
+ LocalQueryBuffer *buffer_query) {
+ return false;
+ }
};
-}}
+} // namespace rc
+} // namespace de