From d6e08e9d8d3ac9b356ac50cee22b41f828160247 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 27 Jul 2023 18:21:26 -0400 Subject: Expanded query interface Query interface now enables skipping of delete processing and stopping query processing when first match is found. --- include/shard/Alex.h | 2 +- include/shard/MemISAM.h | 57 ++++++++++++++++---- include/shard/PGM.h | 128 ++++++++++++++++++++++++++++++++++++++++----- include/shard/TrieSpline.h | 48 ++++++++++++++--- include/shard/VPTree.h | 13 +++-- include/shard/WIRS.h | 10 ++-- include/shard/WSS.h | 10 ++-- 7 files changed, 226 insertions(+), 42 deletions(-) (limited to 'include/shard') diff --git a/include/shard/Alex.h b/include/shard/Alex.h index 9ba9666..be5222c 100644 --- a/include/shard/Alex.h +++ b/include/shard/Alex.h @@ -271,7 +271,7 @@ public: return res; } - static void process_query_states(void *query_parms, std::vector shard_states, void *buff_state) { + static void process_query_states(void *query_parms, std::vector &shard_states, void *buff_state) { return; } diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h index 3e3215f..3000a6a 100644 --- a/include/shard/MemISAM.h +++ b/include/shard/MemISAM.h @@ -378,6 +378,9 @@ template class IRSQuery { public: + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=false; + static void *get_query_state(MemISAM *isam, void *parms) { auto res = new IRSState(); decltype(R::key) lower_key = ((irs_query_parms *) parms)->lower_bound; @@ -418,7 +421,7 @@ public: return res; } - static void process_query_states(void *query_parms, std::vector shard_states, void *buff_state) { + static void process_query_states(void *query_parms, std::vector &shard_states, void *buff_state) { auto p = (irs_query_parms *) query_parms; auto bs = (buff_state) ? (IRSBufferState *) buff_state : nullptr; @@ -527,12 +530,12 @@ public: return result; } - static std::vector merge(std::vector> &results, void *parms) { + static std::vector merge(std::vector>> &results, void *parms) { std::vector output; for (size_t i=0; i class ISAMRangeQuery { public: + + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=true; + static void *get_query_state(MemISAM *ts, void *parms) { auto res = new ISAMRangeQueryState(); auto p = (ISAMRangeQueryParms *) parms; @@ -571,7 +578,7 @@ public: return res; } - static void process_query_states(void *query_parms, std::vector shard_states, void *buff_state) { + static void process_query_states(void *query_parms, std::vector &shard_states, void *buff_state) { return; } @@ -618,11 +625,25 @@ public: return records; } - static std::vector merge(std::vector> &results, void *parms) { + static std::vector merge(std::vector>> &results, void *parms) { + std::vector>> cursors; + cursors.reserve(results.size()); + + PriorityQueue> pq(results.size()); size_t total = 0; - for (size_t i=0; i 0){ + auto base = results[i].data(); + cursors.emplace_back(Cursor{base, base + results[i].size(), 0, results[i].size()}); + assert(i == cursors.size() - 1); + total += results[i].size(); + pq.push(cursors[i].ptr, tmp_n - i - 1); + } else { + cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); + } if (total == 0) { return std::vector(); @@ -631,8 +652,24 @@ public: std::vector output; output.reserve(total); - for (size_t i=0; i 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[tmp_n - now.version - 1]; + auto& cursor2 = cursors[tmp_n - next.version - 1]; + if (advance_cursor>(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor>(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[tmp_n - now.version - 1]; + if (!now.data->is_tombstone()) output.push_back(cursor.ptr->rec); + pq.pop(); + + if (advance_cursor>(cursor)) pq.push(cursor.ptr, now.version); + } } return output; diff --git a/include/shard/PGM.h b/include/shard/PGM.h index 65d548e..aba5227 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -33,9 +33,17 @@ struct pgm_range_query_parms { decltype(R::key) upper_bound; }; +template +struct PGMPointLookupParms { + decltype(R::key) target_key; +}; + template class PGMRangeQuery; +template +class PGMPointLookup; + template struct PGMState { size_t start_idx; @@ -45,12 +53,6 @@ struct PGMState { template struct PGMBufferState { size_t cutoff; - Alias* alias; - - ~PGMBufferState() { - delete alias; - } - }; template @@ -64,6 +66,7 @@ public: // FIXME: there has to be a better way to do this friend class PGMRangeQuery; + friend class PGMPointLookup; PGM(MutableBuffer* buffer) : m_reccnt(0), m_tombstone_cnt(0) { @@ -274,11 +277,80 @@ private: pgm::PGMIndex m_pgm; BloomFilter *m_bf; }; +template +class PGMPointLookup { +public: + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=false; + + static void *get_query_state(PGM *ts, void *parms) { + return nullptr; + } + + static void* get_buffer_query_state(MutableBuffer *buffer, void *parms) { + return nullptr; + } + + static void process_query_states(void *query_parms, std::vector &shard_states, void *buff_state) { + return; + } + + static std::vector> query(PGM *ts, void *q_state, void *parms) { + std::vector> records; + auto p = (PGMPointLookupParms *) parms; + auto s = (PGMState *) q_state; + + size_t idx = ts->get_lower_bound(p->target_key); + if (ts->get_record_at(idx)->rec.key == p->target_key) { + records.emplace_back(*ts->get_record_at(idx)); + } + + return records; + } + + static std::vector> buffer_query(MutableBuffer *buffer, void *state, void *parms) { + auto p = (PGMPointLookupParms *) parms; + auto s = (PGMBufferState *) state; + + std::vector> records; + for (size_t i=0; iget_record_count(); i++) { + auto rec = buffer->get_data() + i; + if (rec->rec.key == p->target_key) { + records.emplace_back(*rec); + return records; + } + } + + return records; + } + + static std::vector merge(std::vector>> &results, void *parms) { + std::vector output; + for (size_t i=0 ;i 0) { + output.emplace_back(results[i][0].rec); + return output; + } + } + + return output; + } + + static void delete_query_state(void *state) { + } + + static void delete_buffer_query_state(void *state) { + } +}; + template class PGMRangeQuery { public: + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=false; + static void *get_query_state(PGM *ts, void *parms) { auto res = new PGMState(); auto p = (pgm_range_query_parms *) parms; @@ -296,7 +368,7 @@ public: return res; } - static void process_query_states(void *query_parms, std::vector shard_states, void *buff_state) { + static void process_query_states(void *query_parms, std::vector &shard_states, void *buff_state) { return; } @@ -343,11 +415,25 @@ public: return records; } - static std::vector merge(std::vector> &results, void *parms) { + static std::vector merge(std::vector>> &results, void *parms) { + std::vector>> cursors; + cursors.reserve(results.size()); + + PriorityQueue> pq(results.size()); size_t total = 0; - for (size_t i=0; i 0){ + auto base = results[i].data(); + cursors.emplace_back(Cursor{base, base + results[i].size(), 0, results[i].size()}); + assert(i == cursors.size() - 1); + total += results[i].size(); + pq.push(cursors[i].ptr, tmp_n - i - 1); + } else { + cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); + } if (total == 0) { return std::vector(); @@ -356,8 +442,24 @@ public: std::vector output; output.reserve(total); - for (size_t i=0; i 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[tmp_n - now.version - 1]; + auto& cursor2 = cursors[tmp_n - next.version - 1]; + if (advance_cursor>(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor>(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[tmp_n - now.version - 1]; + if (!now.data->is_tombstone()) output.push_back(cursor.ptr->rec); + pq.pop(); + + if (advance_cursor>(cursor)) pq.push(cursor.ptr, now.version); + } } return output; diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index f06756f..7d1a90d 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -300,6 +300,9 @@ private: template class TrieSplineRangeQuery { public: + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=true; + static void *get_query_state(TrieSpline *ts, void *parms) { auto res = new TrieSplineState(); auto p = (ts_range_query_parms *) parms; @@ -317,7 +320,7 @@ public: return res; } - static void process_query_states(void *query_parms, std::vector shard_states, void *buff_state) { + static void process_query_states(void *query_parms, std::vector &shard_states, void *buff_state) { return; } @@ -367,11 +370,25 @@ public: return records; } - static std::vector merge(std::vector> &results, void *parms) { + static std::vector merge(std::vector>> &results, void *parms) { + std::vector>> cursors; + cursors.reserve(results.size()); + + PriorityQueue> pq(results.size()); size_t total = 0; - for (size_t i=0; i 0){ + auto base = results[i].data(); + cursors.emplace_back(Cursor{base, base + results[i].size(), 0, results[i].size()}); + assert(i == cursors.size() - 1); + total += results[i].size(); + pq.push(cursors[i].ptr, tmp_n - i - 1); + } else { + cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); + } if (total == 0) { return std::vector(); @@ -380,11 +397,28 @@ public: std::vector output; output.reserve(total); - for (size_t i=0; i 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[tmp_n - now.version - 1]; + auto& cursor2 = cursors[tmp_n - next.version - 1]; + if (advance_cursor>(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor>(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[tmp_n - now.version - 1]; + if (!now.data->is_tombstone()) output.push_back(cursor.ptr->rec); + pq.pop(); + + if (advance_cursor>(cursor)) pq.push(cursor.ptr, now.version); + } } return output; + } static void delete_query_state(void *state) { diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h index 5f740dc..86f4ab7 100644 --- a/include/shard/VPTree.h +++ b/include/shard/VPTree.h @@ -424,6 +424,9 @@ private: template class KNNQuery { public: + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=true; + static void *get_query_state(VPTree *wss, void *parms) { return nullptr; } @@ -432,7 +435,7 @@ public: return nullptr; } - static void process_query_states(void *query_parms, std::vector shard_states, void *buff_state) { + static void process_query_states(void *query_parms, std::vector &shard_states, void *buff_state) { return; } @@ -494,7 +497,7 @@ public: return results; } - static std::vector merge(std::vector> &results, void *parms) { + static std::vector merge(std::vector>> &results, void *parms) { KNNQueryParms *p = (KNNQueryParms *) parms; R rec = p->point; size_t k = p->k; @@ -503,14 +506,14 @@ public: for (size_t i=0; icalc_distance(rec); - double cur_dist = results[i][j].calc_distance(rec); + double cur_dist = results[i][j].rec.calc_distance(rec); if (cur_dist < head_dist) { pq.pop(); - pq.push(&results[i][j]); + pq.push(&results[i][j].rec); } } } diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h index 1a63092..fafdef0 100644 --- a/include/shard/WIRS.h +++ b/include/shard/WIRS.h @@ -370,6 +370,10 @@ private: template class WIRSQuery { public: + + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=false; + static void *get_query_state(WIRS *wirs, void *parms) { auto res = new WIRSState(); decltype(R::key) lower_key = ((wirs_query_parms *) parms)->lower_bound; @@ -440,7 +444,7 @@ public: return state; } - static void process_query_states(void *query_parms, std::vector shard_states, void *buff_state) { + static void process_query_states(void *query_parms, std::vector &shard_states, void *buff_state) { auto p = (wirs_query_parms *) query_parms; auto bs = (WIRSBufferState *) buff_state; @@ -549,12 +553,12 @@ public: return result; } - static std::vector merge(std::vector> &results, void *parms) { + static std::vector merge(std::vector>> &results, void *parms) { std::vector output; for (size_t i=0; i class WSSQuery { public: + + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=false; + static void *get_query_state(WSS *wss, void *parms) { auto res = new WSSState(); res->total_weight = wss->m_total_weight; @@ -325,7 +329,7 @@ public: return state; } - static void process_query_states(void *query_parms, std::vector shard_states, void *buff_state) { + static void process_query_states(void *query_parms, std::vector &shard_states, void *buff_state) { auto p = (wss_query_parms *) query_parms; auto bs = (WSSBufferState *) buff_state; @@ -415,12 +419,12 @@ public: return result; } - static std::vector merge(std::vector> &results, void *parms) { + static std::vector merge(std::vector>> &results, void *parms) { std::vector output; for (size_t i=0; i