From 9876d74e503df64eb9e82e540ca41fcf593ebf64 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Fri, 6 Dec 2024 16:54:05 -0500 Subject: Adjusted query result interfaces Now, the vector<> is part of the user-defined type, not required by the framework. This should allow for more flexibility in either using alternative containers, or for more sensible implementations of queries with single value results (like range count). --- include/query/irs.h | 41 ++++++++++++++++++++++++----------------- include/query/knn.h | 42 ++++++++++++++++++++++-------------------- include/query/pointlookup.h | 19 +++++++++---------- include/query/rangecount.h | 37 +++++++++++++++---------------------- include/query/rangequery.h | 39 ++++++++++++++++++--------------------- include/query/wss.h | 28 ++++++++++++++-------------- 6 files changed, 102 insertions(+), 104 deletions(-) (limited to 'include/query') diff --git a/include/query/irs.h b/include/query/irs.h index 6dec850..ec6fa29 100644 --- a/include/query/irs.h +++ b/include/query/irs.h @@ -40,15 +40,15 @@ public: BufferView *buffer; size_t cutoff; - std::vector> records; + std::vector records; std::unique_ptr alias; size_t sample_size; Parameters global_parms; }; - typedef Wrapped LocalResultType; - typedef R ResultType; + typedef std::vector LocalResultType; + typedef std::vector ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = false; @@ -86,9 +86,11 @@ public: } for (size_t i = 0; i < query->cutoff; i++) { - if ((query->buffer->get(i)->rec.key >= query->global_parms.lower_bound) && - (buffer->get(i)->rec.key <= query->global_parms.upper_bound)) { - query->records.emplace_back(*(query->buffer->get(i))); + auto rec = buffer->get(i); + if (rec->rec.key >= query->global_parms.lower_bound && + rec->rec.key <= query->global_parms.upper_bound && + (!rec->is_deleted() && !rec->is_tombstone())) { + query->records.emplace_back(query->buffer->get(i)->rec); } } @@ -163,10 +165,10 @@ public: } } - static std::vector local_query(S *shard, LocalQuery *query) { + static LocalResultType local_query(S *shard, LocalQuery *query) { auto sample_sz = query->sample_size; - std::vector result_set; + LocalResultType result_set; if (sample_sz == 0 || query->lower_idx == shard->get_record_count()) { return result_set; @@ -180,15 +182,19 @@ public: (range_length > 0) ? gsl_rng_uniform_int(query->global_parms.rng, range_length) : 0; - result_set.emplace_back(*shard->get_record_at(query->lower_idx + idx)); + auto wrec = shard->get_record_at(query->lower_idx + idx); + + if (!wrec->is_deleted() && !wrec->is_tombstone()) { + result_set.emplace_back(wrec->rec); + } } while (attempts < sample_sz); return result_set; } - static std::vector + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector result; + LocalResultType result; result.reserve(query->sample_size); if constexpr (REJECTION) { @@ -197,8 +203,9 @@ public: auto rec = query->buffer->get(idx); if (rec->rec.key >= query->global_parms.lower_bound && - rec->rec.key <= query->global_parms.upper_bound) { - result.emplace_back(*rec); + rec->rec.key <= query->global_parms.upper_bound && + (!rec->is_deleted() && !rec->is_tombstone())) { + result.emplace_back(rec->rec); } } @@ -215,16 +222,16 @@ public: } static void - combine(std::vector> const &local_results, - Parameters *parms, std::vector &output) { + combine(std::vector const &local_results, + Parameters *parms, ResultType &output) { for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { - output.emplace_back(local_results[i][j].rec); + output.emplace_back(local_results[i][j]); } } } - static bool repeat(Parameters *parms, std::vector &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector const &local_queries, LocalQueryBuffer *buffer_query) { if (output.size() < parms->sample_size) { diff --git a/include/query/knn.h b/include/query/knn.h index 87ea10a..91a032c 100644 --- a/include/query/knn.h +++ b/include/query/knn.h @@ -39,8 +39,8 @@ public: Parameters global_parms; }; - typedef Wrapped LocalResultType; - typedef R ResultType; + typedef std::vector *> LocalResultType; + typedef std::vector ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = true; @@ -66,8 +66,8 @@ public: return; } - static std::vector local_query(S *shard, LocalQuery *query) { - std::vector results; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType results; Wrapped wrec; wrec.rec = query->global_parms.point; @@ -79,17 +79,16 @@ public: shard->search(query->global_parms.point, query->global_parms.k, pq); while (pq.size() > 0) { - results.emplace_back(*pq.peek().data); + results.emplace_back(pq.peek().data); pq.pop(); } return results; } - static std::vector - local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector results; + LocalResultType results; Wrapped wrec; wrec.rec = query->global_parms.point; @@ -118,41 +117,44 @@ public: } while (pq.size() > 0) { - results.emplace_back(*(pq.peek().data)); + results.emplace_back(pq.peek().data); pq.pop(); } - return std::move(results); + return results; } - static void - combine(std::vector> const &local_results, - Parameters *parms, std::vector &output) { + static void combine(std::vector const &local_results, + Parameters *parms, ResultType &output) { + + Wrapped wrec; + wrec.rec = parms->point; + wrec.header = 0; + PriorityQueue, DistCmpMax>> pq(parms->k, &wrec); - PriorityQueue> pq(parms->k, &(parms->point)); for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { if (pq.size() < parms->k) { - pq.push(&local_results[i][j].rec); + pq.push(local_results[i][j]); } else { - double head_dist = pq.peek().data->calc_distance(parms->point); - double cur_dist = local_results[i][j].rec.calc_distance(parms->point); + double head_dist = pq.peek().data->rec.calc_distance(wrec.rec); + double cur_dist = local_results[i][j]->rec.calc_distance(wrec.rec); if (cur_dist < head_dist) { pq.pop(); - pq.push(&local_results[i][j].rec); + pq.push(local_results[i][j]); } } } } while (pq.size() > 0) { - output.emplace_back(*pq.peek().data); + output.emplace_back(pq.peek().data->rec); pq.pop(); } } - static bool repeat(Parameters *parms, std::vector &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h index f3788de..65cffa7 100644 --- a/include/query/pointlookup.h +++ b/include/query/pointlookup.h @@ -38,8 +38,8 @@ public: Parameters global_parms; }; - typedef Wrapped LocalResultType; - typedef R ResultType; + typedef std::vector> LocalResultType; + typedef std::vector ResultType; constexpr static bool EARLY_ABORT = true; constexpr static bool SKIP_DELETE_FILTER = true; @@ -65,8 +65,8 @@ public: return; } - static std::vector local_query(S *shard, LocalQuery *query) { - std::vector result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; auto r = shard->point_lookup({query->global_parms.search_key, 0}); @@ -77,9 +77,8 @@ public: return result; } - static std::vector - local_query_buffer(LocalQueryBuffer *query) { - std::vector result; + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { + LocalResultType result; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); @@ -95,8 +94,8 @@ public: static void - combine(std::vector> const &local_results, - Parameters *parms, std::vector &output) { + combine(std::vector const &local_results, + Parameters *parms, ResultType &output) { for (auto r : local_results) { if (r.size() > 0) { if (r[0].is_deleted() || r[0].is_tombstone()) { @@ -109,7 +108,7 @@ public: } } - static bool repeat(Parameters *parms, std::vector &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 68d304d..0898473 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -16,7 +16,7 @@ namespace de { namespace rc { -template class Query { +template class Query { typedef typename S::RECORD R; public: @@ -75,8 +75,8 @@ public: return; } - static std::vector local_query(S *shard, LocalQuery *query) { - std::vector result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result = {0, 0}; /* * if the returned index is one past the end of the @@ -88,8 +88,6 @@ public: } auto ptr = shard->get_record_at(query->start_idx); - size_t reccnt = 0; - size_t tscnt = 0; /* * roll the pointer forward to the first record that is @@ -104,53 +102,48 @@ public: ptr->rec.key <= query->global_parms.upper_bound) { if (!ptr->is_deleted()) { - reccnt++; + result.record_count++; if (ptr->is_tombstone()) { - tscnt++; + result.tombstone_count++; } } ptr++; } - result.push_back({reccnt, tscnt}); return result; } - static std::vector + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector result; - size_t reccnt = 0; - size_t tscnt = 0; + LocalResultType result = {0, 0}; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); if (rec->rec.key >= query->global_parms.lower_bound && rec->rec.key <= query->global_parms.upper_bound) { if (!rec->is_deleted()) { - reccnt++; + result.record_count++; if (rec->is_tombstone()) { - tscnt++; + result.tombstone_count++; } } } } - result.push_back({reccnt, tscnt}); - return result; } static void - combine(std::vector> const &local_results, - Parameters *parms, std::vector &output) { + combine(std::vector const &local_results, + Parameters *parms, ResultType &output) { size_t reccnt = 0; size_t tscnt = 0; for (auto &local_result : local_results) { - reccnt += local_result[0].record_count; - tscnt += local_result[0].tombstone_count; + reccnt += local_result.record_count; + tscnt += local_result.tombstone_count; } /* if more tombstones than results, clamp the output at 0 */ @@ -158,10 +151,10 @@ public: tscnt = reccnt; } - output.push_back({reccnt - tscnt}); + output = reccnt - tscnt; } - static bool repeat(Parameters *parms, std::vector &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/rangequery.h b/include/query/rangequery.h index e7be39c..d9e7db8 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -38,8 +38,8 @@ public: Parameters global_parms; }; - typedef Wrapped LocalResultType; - typedef R ResultType; + typedef std::vector> LocalResultType; + typedef std::vector ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = true; @@ -69,8 +69,8 @@ public: return; } - static std::vector local_query(S *shard, LocalQuery *query) { - std::vector result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; /* * if the returned index is one past the end of the @@ -101,10 +101,9 @@ public: return result; } - static std::vector - local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector result; + LocalResultType result; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); if (rec->rec.key >= query->global_parms.lower_bound && @@ -116,26 +115,25 @@ public: return result; } - static void - combine(std::vector> const &local_results, - Parameters *parms, std::vector &output) { - std::vector> cursors; + static void combine(std::vector const &local_results, + Parameters *parms, ResultType &output) { + std::vector>> cursors; cursors.reserve(local_results.size()); - psudb::PriorityQueue pq(local_results.size()); + psudb::PriorityQueue> pq(local_results.size()); size_t total = 0; size_t tmp_n = local_results.size(); for (size_t i = 0; i < tmp_n; ++i) if (local_results[i].size() > 0) { auto base = local_results[i].data(); - cursors.emplace_back(Cursor{ + cursors.emplace_back(Cursor>{ base, base + local_results[i].size(), 0, local_results[i].size()}); assert(i == cursors.size() - 1); total += local_results[i].size(); pq.push(cursors[i].ptr, tmp_n - i - 1); } else { - cursors.emplace_back(Cursor{nullptr, nullptr, 0, 0}); + cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); } if (total == 0) { @@ -146,9 +144,8 @@ public: while (pq.size()) { auto now = pq.peek(); - auto next = pq.size() > 1 - ? pq.peek(1) - : psudb::queue_record{nullptr, 0}; + auto next = pq.size() > 1 ? pq.peek(1) + : psudb::queue_record>{nullptr, 0}; if (!now.data->is_tombstone() && next.data != nullptr && now.data->rec == next.data->rec && next.data->is_tombstone()) { @@ -156,9 +153,9 @@ public: pq.pop(); auto &cursor1 = cursors[tmp_n - now.version - 1]; auto &cursor2 = cursors[tmp_n - next.version - 1]; - if (advance_cursor(cursor1)) + if (advance_cursor>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) + if (advance_cursor>(cursor2)) pq.push(cursor2.ptr, next.version); } else { auto &cursor = cursors[tmp_n - now.version - 1]; @@ -167,7 +164,7 @@ public: pq.pop(); - if (advance_cursor(cursor)) + if (advance_cursor>(cursor)) pq.push(cursor.ptr, now.version); } } @@ -175,7 +172,7 @@ public: return; } - static bool repeat(Parameters *parms, std::vector &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/wss.h b/include/query/wss.h index 54620ca..d4e75f3 100644 --- a/include/query/wss.h +++ b/include/query/wss.h @@ -49,8 +49,8 @@ public: constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = false; - typedef Wrapped LocalResultType; - typedef R ResultType; + typedef std::vector LocalResultType; + typedef std::vector ResultType; static LocalQuery *local_preproc(S *shard, Parameters *parms) { auto query = new LocalQuery(); @@ -130,8 +130,8 @@ public: } } - static std::vector local_query(S *shard, LocalQuery *query) { - std::vector result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; if (query->sample_size == 0) { return result; @@ -139,25 +139,25 @@ public: for (size_t i = 0; i < query->sample_size; i++) { size_t idx = shard->get_weighted_sample(query->global_parms.rng); - if (!shard->get_record_at(idx)->is_deleted()) { - result.emplace_back(*shard->get_record_at(idx)); + if (!shard->get_record_at(idx)->is_deleted() && !shard->get_record_at(idx)->is_tombstone()) { + result.emplace_back(shard->get_record_at(idx)->rec); } } return result; } - static std::vector + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector result; + LocalResultType result; for (size_t i = 0; i < query->sample_size; i++) { auto idx = gsl_rng_uniform_int(query->global_parms.rng, query->cutoff); auto rec = query->buffer->get(idx); auto test = gsl_rng_uniform(query->global_parms.rng) * query->max_weight; - if (test <= rec->rec.weight && !rec->is_deleted()) { - result.emplace_back(*rec); + if (test <= rec->rec.weight && !rec->is_deleted() && !rec->is_tombstone()) { + result.emplace_back(rec->rec); } } @@ -165,16 +165,16 @@ public: } static void - combine(std::vector> const &local_results, - Parameters *parms, std::vector &output) { + combine(std::vector const &local_results, + Parameters *parms, ResultType &output) { for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { - output.emplace_back(local_results[i][j].rec); + output.emplace_back(local_results[i][j]); } } } - static bool repeat(Parameters *parms, std::vector &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector const &local_queries, LocalQueryBuffer *buffer_query) { if (output.size() < parms->sample_size) { -- cgit v1.2.3