diff options
| author | Douglas B. Rumbaugh <dbr4@psu.edu> | 2024-12-06 13:13:51 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-12-06 18:13:51 +0000 |
| commit | 9fe305c7d28e993e55c55427f377ae7e3251ea4f (patch) | |
| tree | 384b687f64b84eb81bde2becac8a5f24916b07b4 /include/query/rangecount.h | |
| parent | 47916da2ba5ed5bee2dda3cbcc58d39e1e931bfc (diff) | |
| download | dynamic-extension-9fe305c7d28e993e55c55427f377ae7e3251ea4f.tar.gz | |
Interface update (#5)
* Query Interface Adjustments/Refactoring
Began the process of adjusting the query interface (and also the shard
interface, to a lesser degree) to better accommodate the user. In
particular the following changes have been made,
1. The number of necessary template arguments for the query type
has been drastically reduced, while also removing the void pointers
and manual delete functions from the interface.
This was accomplished by requiring many of the sub-types associated
with a query (parameters, etc.) to be nested inside the main query
class, and by forcing the SHARD type to expose its associated
record type.
2. User-defined query return types are now supported.
Queries no longer are required to return strictly sets of records.
Instead, the query now has LocalResultType and ResultType
template parameters (which can be defaulted using a typedef in
the Query type itself), allowing much more flexibility.
Note that, at least for the short term, the LocalResultType must
still expose the same is_deleted/is_tombstone interface as a
Wrapped<R> used to, as this is currently needed for delete
filtering. A better approach to this is, hopefully, forthcoming.
3. Updated the ISAMTree.h shard and rangequery.h query to use the
new interfaces, and adjusted the associated unit tests as well.
4. Dropped the unnecessary "get_data()" function from the ShardInterface
concept.
5. Dropped the need to specify a record type in the ShardInterface
concept. This is now handled using a required Shard::RECORD
member of the Shard class itself, which should expose the name
of the record type.
* Updates to framework to support new Query/Shard interfaces
Pretty extensive adjustments to the framework, particularly to the
templates themselves, along with some type-renaming work, to support
the new query and shard interfaces.
Adjusted the external query interface to take an rvalue reference, rather
than a pointer, to the query parameters.
* Removed framework-level delete filtering
This was causing some issues with the new query interface, and should
probably be reworked anyway, so I'm temporarily (TM) removing the
feature.
* Updated benchmarks + remaining code for new interface
Diffstat (limited to 'include/query/rangecount.h')
| -rw-r--r-- | include/query/rangecount.h | 259 |
1 files changed, 129 insertions, 130 deletions
diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 5b95cdd..68d304d 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -5,169 +5,168 @@ * * Distributed under the Modified BSD License. * - * A query class for single dimensional range count queries. This query - * requires that the shard support get_lower_bound(key) and + * A query class for single dimensional range count queries. This query + * requires that the shard support get_lower_bound(key) and * get_record_at(index). */ #pragma once #include "framework/QueryRequirements.h" -namespace de { namespace rc { +namespace de { +namespace rc { -template <RecordInterface R> -struct Parms { +template <ShardInterface S, bool FORCE_SCAN = true> class Query { + typedef typename S::RECORD R; + +public: + struct Parameters { decltype(R::key) lower_bound; decltype(R::key) upper_bound; -}; + }; -template <RecordInterface R> -struct State { + struct LocalQuery { size_t start_idx; size_t stop_idx; -}; + Parameters global_parms; + }; -template <RecordInterface R> -struct BufferState { + struct LocalQueryBuffer { BufferView<R> *buffer; - - BufferState(BufferView<R> *buffer) - : buffer(buffer) {} -}; - -template <KVPInterface R, ShardInterface<R> S, bool FORCE_SCAN=false> -class Query { -public: - constexpr static bool EARLY_ABORT=false; - constexpr static bool SKIP_DELETE_FILTER=true; - - static void *get_query_state(S *shard, void *parms) { - return nullptr; - } - - static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) { - auto res = new BufferState<R>(buffer); - - return res; + Parameters global_parms; + }; + + struct LocalResultType { + size_t record_count; + size_t tombstone_count; + + bool is_deleted() {return false;} + bool is_tombstone() {return false;} + }; + + typedef size_t ResultType; + constexpr static bool EARLY_ABORT = false; + constexpr static bool SKIP_DELETE_FILTER = true; + + static LocalQuery *local_preproc(S *shard, Parameters *parms) { + auto query = new LocalQuery(); + + query->start_idx = shard->get_lower_bound(parms->lower_bound); + query->stop_idx = shard->get_record_count(); + query->global_parms.lower_bound = parms->lower_bound; + query->global_parms.upper_bound = parms->upper_bound; + + return query; + } + + static LocalQueryBuffer *local_preproc_buffer(BufferView<R> *buffer, + Parameters *parms) { + auto query = new LocalQueryBuffer(); + query->buffer = buffer; + query->global_parms.lower_bound = parms->lower_bound; + query->global_parms.upper_bound = parms->upper_bound; + + return query; + } + + static void distribute_query(Parameters *parms, + std::vector<LocalQuery *> const &local_queries, + LocalQueryBuffer *buffer_query) { + return; + } + + static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { + std::vector<LocalResultType> result; + + /* + * if the returned index is one past the end of the + * records for the PGM, then there are not records + * in the index falling into the specified range. + */ + if (query->start_idx == shard->get_record_count()) { + return result; } - static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void* buffer_state) { - return; + auto ptr = shard->get_record_at(query->start_idx); + size_t reccnt = 0; + size_t tscnt = 0; + + /* + * roll the pointer forward to the first record that is + * greater than or equal to the lower bound. + */ + while (ptr < shard->get_data() + query->stop_idx && + ptr->rec.key < query->global_parms.lower_bound) { + ptr++; } - static std::vector<Wrapped<R>> query(S *shard, void *q_state, void *parms) { - std::vector<Wrapped<R>> records; - auto p = (Parms<R> *) parms; - auto s = (State<R> *) q_state; - - size_t reccnt = 0; - size_t tscnt = 0; - - Wrapped<R> res; - res.rec.key= 0; // records - res.rec.value = 0; // tombstones - records.emplace_back(res); - - - auto start_idx = shard->get_lower_bound(p->lower_bound); - auto stop_idx = shard->get_lower_bound(p->upper_bound); + while (ptr < shard->get_data() + query->stop_idx && + ptr->rec.key <= query->global_parms.upper_bound) { - /* - * if the returned index is one past the end of the - * records for the PGM, then there are not records - * in the index falling into the specified range. - */ - if (start_idx == shard->get_record_count()) { - return records; - } - - - /* - * roll the pointer forward to the first record that is - * greater than or equal to the lower bound. - */ - auto recs = shard->get_data(); - while(start_idx < stop_idx && recs[start_idx].rec.key < p->lower_bound) { - start_idx++; - } - - while (stop_idx < shard->get_record_count() && recs[stop_idx].rec.key <= p->upper_bound) { - stop_idx++; - } - size_t idx = start_idx; - size_t ts_cnt = 0; + if (!ptr->is_deleted()) { + reccnt++; - while (idx < stop_idx) { - ts_cnt += recs[idx].is_tombstone() * 2 + recs[idx].is_deleted(); - idx++; + if (ptr->is_tombstone()) { + tscnt++; } + } - records[0].rec.key = idx - start_idx; - records[0].rec.value = ts_cnt; - - return records; + ptr++; } - static std::vector<Wrapped<R>> buffer_query(void *state, void *parms) { - auto p = (Parms<R> *) parms; - auto s = (BufferState<R> *) state; - - std::vector<Wrapped<R>> records; - - Wrapped<R> res; - res.rec.key= 0; // records - res.rec.value = 0; // tombstones - records.emplace_back(res); - - size_t stop_idx; - if constexpr (FORCE_SCAN) { - stop_idx = s->buffer->get_capacity() / 2; - } else { - stop_idx = s->buffer->get_record_count(); - } - - for (size_t i=0; i<s->buffer->get_record_count(); i++) { - auto rec = s->buffer->get(i); - - if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound - && !rec->is_deleted()) { - if (rec->is_tombstone()) { - records[0].rec.value++; - } else { - records[0].rec.key++; - } - } + result.push_back({reccnt, tscnt}); + return result; + } + + static std::vector<LocalResultType> + local_query_buffer(LocalQueryBuffer *query) { + + std::vector<LocalResultType> result; + size_t reccnt = 0; + size_t tscnt = 0; + for (size_t i = 0; i < query->buffer->get_record_count(); i++) { + auto rec = query->buffer->get(i); + if (rec->rec.key >= query->global_parms.lower_bound && + rec->rec.key <= query->global_parms.upper_bound) { + if (!rec->is_deleted()) { + reccnt++; + if (rec->is_tombstone()) { + tscnt++; + } } - - return records; + } } - static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) { - R res; - res.key = 0; - res.value = 0; - output.emplace_back(res); + result.push_back({reccnt, tscnt}); - for (size_t i=0; i<results.size(); i++) { - output[0].key += results[i][0].rec.key; // records - output[0].value += results[i][0].rec.value; // tombstones - } + return result; + } - output[0].key -= output[0].value; - return output; - } + static void + combine(std::vector<std::vector<LocalResultType>> const &local_results, + Parameters *parms, std::vector<ResultType> &output) { + size_t reccnt = 0; + size_t tscnt = 0; - static void delete_query_state(void *state) { + for (auto &local_result : local_results) { + reccnt += local_result[0].record_count; + tscnt += local_result[0].tombstone_count; } - static void delete_buffer_query_state(void *state) { - auto s = (BufferState<R> *) state; - delete s; + /* if more tombstones than results, clamp the output at 0 */ + if (tscnt > reccnt) { + tscnt = reccnt; } - static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) { - return false; - } + output.push_back({reccnt - tscnt}); + } + + static bool repeat(Parameters *parms, std::vector<ResultType> &output, + std::vector<LocalQuery *> const &local_queries, + LocalQueryBuffer *buffer_query) { + return false; + } }; -}} +} // namespace rc +} // namespace de |