diff options
Diffstat (limited to 'include/shard/PGM.h')
| -rw-r--r-- | include/shard/PGM.h | 128 |
1 files changed, 115 insertions, 13 deletions
diff --git a/include/shard/PGM.h b/include/shard/PGM.h index 65d548e..aba5227 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -34,9 +34,17 @@ struct pgm_range_query_parms { }; template <RecordInterface R> +struct PGMPointLookupParms { + decltype(R::key) target_key; +}; + +template <RecordInterface R> class PGMRangeQuery; template <RecordInterface R> +class PGMPointLookup; + +template <RecordInterface R> struct PGMState { size_t start_idx; size_t stop_idx; @@ -45,12 +53,6 @@ struct PGMState { template <RecordInterface R> struct PGMBufferState { size_t cutoff; - Alias* alias; - - ~PGMBufferState() { - delete alias; - } - }; template <RecordInterface R, size_t epsilon=128> @@ -64,6 +66,7 @@ public: // FIXME: there has to be a better way to do this friend class PGMRangeQuery<R>; + friend class PGMPointLookup<R>; PGM(MutableBuffer<R>* buffer) : m_reccnt(0), m_tombstone_cnt(0) { @@ -274,11 +277,80 @@ private: pgm::PGMIndex<K, epsilon> m_pgm; BloomFilter<R> *m_bf; }; +template <RecordInterface R> +class PGMPointLookup { +public: + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=false; + + static void *get_query_state(PGM<R> *ts, void *parms) { + return nullptr; + } + + static void* get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) { + return nullptr; + } + + static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void *buff_state) { + return; + } + + static std::vector<Wrapped<R>> query(PGM<R> *ts, void *q_state, void *parms) { + std::vector<Wrapped<R>> records; + auto p = (PGMPointLookupParms<R> *) parms; + auto s = (PGMState<R> *) q_state; + + size_t idx = ts->get_lower_bound(p->target_key); + if (ts->get_record_at(idx)->rec.key == p->target_key) { + records.emplace_back(*ts->get_record_at(idx)); + } + + return records; + } + + static std::vector<Wrapped<R>> buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) { + auto p = (PGMPointLookupParms<R> *) parms; + auto s = (PGMBufferState<R> *) state; + + std::vector<Wrapped<R>> records; + for (size_t i=0; i<buffer->get_record_count(); i++) { + auto rec = buffer->get_data() + i; + if (rec->rec.key == p->target_key) { + records.emplace_back(*rec); + return records; + } + } + + return records; + } + + static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) { + std::vector<R> output; + for (size_t i=0 ;i<results.size(); i++) { + if (results[i].size() > 0) { + output.emplace_back(results[i][0].rec); + return output; + } + } + + return output; + } + + static void delete_query_state(void *state) { + } + + static void delete_buffer_query_state(void *state) { + } +}; + template <RecordInterface R> class PGMRangeQuery { public: + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=false; + static void *get_query_state(PGM<R> *ts, void *parms) { auto res = new PGMState<R>(); auto p = (pgm_range_query_parms<R> *) parms; @@ -296,7 +368,7 @@ public: return res; } - static void process_query_states(void *query_parms, std::vector<void*> shard_states, void *buff_state) { + static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void *buff_state) { return; } @@ -343,11 +415,25 @@ public: return records; } - static std::vector<R> merge(std::vector<std::vector<R>> &results, void *parms) { + static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) { + std::vector<Cursor<Wrapped<R>>> cursors; + cursors.reserve(results.size()); + + PriorityQueue<Wrapped<R>> pq(results.size()); size_t total = 0; - for (size_t i=0; i<results.size(); i++) { - total += results[i].size(); - } + size_t tmp_n = results.size(); + + + for (size_t i = 0; i < tmp_n; ++i) + if (results[i].size() > 0){ + auto base = results[i].data(); + cursors.emplace_back(Cursor{base, base + results[i].size(), 0, results[i].size()}); + assert(i == cursors.size() - 1); + total += results[i].size(); + pq.push(cursors[i].ptr, tmp_n - i - 1); + } else { + cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); + } if (total == 0) { return std::vector<R>(); @@ -356,8 +442,24 @@ public: std::vector<R> output; output.reserve(total); - for (size_t i=0; i<results.size(); i++) { - std::move(results[i].begin(), results[i].end(), std::back_inserter(output)); + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[tmp_n - now.version - 1]; + auto& cursor2 = cursors[tmp_n - next.version - 1]; + if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[tmp_n - now.version - 1]; + if (!now.data->is_tombstone()) output.push_back(cursor.ptr->rec); + pq.pop(); + + if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version); + } } return output; |