diff options
Diffstat (limited to 'include/query')
| -rw-r--r-- | include/query/irs.h | 41 | ||||
| -rw-r--r-- | include/query/knn.h | 42 | ||||
| -rw-r--r-- | include/query/pointlookup.h | 19 | ||||
| -rw-r--r-- | include/query/rangecount.h | 37 | ||||
| -rw-r--r-- | include/query/rangequery.h | 39 | ||||
| -rw-r--r-- | include/query/wss.h | 28 |
6 files changed, 102 insertions, 104 deletions
diff --git a/include/query/irs.h b/include/query/irs.h index 6dec850..ec6fa29 100644 --- a/include/query/irs.h +++ b/include/query/irs.h @@ -40,15 +40,15 @@ public: BufferView<R> *buffer; size_t cutoff; - std::vector<Wrapped<R>> records; + std::vector<R> records; std::unique_ptr<psudb::Alias> alias; size_t sample_size; Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<R> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = false; @@ -86,9 +86,11 @@ public: } for (size_t i = 0; i < query->cutoff; i++) { - if ((query->buffer->get(i)->rec.key >= query->global_parms.lower_bound) && - (buffer->get(i)->rec.key <= query->global_parms.upper_bound)) { - query->records.emplace_back(*(query->buffer->get(i))); + auto rec = buffer->get(i); + if (rec->rec.key >= query->global_parms.lower_bound && + rec->rec.key <= query->global_parms.upper_bound && + (!rec->is_deleted() && !rec->is_tombstone())) { + query->records.emplace_back(query->buffer->get(i)->rec); } } @@ -163,10 +165,10 @@ public: } } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { + static LocalResultType local_query(S *shard, LocalQuery *query) { auto sample_sz = query->sample_size; - std::vector<LocalResultType> result_set; + LocalResultType result_set; if (sample_sz == 0 || query->lower_idx == shard->get_record_count()) { return result_set; @@ -180,15 +182,19 @@ public: (range_length > 0) ? gsl_rng_uniform_int(query->global_parms.rng, range_length) : 0; - result_set.emplace_back(*shard->get_record_at(query->lower_idx + idx)); + auto wrec = shard->get_record_at(query->lower_idx + idx); + + if (!wrec->is_deleted() && !wrec->is_tombstone()) { + result_set.emplace_back(wrec->rec); + } } while (attempts < sample_sz); return result_set; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; result.reserve(query->sample_size); if constexpr (REJECTION) { @@ -197,8 +203,9 @@ public: auto rec = query->buffer->get(idx); if (rec->rec.key >= query->global_parms.lower_bound && - rec->rec.key <= query->global_parms.upper_bound) { - result.emplace_back(*rec); + rec->rec.key <= query->global_parms.upper_bound && + (!rec->is_deleted() && !rec->is_tombstone())) { + result.emplace_back(rec->rec); } } @@ -215,16 +222,16 @@ public: } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { - output.emplace_back(local_results[i][j].rec); + output.emplace_back(local_results[i][j]); } } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { if (output.size() < parms->sample_size) { diff --git a/include/query/knn.h b/include/query/knn.h index 87ea10a..91a032c 100644 --- a/include/query/knn.h +++ b/include/query/knn.h @@ -39,8 +39,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<const Wrapped<R> *> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = true; @@ -66,8 +66,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> results; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType results; Wrapped<R> wrec; wrec.rec = query->global_parms.point; @@ -79,17 +79,16 @@ public: shard->search(query->global_parms.point, query->global_parms.k, pq); while (pq.size() > 0) { - results.emplace_back(*pq.peek().data); + results.emplace_back(pq.peek().data); pq.pop(); } return results; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> results; + LocalResultType results; Wrapped<R> wrec; wrec.rec = query->global_parms.point; @@ -118,41 +117,44 @@ public: } while (pq.size() > 0) { - results.emplace_back(*(pq.peek().data)); + results.emplace_back(pq.peek().data); pq.pop(); } - return std::move(results); + return results; } - static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + static void combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { + + Wrapped<R> wrec; + wrec.rec = parms->point; + wrec.header = 0; + PriorityQueue<Wrapped<R>, DistCmpMax<Wrapped<R>>> pq(parms->k, &wrec); - PriorityQueue<R, DistCmpMax<R>> pq(parms->k, &(parms->point)); for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { if (pq.size() < parms->k) { - pq.push(&local_results[i][j].rec); + pq.push(local_results[i][j]); } else { - double head_dist = pq.peek().data->calc_distance(parms->point); - double cur_dist = local_results[i][j].rec.calc_distance(parms->point); + double head_dist = pq.peek().data->rec.calc_distance(wrec.rec); + double cur_dist = local_results[i][j]->rec.calc_distance(wrec.rec); if (cur_dist < head_dist) { pq.pop(); - pq.push(&local_results[i][j].rec); + pq.push(local_results[i][j]); } } } } while (pq.size() > 0) { - output.emplace_back(*pq.peek().data); + output.emplace_back(pq.peek().data->rec); pq.pop(); } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h index f3788de..65cffa7 100644 --- a/include/query/pointlookup.h +++ b/include/query/pointlookup.h @@ -38,8 +38,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<Wrapped<R>> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = true; constexpr static bool SKIP_DELETE_FILTER = true; @@ -65,8 +65,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; auto r = shard->point_lookup({query->global_parms.search_key, 0}); @@ -77,9 +77,8 @@ public: return result; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { + LocalResultType result; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); @@ -95,8 +94,8 @@ public: static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (auto r : local_results) { if (r.size() > 0) { if (r[0].is_deleted() || r[0].is_tombstone()) { @@ -109,7 +108,7 @@ public: } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 68d304d..0898473 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -16,7 +16,7 @@ namespace de { namespace rc { -template <ShardInterface S, bool FORCE_SCAN = true> class Query { +template <ShardInterface S> class Query { typedef typename S::RECORD R; public: @@ -75,8 +75,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result = {0, 0}; /* * if the returned index is one past the end of the @@ -88,8 +88,6 @@ public: } auto ptr = shard->get_record_at(query->start_idx); - size_t reccnt = 0; - size_t tscnt = 0; /* * roll the pointer forward to the first record that is @@ -104,53 +102,48 @@ public: ptr->rec.key <= query->global_parms.upper_bound) { if (!ptr->is_deleted()) { - reccnt++; + result.record_count++; if (ptr->is_tombstone()) { - tscnt++; + result.tombstone_count++; } } ptr++; } - result.push_back({reccnt, tscnt}); return result; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; - size_t reccnt = 0; - size_t tscnt = 0; + LocalResultType result = {0, 0}; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); if (rec->rec.key >= query->global_parms.lower_bound && rec->rec.key <= query->global_parms.upper_bound) { if (!rec->is_deleted()) { - reccnt++; + result.record_count++; if (rec->is_tombstone()) { - tscnt++; + result.tombstone_count++; } } } } - result.push_back({reccnt, tscnt}); - return result; } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { size_t reccnt = 0; size_t tscnt = 0; for (auto &local_result : local_results) { - reccnt += local_result[0].record_count; - tscnt += local_result[0].tombstone_count; + reccnt += local_result.record_count; + tscnt += local_result.tombstone_count; } /* if more tombstones than results, clamp the output at 0 */ @@ -158,10 +151,10 @@ public: tscnt = reccnt; } - output.push_back({reccnt - tscnt}); + output = reccnt - tscnt; } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/rangequery.h b/include/query/rangequery.h index e7be39c..d9e7db8 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -38,8 +38,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<Wrapped<R>> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = true; @@ -69,8 +69,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; /* * if the returned index is one past the end of the @@ -101,10 +101,9 @@ public: return result; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); if (rec->rec.key >= query->global_parms.lower_bound && @@ -116,26 +115,25 @@ public: return result; } - static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { - std::vector<Cursor<LocalResultType>> cursors; + static void combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { + std::vector<Cursor<Wrapped<R>>> cursors; cursors.reserve(local_results.size()); - psudb::PriorityQueue<LocalResultType> pq(local_results.size()); + psudb::PriorityQueue<Wrapped<R>> pq(local_results.size()); size_t total = 0; size_t tmp_n = local_results.size(); for (size_t i = 0; i < tmp_n; ++i) if (local_results[i].size() > 0) { auto base = local_results[i].data(); - cursors.emplace_back(Cursor<LocalResultType>{ + cursors.emplace_back(Cursor<Wrapped<R>>{ base, base + local_results[i].size(), 0, local_results[i].size()}); assert(i == cursors.size() - 1); total += local_results[i].size(); pq.push(cursors[i].ptr, tmp_n - i - 1); } else { - cursors.emplace_back(Cursor<LocalResultType>{nullptr, nullptr, 0, 0}); + cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); } if (total == 0) { @@ -146,9 +144,8 @@ public: while (pq.size()) { auto now = pq.peek(); - auto next = pq.size() > 1 - ? pq.peek(1) - : psudb::queue_record<LocalResultType>{nullptr, 0}; + auto next = pq.size() > 1 ? pq.peek(1) + : psudb::queue_record<Wrapped<R>>{nullptr, 0}; if (!now.data->is_tombstone() && next.data != nullptr && now.data->rec == next.data->rec && next.data->is_tombstone()) { @@ -156,9 +153,9 @@ public: pq.pop(); auto &cursor1 = cursors[tmp_n - now.version - 1]; auto &cursor2 = cursors[tmp_n - next.version - 1]; - if (advance_cursor<LocalResultType>(cursor1)) + if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor<LocalResultType>(cursor2)) + if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version); } else { auto &cursor = cursors[tmp_n - now.version - 1]; @@ -167,7 +164,7 @@ public: pq.pop(); - if (advance_cursor<LocalResultType>(cursor)) + if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version); } } @@ -175,7 +172,7 @@ public: return; } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/wss.h b/include/query/wss.h index 54620ca..d4e75f3 100644 --- a/include/query/wss.h +++ b/include/query/wss.h @@ -49,8 +49,8 @@ public: constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = false; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<R> LocalResultType; + typedef std::vector<R> ResultType; static LocalQuery *local_preproc(S *shard, Parameters *parms) { auto query = new LocalQuery(); @@ -130,8 +130,8 @@ public: } } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; if (query->sample_size == 0) { return result; @@ -139,25 +139,25 @@ public: for (size_t i = 0; i < query->sample_size; i++) { size_t idx = shard->get_weighted_sample(query->global_parms.rng); - if (!shard->get_record_at(idx)->is_deleted()) { - result.emplace_back(*shard->get_record_at(idx)); + if (!shard->get_record_at(idx)->is_deleted() && !shard->get_record_at(idx)->is_tombstone()) { + result.emplace_back(shard->get_record_at(idx)->rec); } } return result; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; for (size_t i = 0; i < query->sample_size; i++) { auto idx = gsl_rng_uniform_int(query->global_parms.rng, query->cutoff); auto rec = query->buffer->get(idx); auto test = gsl_rng_uniform(query->global_parms.rng) * query->max_weight; - if (test <= rec->rec.weight && !rec->is_deleted()) { - result.emplace_back(*rec); + if (test <= rec->rec.weight && !rec->is_deleted() && !rec->is_tombstone()) { + result.emplace_back(rec->rec); } } @@ -165,16 +165,16 @@ public: } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { - output.emplace_back(local_results[i][j].rec); + output.emplace_back(local_results[i][j]); } } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { if (output.size() < parms->sample_size) { |