diff options
Diffstat (limited to 'include/query/rangecount.h')
| -rw-r--r-- | include/query/rangecount.h | 60 |
1 files changed, 34 insertions, 26 deletions
diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 6c57809..5b95cdd 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -35,20 +35,14 @@ struct BufferState { : buffer(buffer) {} }; -template <KVPInterface R, ShardInterface<R> S> +template <KVPInterface R, ShardInterface<R> S, bool FORCE_SCAN=false> class Query { public: constexpr static bool EARLY_ABORT=false; constexpr static bool SKIP_DELETE_FILTER=true; static void *get_query_state(S *shard, void *parms) { - auto res = new State<R>(); - auto p = (Parms<R> *) parms; - - res->start_idx = shard->get_lower_bound(p->lower_bound); - res->stop_idx = shard->get_record_count(); - - return res; + return nullptr; } static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) { @@ -74,37 +68,43 @@ public: res.rec.value = 0; // tombstones records.emplace_back(res); + + auto start_idx = shard->get_lower_bound(p->lower_bound); + auto stop_idx = shard->get_lower_bound(p->upper_bound); + /* * if the returned index is one past the end of the * records for the PGM, then there are not records * in the index falling into the specified range. */ - if (s->start_idx == shard->get_record_count()) { + if (start_idx == shard->get_record_count()) { return records; } - auto ptr = shard->get_record_at(s->start_idx); /* * roll the pointer forward to the first record that is * greater than or equal to the lower bound. */ - while(ptr < shard->get_data() + s->stop_idx && ptr->rec.key < p->lower_bound) { - ptr++; + auto recs = shard->get_data(); + while(start_idx < stop_idx && recs[start_idx].rec.key < p->lower_bound) { + start_idx++; } - while (ptr < shard->get_data() + s->stop_idx && ptr->rec.key <= p->upper_bound) { - if (!ptr->is_deleted()) { - if (ptr->is_tombstone()) { - records[0].rec.value++; - } else { - records[0].rec.key++; - } - } + while (stop_idx < shard->get_record_count() && recs[stop_idx].rec.key <= p->upper_bound) { + stop_idx++; + } + size_t idx = start_idx; + size_t ts_cnt = 0; - ptr++; + while (idx < stop_idx) { + ts_cnt += recs[idx].is_tombstone() * 2 + recs[idx].is_deleted(); + idx++; } + records[0].rec.key = idx - start_idx; + records[0].rec.value = ts_cnt; + return records; } @@ -119,8 +119,16 @@ public: res.rec.value = 0; // tombstones records.emplace_back(res); + size_t stop_idx; + if constexpr (FORCE_SCAN) { + stop_idx = s->buffer->get_capacity() / 2; + } else { + stop_idx = s->buffer->get_record_count(); + } + for (size_t i=0; i<s->buffer->get_record_count(); i++) { auto rec = s->buffer->get(i); + if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound && !rec->is_deleted()) { if (rec->is_tombstone()) { @@ -134,12 +142,10 @@ public: return records; } - static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) { - + static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) { R res; res.key = 0; res.value = 0; - std::vector<R> output; output.emplace_back(res); for (size_t i=0; i<results.size(); i++) { @@ -152,14 +158,16 @@ public: } static void delete_query_state(void *state) { - auto s = (State<R> *) state; - delete s; } static void delete_buffer_query_state(void *state) { auto s = (BufferState<R> *) state; delete s; } + + static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) { + return false; + } }; }} |