diff options
| author | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-12-06 16:54:05 -0500 |
|---|---|---|
| committer | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-12-06 16:54:05 -0500 |
| commit | 9876d74e503df64eb9e82e540ca41fcf593ebf64 (patch) | |
| tree | 3512690afa8c04f702bd8de500bf8f41b05571c4 /include/query | |
| parent | e2b81a2d311470d503edae93e68e82791f6bb17c (diff) | |
| download | dynamic-extension-master.tar.gz | |
Now, the vector<> is part of the user-defined type, not required by the
framework. This should allow for more flexibility in either using
alternative containers, or for more sensible implementations of queries
with single value results (like range count).
Diffstat (limited to 'include/query')
| -rw-r--r-- | include/query/irs.h | 41 | ||||
| -rw-r--r-- | include/query/knn.h | 42 | ||||
| -rw-r--r-- | include/query/pointlookup.h | 19 | ||||
| -rw-r--r-- | include/query/rangecount.h | 37 | ||||
| -rw-r--r-- | include/query/rangequery.h | 39 | ||||
| -rw-r--r-- | include/query/wss.h | 28 |
6 files changed, 102 insertions, 104 deletions
diff --git a/include/query/irs.h b/include/query/irs.h index 6dec850..ec6fa29 100644 --- a/include/query/irs.h +++ b/include/query/irs.h @@ -40,15 +40,15 @@ public: BufferView<R> *buffer; size_t cutoff; - std::vector<Wrapped<R>> records; + std::vector<R> records; std::unique_ptr<psudb::Alias> alias; size_t sample_size; Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<R> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = false; @@ -86,9 +86,11 @@ public: } for (size_t i = 0; i < query->cutoff; i++) { - if ((query->buffer->get(i)->rec.key >= query->global_parms.lower_bound) && - (buffer->get(i)->rec.key <= query->global_parms.upper_bound)) { - query->records.emplace_back(*(query->buffer->get(i))); + auto rec = buffer->get(i); + if (rec->rec.key >= query->global_parms.lower_bound && + rec->rec.key <= query->global_parms.upper_bound && + (!rec->is_deleted() && !rec->is_tombstone())) { + query->records.emplace_back(query->buffer->get(i)->rec); } } @@ -163,10 +165,10 @@ public: } } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { + static LocalResultType local_query(S *shard, LocalQuery *query) { auto sample_sz = query->sample_size; - std::vector<LocalResultType> result_set; + LocalResultType result_set; if (sample_sz == 0 || query->lower_idx == shard->get_record_count()) { return result_set; @@ -180,15 +182,19 @@ public: (range_length > 0) ? gsl_rng_uniform_int(query->global_parms.rng, range_length) : 0; - result_set.emplace_back(*shard->get_record_at(query->lower_idx + idx)); + auto wrec = shard->get_record_at(query->lower_idx + idx); + + if (!wrec->is_deleted() && !wrec->is_tombstone()) { + result_set.emplace_back(wrec->rec); + } } while (attempts < sample_sz); return result_set; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; result.reserve(query->sample_size); if constexpr (REJECTION) { @@ -197,8 +203,9 @@ public: auto rec = query->buffer->get(idx); if (rec->rec.key >= query->global_parms.lower_bound && - rec->rec.key <= query->global_parms.upper_bound) { - result.emplace_back(*rec); + rec->rec.key <= query->global_parms.upper_bound && + (!rec->is_deleted() && !rec->is_tombstone())) { + result.emplace_back(rec->rec); } } @@ -215,16 +222,16 @@ public: } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { - output.emplace_back(local_results[i][j].rec); + output.emplace_back(local_results[i][j]); } } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { if (output.size() < parms->sample_size) { diff --git a/include/query/knn.h b/include/query/knn.h index 87ea10a..91a032c 100644 --- a/include/query/knn.h +++ b/include/query/knn.h @@ -39,8 +39,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<const Wrapped<R> *> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = true; @@ -66,8 +66,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> results; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType results; Wrapped<R> wrec; wrec.rec = query->global_parms.point; @@ -79,17 +79,16 @@ public: shard->search(query->global_parms.point, query->global_parms.k, pq); while (pq.size() > 0) { - results.emplace_back(*pq.peek().data); + results.emplace_back(pq.peek().data); pq.pop(); } return results; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> results; + LocalResultType results; Wrapped<R> wrec; wrec.rec = query->global_parms.point; @@ -118,41 +117,44 @@ public: } while (pq.size() > 0) { - results.emplace_back(*(pq.peek().data)); + results.emplace_back(pq.peek().data); pq.pop(); } - return std::move(results); + return results; } - static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + static void combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { + + Wrapped<R> wrec; + wrec.rec = parms->point; + wrec.header = 0; + PriorityQueue<Wrapped<R>, DistCmpMax<Wrapped<R>>> pq(parms->k, &wrec); - PriorityQueue<R, DistCmpMax<R>> pq(parms->k, &(parms->point)); for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { if (pq.size() < parms->k) { - pq.push(&local_results[i][j].rec); + pq.push(local_results[i][j]); } else { - double head_dist = pq.peek().data->calc_distance(parms->point); - double cur_dist = local_results[i][j].rec.calc_distance(parms->point); + double head_dist = pq.peek().data->rec.calc_distance(wrec.rec); + double cur_dist = local_results[i][j]->rec.calc_distance(wrec.rec); if (cur_dist < head_dist) { pq.pop(); - pq.push(&local_results[i][j].rec); + pq.push(local_results[i][j]); } } } } while (pq.size() > 0) { - output.emplace_back(*pq.peek().data); + output.emplace_back(pq.peek().data->rec); pq.pop(); } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h index f3788de..65cffa7 100644 --- a/include/query/pointlookup.h +++ b/include/query/pointlookup.h @@ -38,8 +38,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<Wrapped<R>> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = true; constexpr static bool SKIP_DELETE_FILTER = true; @@ -65,8 +65,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; auto r = shard->point_lookup({query->global_parms.search_key, 0}); @@ -77,9 +77,8 @@ public: return result; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { + LocalResultType result; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); @@ -95,8 +94,8 @@ public: static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (auto r : local_results) { if (r.size() > 0) { if (r[0].is_deleted() || r[0].is_tombstone()) { @@ -109,7 +108,7 @@ public: } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 68d304d..0898473 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -16,7 +16,7 @@ namespace de { namespace rc { -template <ShardInterface S, bool FORCE_SCAN = true> class Query { +template <ShardInterface S> class Query { typedef typename S::RECORD R; public: @@ -75,8 +75,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result = {0, 0}; /* * if the returned index is one past the end of the @@ -88,8 +88,6 @@ public: } auto ptr = shard->get_record_at(query->start_idx); - size_t reccnt = 0; - size_t tscnt = 0; /* * roll the pointer forward to the first record that is @@ -104,53 +102,48 @@ public: ptr->rec.key <= query->global_parms.upper_bound) { if (!ptr->is_deleted()) { - reccnt++; + result.record_count++; if (ptr->is_tombstone()) { - tscnt++; + result.tombstone_count++; } } ptr++; } - result.push_back({reccnt, tscnt}); return result; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; - size_t reccnt = 0; - size_t tscnt = 0; + LocalResultType result = {0, 0}; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); if (rec->rec.key >= query->global_parms.lower_bound && rec->rec.key <= query->global_parms.upper_bound) { if (!rec->is_deleted()) { - reccnt++; + result.record_count++; if (rec->is_tombstone()) { - tscnt++; + result.tombstone_count++; } } } } - result.push_back({reccnt, tscnt}); - return result; } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { size_t reccnt = 0; size_t tscnt = 0; for (auto &local_result : local_results) { - reccnt += local_result[0].record_count; - tscnt += local_result[0].tombstone_count; + reccnt += local_result.record_count; + tscnt += local_result.tombstone_count; } /* if more tombstones than results, clamp the output at 0 */ @@ -158,10 +151,10 @@ public: tscnt = reccnt; } - output.push_back({reccnt - tscnt}); + output = reccnt - tscnt; } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/rangequery.h b/include/query/rangequery.h index e7be39c..d9e7db8 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -38,8 +38,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<Wrapped<R>> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = true; @@ -69,8 +69,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; /* * if the returned index is one past the end of the @@ -101,10 +101,9 @@ public: return result; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); if (rec->rec.key >= query->global_parms.lower_bound && @@ -116,26 +115,25 @@ public: return result; } - static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { - std::vector<Cursor<LocalResultType>> cursors; + static void combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { + std::vector<Cursor<Wrapped<R>>> cursors; cursors.reserve(local_results.size()); - psudb::PriorityQueue<LocalResultType> pq(local_results.size()); + psudb::PriorityQueue<Wrapped<R>> pq(local_results.size()); size_t total = 0; size_t tmp_n = local_results.size(); for (size_t i = 0; i < tmp_n; ++i) if (local_results[i].size() > 0) { auto base = local_results[i].data(); - cursors.emplace_back(Cursor<LocalResultType>{ + cursors.emplace_back(Cursor<Wrapped<R>>{ base, base + local_results[i].size(), 0, local_results[i].size()}); assert(i == cursors.size() - 1); total += local_results[i].size(); pq.push(cursors[i].ptr, tmp_n - i - 1); } else { - cursors.emplace_back(Cursor<LocalResultType>{nullptr, nullptr, 0, 0}); + cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); } if (total == 0) { @@ -146,9 +144,8 @@ public: while (pq.size()) { auto now = pq.peek(); - auto next = pq.size() > 1 - ? pq.peek(1) - : psudb::queue_record<LocalResultType>{nullptr, 0}; + auto next = pq.size() > 1 ? pq.peek(1) + : psudb::queue_record<Wrapped<R>>{nullptr, 0}; if (!now.data->is_tombstone() && next.data != nullptr && now.data->rec == next.data->rec && next.data->is_tombstone()) { @@ -156,9 +153,9 @@ public: pq.pop(); auto &cursor1 = cursors[tmp_n - now.version - 1]; auto &cursor2 = cursors[tmp_n - next.version - 1]; - if (advance_cursor<LocalResultType>(cursor1)) + if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor<LocalResultType>(cursor2)) + if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version); } else { auto &cursor = cursors[tmp_n - now.version - 1]; @@ -167,7 +164,7 @@ public: pq.pop(); - if (advance_cursor<LocalResultType>(cursor)) + if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version); } } @@ -175,7 +172,7 @@ public: return; } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/wss.h b/include/query/wss.h index 54620ca..d4e75f3 100644 --- a/include/query/wss.h +++ b/include/query/wss.h @@ -49,8 +49,8 @@ public: constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = false; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<R> LocalResultType; + typedef std::vector<R> ResultType; static LocalQuery *local_preproc(S *shard, Parameters *parms) { auto query = new LocalQuery(); @@ -130,8 +130,8 @@ public: } } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; if (query->sample_size == 0) { return result; @@ -139,25 +139,25 @@ public: for (size_t i = 0; i < query->sample_size; i++) { size_t idx = shard->get_weighted_sample(query->global_parms.rng); - if (!shard->get_record_at(idx)->is_deleted()) { - result.emplace_back(*shard->get_record_at(idx)); + if (!shard->get_record_at(idx)->is_deleted() && !shard->get_record_at(idx)->is_tombstone()) { + result.emplace_back(shard->get_record_at(idx)->rec); } } return result; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; for (size_t i = 0; i < query->sample_size; i++) { auto idx = gsl_rng_uniform_int(query->global_parms.rng, query->cutoff); auto rec = query->buffer->get(idx); auto test = gsl_rng_uniform(query->global_parms.rng) * query->max_weight; - if (test <= rec->rec.weight && !rec->is_deleted()) { - result.emplace_back(*rec); + if (test <= rec->rec.weight && !rec->is_deleted() && !rec->is_tombstone()) { + result.emplace_back(rec->rec); } } @@ -165,16 +165,16 @@ public: } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { - output.emplace_back(local_results[i][j].rec); + output.emplace_back(local_results[i][j]); } } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { if (output.size() < parms->sample_size) { |