diff options
Diffstat (limited to 'include/query/pointlookup.h')
| -rw-r--r-- | include/query/pointlookup.h | 170 |
1 files changed, 83 insertions, 87 deletions
diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h index 94c2bce..f3788de 100644 --- a/include/query/pointlookup.h +++ b/include/query/pointlookup.h @@ -18,106 +18,102 @@ #include "framework/QueryRequirements.h" -namespace de { namespace pl { +namespace de { +namespace pl { -template <RecordInterface R> -struct Parms { - decltype(R::key) search_key; -}; +template <ShardInterface S> class Query { + typedef typename S::RECORD R; -template <RecordInterface R> -struct State { -}; - -template <RecordInterface R> -struct BufferState { - BufferView<R> *buffer; - - BufferState(BufferView<R> *buffer) - : buffer(buffer) {} -}; - -template <KVPInterface R, ShardInterface<R> S> -class Query { public: - constexpr static bool EARLY_ABORT=true; - constexpr static bool SKIP_DELETE_FILTER=true; - - static void *get_query_state(S *shard, void *parms) { - return nullptr; - } - - static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) { - auto res = new BufferState<R>(buffer); + struct Parameters { + decltype(R::key) search_key; + }; - return res; - } + struct LocalQuery { + Parameters global_parms; + }; - static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void* buffer_state) { - return; + struct LocalQueryBuffer { + BufferView<R> *buffer; + Parameters global_parms; + }; + + typedef Wrapped<R> LocalResultType; + typedef R ResultType; + + constexpr static bool EARLY_ABORT = true; + constexpr static bool SKIP_DELETE_FILTER = true; + + static LocalQuery *local_preproc(S *shard, Parameters *parms) { + auto query = new LocalQuery(); + query->global_parms = *parms; + return query; + } + + static LocalQueryBuffer *local_preproc_buffer(BufferView<R> *buffer, + Parameters *parms) { + auto query = new LocalQueryBuffer(); + query->buffer = buffer; + query->global_parms = *parms; + + return query; + } + + static void distribute_query(Parameters *parms, + std::vector<LocalQuery *> const &local_queries, + LocalQueryBuffer *buffer_query) { + return; + } + + static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { + std::vector<LocalResultType> result; + + auto r = shard->point_lookup({query->global_parms.search_key, 0}); + + if (r) { + result.push_back(*r); } - static std::vector<Wrapped<R>> query(S *shard, void *q_state, void *parms) { - auto p = (Parms<R> *) parms; - auto s = (State<R> *) q_state; - - std::vector<Wrapped<R>> result; - - auto r = shard->point_lookup({p->search_key, 0}); + return result; + } + + static std::vector<LocalResultType> + local_query_buffer(LocalQueryBuffer *query) { + std::vector<LocalResultType> result; - if (r) { - result.push_back(*r); - } + for (size_t i = 0; i < query->buffer->get_record_count(); i++) { + auto rec = query->buffer->get(i); + if (rec->rec.key == query->global_parms.search_key) { + result.push_back(*rec); return result; + } } - static std::vector<Wrapped<R>> buffer_query(void *state, void *parms) { - auto p = (Parms<R> *) parms; - auto s = (BufferState<R> *) state; - - std::vector<Wrapped<R>> records; - for (size_t i=0; i<s->buffer->get_record_count(); i++) { - auto rec = s->buffer->get(i); - - if (rec->rec.key == p->search_key) { - records.push_back(*rec); - return records; - } + return result; + } + + + static void + combine(std::vector<std::vector<LocalResultType>> const &local_results, + Parameters *parms, std::vector<ResultType> &output) { + for (auto r : local_results) { + if (r.size() > 0) { + if (r[0].is_deleted() || r[0].is_tombstone()) { + return; } - return records; - } - - static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) { - for (auto r : results) { - if (r.size() > 0) { - if (r[0].is_deleted() || r[0].is_tombstone()) { - return output; - } - - output.push_back(r[0].rec); - return output; - } - } - - return output; - } - - static void delete_query_state(void *state) { - auto s = (State<R> *) state; - delete s; - } - - static void delete_buffer_query_state(void *state) { - auto s = (BufferState<R> *) state; - delete s; - } - - - static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) { - return false; + output.push_back(r[0].rec); + return; + } } + } + + static bool repeat(Parameters *parms, std::vector<ResultType> &output, + std::vector<LocalQuery *> const &local_queries, + LocalQueryBuffer *buffer_query) { + return false; + } }; - -}} +} // namespace pl +} // namespace de |