summaryrefslogtreecommitdiffstats
path: root/include/query/pointlookup.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/query/pointlookup.h')
-rw-r--r--include/query/pointlookup.h170
1 files changed, 83 insertions, 87 deletions
diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h
index 94c2bce..f3788de 100644
--- a/include/query/pointlookup.h
+++ b/include/query/pointlookup.h
@@ -18,106 +18,102 @@
#include "framework/QueryRequirements.h"
-namespace de { namespace pl {
+namespace de {
+namespace pl {
-template <RecordInterface R>
-struct Parms {
- decltype(R::key) search_key;
-};
+template <ShardInterface S> class Query {
+ typedef typename S::RECORD R;
-template <RecordInterface R>
-struct State {
-};
-
-template <RecordInterface R>
-struct BufferState {
- BufferView<R> *buffer;
-
- BufferState(BufferView<R> *buffer)
- : buffer(buffer) {}
-};
-
-template <KVPInterface R, ShardInterface<R> S>
-class Query {
public:
- constexpr static bool EARLY_ABORT=true;
- constexpr static bool SKIP_DELETE_FILTER=true;
-
- static void *get_query_state(S *shard, void *parms) {
- return nullptr;
- }
-
- static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) {
- auto res = new BufferState<R>(buffer);
+ struct Parameters {
+ decltype(R::key) search_key;
+ };
- return res;
- }
+ struct LocalQuery {
+ Parameters global_parms;
+ };
- static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void* buffer_state) {
- return;
+ struct LocalQueryBuffer {
+ BufferView<R> *buffer;
+ Parameters global_parms;
+ };
+
+ typedef Wrapped<R> LocalResultType;
+ typedef R ResultType;
+
+ constexpr static bool EARLY_ABORT = true;
+ constexpr static bool SKIP_DELETE_FILTER = true;
+
+ static LocalQuery *local_preproc(S *shard, Parameters *parms) {
+ auto query = new LocalQuery();
+ query->global_parms = *parms;
+ return query;
+ }
+
+ static LocalQueryBuffer *local_preproc_buffer(BufferView<R> *buffer,
+ Parameters *parms) {
+ auto query = new LocalQueryBuffer();
+ query->buffer = buffer;
+ query->global_parms = *parms;
+
+ return query;
+ }
+
+ static void distribute_query(Parameters *parms,
+ std::vector<LocalQuery *> const &local_queries,
+ LocalQueryBuffer *buffer_query) {
+ return;
+ }
+
+ static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
+ std::vector<LocalResultType> result;
+
+ auto r = shard->point_lookup({query->global_parms.search_key, 0});
+
+ if (r) {
+ result.push_back(*r);
}
- static std::vector<Wrapped<R>> query(S *shard, void *q_state, void *parms) {
- auto p = (Parms<R> *) parms;
- auto s = (State<R> *) q_state;
-
- std::vector<Wrapped<R>> result;
-
- auto r = shard->point_lookup({p->search_key, 0});
+ return result;
+ }
+
+ static std::vector<LocalResultType>
+ local_query_buffer(LocalQueryBuffer *query) {
+ std::vector<LocalResultType> result;
- if (r) {
- result.push_back(*r);
- }
+ for (size_t i = 0; i < query->buffer->get_record_count(); i++) {
+ auto rec = query->buffer->get(i);
+ if (rec->rec.key == query->global_parms.search_key) {
+ result.push_back(*rec);
return result;
+ }
}
- static std::vector<Wrapped<R>> buffer_query(void *state, void *parms) {
- auto p = (Parms<R> *) parms;
- auto s = (BufferState<R> *) state;
-
- std::vector<Wrapped<R>> records;
- for (size_t i=0; i<s->buffer->get_record_count(); i++) {
- auto rec = s->buffer->get(i);
-
- if (rec->rec.key == p->search_key) {
- records.push_back(*rec);
- return records;
- }
+ return result;
+ }
+
+
+ static void
+ combine(std::vector<std::vector<LocalResultType>> const &local_results,
+ Parameters *parms, std::vector<ResultType> &output) {
+ for (auto r : local_results) {
+ if (r.size() > 0) {
+ if (r[0].is_deleted() || r[0].is_tombstone()) {
+ return;
}
- return records;
- }
-
- static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
- for (auto r : results) {
- if (r.size() > 0) {
- if (r[0].is_deleted() || r[0].is_tombstone()) {
- return output;
- }
-
- output.push_back(r[0].rec);
- return output;
- }
- }
-
- return output;
- }
-
- static void delete_query_state(void *state) {
- auto s = (State<R> *) state;
- delete s;
- }
-
- static void delete_buffer_query_state(void *state) {
- auto s = (BufferState<R> *) state;
- delete s;
- }
-
-
- static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
- return false;
+ output.push_back(r[0].rec);
+ return;
+ }
}
+ }
+
+ static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ std::vector<LocalQuery *> const &local_queries,
+ LocalQueryBuffer *buffer_query) {
+ return false;
+ }
};
-
-}}
+} // namespace pl
+} // namespace de