diff options
Diffstat (limited to 'include')
| -rw-r--r-- | include/framework/DynamicExtension.h | 21 | ||||
| -rw-r--r-- | include/framework/interface/Query.h | 184 | ||||
| -rw-r--r-- | include/framework/scheduling/Task.h | 2 | ||||
| -rw-r--r-- | include/query/irs.h | 41 | ||||
| -rw-r--r-- | include/query/knn.h | 42 | ||||
| -rw-r--r-- | include/query/pointlookup.h | 19 | ||||
| -rw-r--r-- | include/query/rangecount.h | 37 | ||||
| -rw-r--r-- | include/query/rangequery.h | 39 | ||||
| -rw-r--r-- | include/query/wss.h | 28 |
9 files changed, 194 insertions, 219 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 5a95679..719232e 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -202,7 +202,7 @@ public: * @return A future, from which the query results can be retrieved upon * query completion */ - std::future<std::vector<QueryResult>> + std::future<QueryResult> query(Parameters &&parms) { return schedule_query(std::move(parms)); } @@ -628,26 +628,17 @@ private: QueryType::distribute_query(parms, local_queries, buffer_query); /* execute the local/buffer queries and combine the results into output */ - std::vector<QueryResult> output; + QueryResult output; do { - std::vector<std::vector<LocalResult>> - query_results(shards.size() + 1); + std::vector<LocalResult> query_results(shards.size() + 1); for (size_t i = 0; i < query_results.size(); i++) { - std::vector<LocalResult> local_results; - ShardID shid; - if (i == 0) { /* execute buffer query */ - local_results = QueryType::local_query_buffer(buffer_query); - shid = INVALID_SHID; + query_results[i] = QueryType::local_query_buffer(buffer_query); } else { /*execute local queries */ - local_results = QueryType::local_query(shards[i - 1].second, + query_results[i] = QueryType::local_query(shards[i - 1].second, local_queries[i - 1]); - shid = shards[i - 1].first; } - /* framework-level, automatic delete filtering */ - query_results[i] = std::move(local_results); - /* end query early if EARLY_ABORT is set and a result exists */ if constexpr (QueryType::EARLY_ABORT) { if (query_results[i].size() > 0) @@ -695,7 +686,7 @@ private: m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); } - std::future<std::vector<QueryResult>> + std::future<QueryResult> schedule_query(Parameters &&query_parms) { auto args = new QueryArgs<ShardType, QueryType, DynamicExtension>(); diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h index 1b64646..97a973d 100644 --- a/include/framework/interface/Query.h +++ b/include/framework/interface/Query.h @@ -12,18 +12,6 @@ namespace de { -/* - * FIXME: It would probably be best to absorb the std::vector into - * this type too; this would allow user-defined collections for - * intermediate results, which could allow for more merging - * optimizations. However, this would require an alternative - * approach to doing delete checks, so we'll leave it for now. - */ -template <typename R> -concept LocalResultInterface = requires(R res) { - { res.is_deleted() } -> std::convertible_to<bool>; - { res.is_tombstone() } -> std::convertible_to<bool>; -}; /* * @@ -35,102 +23,100 @@ template <typename QUERY, typename SHARD, typename PARAMETERS = typename QUERY::Parameters, typename LOCAL = typename QUERY::LocalQuery, typename LOCAL_BUFFER = typename QUERY::LocalQueryBuffer> -concept QueryInterface = LocalResultInterface<LOCAL_RESULT> && +concept QueryInterface = requires(PARAMETERS *parameters, LOCAL *local, LOCAL_BUFFER *buffer_query, SHARD *shard, std::vector<LOCAL *> &local_queries, - std::vector<std::vector<LOCAL_RESULT>> &local_results, - std::vector<RESULT> &result, + std::vector<LOCAL_RESULT> &local_results, RESULT &result, BufferView<typename SHARD::RECORD> *bv) { + /* + * Given a set of query parameters and a shard, return a local query + * object for that shard. + */ + { + QUERY::local_preproc(shard, parameters) + } -> std::convertible_to<LOCAL *>; - /* - * Given a set of query parameters and a shard, return a local query - * object for that shard. - */ - { QUERY::local_preproc(shard, parameters) } -> std::convertible_to<LOCAL *>; - - /* - * Given a set of query parameters and a buffer view, return a local - * query object for the buffer. - * NOTE: for interface reasons, the pointer to the buffer view MUST be - * stored inside of the local query object. The future buffer - * query routine will access the buffer by way of this pointer. - */ - { - QUERY::local_preproc_buffer(bv, parameters) - } -> std::convertible_to<LOCAL_BUFFER *>; + /* + * Given a set of query parameters and a buffer view, return a local + * query object for the buffer. + * NOTE: for interface reasons, the pointer to the buffer view MUST be + * stored inside of the local query object. The future buffer + * query routine will access the buffer by way of this pointer. + */ + { + QUERY::local_preproc_buffer(bv, parameters) + } -> std::convertible_to<LOCAL_BUFFER *>; - /* - * Given a full set of local queries, and the buffer query, make any - * necessary adjustments to the local queries in-place, to account for - * global information. If no additional processing is required, this - * function can be left empty. - */ - {QUERY::distribute_query(parameters, local_queries, buffer_query)}; + /* + * Given a full set of local queries, and the buffer query, make any + * necessary adjustments to the local queries in-place, to account for + * global information. If no additional processing is required, this + * function can be left empty. + */ + { QUERY::distribute_query(parameters, local_queries, buffer_query) }; - /* - * Answer the local query, defined by `local` against `shard` and return - * a vector of LOCAL_RESULT objects defining the query result. - */ - { - QUERY::local_query(shard, local) - } -> std::convertible_to<std::vector<LOCAL_RESULT>>; + /* + * Answer the local query, defined by `local` against `shard` and return + * a vector of LOCAL_RESULT objects defining the query result. + */ + { QUERY::local_query(shard, local) } -> std::convertible_to<LOCAL_RESULT>; - /* - * Answer the local query defined by `local` against the buffer (which - * should be accessed by a pointer inside of `local`) and return a vector - * of LOCAL_RESULT objects defining the query result. - */ - { - QUERY::local_query_buffer(buffer_query) - } -> std::convertible_to<std::vector<LOCAL_RESULT>>; + /* + * Answer the local query defined by `local` against the buffer (which + * should be accessed by a pointer inside of `local`) and return a vector + * of LOCAL_RESULT objects defining the query result. + */ + { + QUERY::local_query_buffer(buffer_query) + } -> std::convertible_to<LOCAL_RESULT>; - /* - * Process the local results from the buffer and all of the shards, - * stored in `local_results`, and insert the associated ResultType - * objects into the `result` vector, which represents the final result - * of the query. Updates to this vector are done in-place. - */ - {QUERY::combine(local_results, parameters, result)}; + /* + * Process the local results from the buffer and all of the shards, + * stored in `local_results`, and insert the associated ResultType + * objects into the `result` vector, which represents the final result + * of the query. Updates to this vector are done in-place. + */ + { QUERY::combine(local_results, parameters, result) }; - /* - * Process the post-combine `result` vector of ResultType objects, - * in the context of the global and local query parameters, to determine - * if the query should be repeated. If so, make any necessary adjustments - * to the local query objects and return True. Otherwise, return False. - * - * If no repetition is needed for a given problem type, simply return - * False immediately and the query will end. - */ - { - QUERY::repeat(parameters, result, local_queries, buffer_query) - } -> std::same_as<bool>; + /* + * Process the post-combine `result` vector of ResultType objects, + * in the context of the global and local query parameters, to determine + * if the query should be repeated. If so, make any necessary adjustments + * to the local query objects and return True. Otherwise, return False. + * + * If no repetition is needed for a given problem type, simply return + * False immediately and the query will end. + */ + { + QUERY::repeat(parameters, result, local_queries, buffer_query) + } -> std::same_as<bool>; - /* - * If this flag is True, then the query will immediately stop and return - * a result as soon as the first non-deleted LocalRecordType is found. - * Otherwise, every Shard and the buffer will be queried and the results - * merged, like normal. - * - * This is largely an optimization flag for use with point-lookup, or - * other single-record result queries - */ - { QUERY::EARLY_ABORT } -> std::convertible_to<bool>; + /* + * If this flag is True, then the query will immediately stop and return + * a result as soon as the first non-deleted LocalRecordType is found. + * Otherwise, every Shard and the buffer will be queried and the results + * merged, like normal. + * + * This is largely an optimization flag for use with point-lookup, or + * other single-record result queries + */ + { QUERY::EARLY_ABORT } -> std::convertible_to<bool>; - /* - * If false, the built-in delete filtering that the framework can - * apply to the local results, prior to calling combine, will be skipped. - * This general filtering can be inefficient, particularly for tombstone - * -based deletes, and so if a more efficient manual filtering can be - * performed, it is worth setting this to True and doing that filtering - * in the combine step. - * - * If deletes are not a consideration for your problem, it's also best - * to turn this off, as it'll avoid the framework making an extra pass - * over the local results prior to combining them. - * - * TODO: Temporarily disabling this, as we've dropped framework-level - * delete filtering for the time being. - */ - /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to<bool>; */ -}; + /* + * If false, the built-in delete filtering that the framework can + * apply to the local results, prior to calling combine, will be skipped. + * This general filtering can be inefficient, particularly for tombstone + * -based deletes, and so if a more efficient manual filtering can be + * performed, it is worth setting this to True and doing that filtering + * in the combine step. + * + * If deletes are not a consideration for your problem, it's also best + * to turn this off, as it'll avoid the framework making an extra pass + * over the local results prior to combining them. + * + * TODO: Temporarily disabling this, as we've dropped framework-level + * delete filtering for the time being. + */ + /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to<bool>; */ + }; } // namespace de diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 6b6f040..34f053a 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -35,7 +35,7 @@ struct ReconstructionArgs { }; template <ShardInterface S, QueryInterface<S> Q, typename DE> struct QueryArgs { - std::promise<std::vector<typename Q::ResultType>> result_set; + std::promise<typename Q::ResultType> result_set; typename Q::Parameters query_parms; DE *extension; }; diff --git a/include/query/irs.h b/include/query/irs.h index 6dec850..ec6fa29 100644 --- a/include/query/irs.h +++ b/include/query/irs.h @@ -40,15 +40,15 @@ public: BufferView<R> *buffer; size_t cutoff; - std::vector<Wrapped<R>> records; + std::vector<R> records; std::unique_ptr<psudb::Alias> alias; size_t sample_size; Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<R> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = false; @@ -86,9 +86,11 @@ public: } for (size_t i = 0; i < query->cutoff; i++) { - if ((query->buffer->get(i)->rec.key >= query->global_parms.lower_bound) && - (buffer->get(i)->rec.key <= query->global_parms.upper_bound)) { - query->records.emplace_back(*(query->buffer->get(i))); + auto rec = buffer->get(i); + if (rec->rec.key >= query->global_parms.lower_bound && + rec->rec.key <= query->global_parms.upper_bound && + (!rec->is_deleted() && !rec->is_tombstone())) { + query->records.emplace_back(query->buffer->get(i)->rec); } } @@ -163,10 +165,10 @@ public: } } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { + static LocalResultType local_query(S *shard, LocalQuery *query) { auto sample_sz = query->sample_size; - std::vector<LocalResultType> result_set; + LocalResultType result_set; if (sample_sz == 0 || query->lower_idx == shard->get_record_count()) { return result_set; @@ -180,15 +182,19 @@ public: (range_length > 0) ? gsl_rng_uniform_int(query->global_parms.rng, range_length) : 0; - result_set.emplace_back(*shard->get_record_at(query->lower_idx + idx)); + auto wrec = shard->get_record_at(query->lower_idx + idx); + + if (!wrec->is_deleted() && !wrec->is_tombstone()) { + result_set.emplace_back(wrec->rec); + } } while (attempts < sample_sz); return result_set; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; result.reserve(query->sample_size); if constexpr (REJECTION) { @@ -197,8 +203,9 @@ public: auto rec = query->buffer->get(idx); if (rec->rec.key >= query->global_parms.lower_bound && - rec->rec.key <= query->global_parms.upper_bound) { - result.emplace_back(*rec); + rec->rec.key <= query->global_parms.upper_bound && + (!rec->is_deleted() && !rec->is_tombstone())) { + result.emplace_back(rec->rec); } } @@ -215,16 +222,16 @@ public: } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { - output.emplace_back(local_results[i][j].rec); + output.emplace_back(local_results[i][j]); } } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { if (output.size() < parms->sample_size) { diff --git a/include/query/knn.h b/include/query/knn.h index 87ea10a..91a032c 100644 --- a/include/query/knn.h +++ b/include/query/knn.h @@ -39,8 +39,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<const Wrapped<R> *> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = true; @@ -66,8 +66,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> results; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType results; Wrapped<R> wrec; wrec.rec = query->global_parms.point; @@ -79,17 +79,16 @@ public: shard->search(query->global_parms.point, query->global_parms.k, pq); while (pq.size() > 0) { - results.emplace_back(*pq.peek().data); + results.emplace_back(pq.peek().data); pq.pop(); } return results; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> results; + LocalResultType results; Wrapped<R> wrec; wrec.rec = query->global_parms.point; @@ -118,41 +117,44 @@ public: } while (pq.size() > 0) { - results.emplace_back(*(pq.peek().data)); + results.emplace_back(pq.peek().data); pq.pop(); } - return std::move(results); + return results; } - static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + static void combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { + + Wrapped<R> wrec; + wrec.rec = parms->point; + wrec.header = 0; + PriorityQueue<Wrapped<R>, DistCmpMax<Wrapped<R>>> pq(parms->k, &wrec); - PriorityQueue<R, DistCmpMax<R>> pq(parms->k, &(parms->point)); for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { if (pq.size() < parms->k) { - pq.push(&local_results[i][j].rec); + pq.push(local_results[i][j]); } else { - double head_dist = pq.peek().data->calc_distance(parms->point); - double cur_dist = local_results[i][j].rec.calc_distance(parms->point); + double head_dist = pq.peek().data->rec.calc_distance(wrec.rec); + double cur_dist = local_results[i][j]->rec.calc_distance(wrec.rec); if (cur_dist < head_dist) { pq.pop(); - pq.push(&local_results[i][j].rec); + pq.push(local_results[i][j]); } } } } while (pq.size() > 0) { - output.emplace_back(*pq.peek().data); + output.emplace_back(pq.peek().data->rec); pq.pop(); } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h index f3788de..65cffa7 100644 --- a/include/query/pointlookup.h +++ b/include/query/pointlookup.h @@ -38,8 +38,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<Wrapped<R>> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = true; constexpr static bool SKIP_DELETE_FILTER = true; @@ -65,8 +65,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; auto r = shard->point_lookup({query->global_parms.search_key, 0}); @@ -77,9 +77,8 @@ public: return result; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { + LocalResultType result; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); @@ -95,8 +94,8 @@ public: static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (auto r : local_results) { if (r.size() > 0) { if (r[0].is_deleted() || r[0].is_tombstone()) { @@ -109,7 +108,7 @@ public: } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 68d304d..0898473 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -16,7 +16,7 @@ namespace de { namespace rc { -template <ShardInterface S, bool FORCE_SCAN = true> class Query { +template <ShardInterface S> class Query { typedef typename S::RECORD R; public: @@ -75,8 +75,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result = {0, 0}; /* * if the returned index is one past the end of the @@ -88,8 +88,6 @@ public: } auto ptr = shard->get_record_at(query->start_idx); - size_t reccnt = 0; - size_t tscnt = 0; /* * roll the pointer forward to the first record that is @@ -104,53 +102,48 @@ public: ptr->rec.key <= query->global_parms.upper_bound) { if (!ptr->is_deleted()) { - reccnt++; + result.record_count++; if (ptr->is_tombstone()) { - tscnt++; + result.tombstone_count++; } } ptr++; } - result.push_back({reccnt, tscnt}); return result; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; - size_t reccnt = 0; - size_t tscnt = 0; + LocalResultType result = {0, 0}; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); if (rec->rec.key >= query->global_parms.lower_bound && rec->rec.key <= query->global_parms.upper_bound) { if (!rec->is_deleted()) { - reccnt++; + result.record_count++; if (rec->is_tombstone()) { - tscnt++; + result.tombstone_count++; } } } } - result.push_back({reccnt, tscnt}); - return result; } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { size_t reccnt = 0; size_t tscnt = 0; for (auto &local_result : local_results) { - reccnt += local_result[0].record_count; - tscnt += local_result[0].tombstone_count; + reccnt += local_result.record_count; + tscnt += local_result.tombstone_count; } /* if more tombstones than results, clamp the output at 0 */ @@ -158,10 +151,10 @@ public: tscnt = reccnt; } - output.push_back({reccnt - tscnt}); + output = reccnt - tscnt; } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/rangequery.h b/include/query/rangequery.h index e7be39c..d9e7db8 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -38,8 +38,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<Wrapped<R>> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = true; @@ -69,8 +69,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; /* * if the returned index is one past the end of the @@ -101,10 +101,9 @@ public: return result; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); if (rec->rec.key >= query->global_parms.lower_bound && @@ -116,26 +115,25 @@ public: return result; } - static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { - std::vector<Cursor<LocalResultType>> cursors; + static void combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { + std::vector<Cursor<Wrapped<R>>> cursors; cursors.reserve(local_results.size()); - psudb::PriorityQueue<LocalResultType> pq(local_results.size()); + psudb::PriorityQueue<Wrapped<R>> pq(local_results.size()); size_t total = 0; size_t tmp_n = local_results.size(); for (size_t i = 0; i < tmp_n; ++i) if (local_results[i].size() > 0) { auto base = local_results[i].data(); - cursors.emplace_back(Cursor<LocalResultType>{ + cursors.emplace_back(Cursor<Wrapped<R>>{ base, base + local_results[i].size(), 0, local_results[i].size()}); assert(i == cursors.size() - 1); total += local_results[i].size(); pq.push(cursors[i].ptr, tmp_n - i - 1); } else { - cursors.emplace_back(Cursor<LocalResultType>{nullptr, nullptr, 0, 0}); + cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); } if (total == 0) { @@ -146,9 +144,8 @@ public: while (pq.size()) { auto now = pq.peek(); - auto next = pq.size() > 1 - ? pq.peek(1) - : psudb::queue_record<LocalResultType>{nullptr, 0}; + auto next = pq.size() > 1 ? pq.peek(1) + : psudb::queue_record<Wrapped<R>>{nullptr, 0}; if (!now.data->is_tombstone() && next.data != nullptr && now.data->rec == next.data->rec && next.data->is_tombstone()) { @@ -156,9 +153,9 @@ public: pq.pop(); auto &cursor1 = cursors[tmp_n - now.version - 1]; auto &cursor2 = cursors[tmp_n - next.version - 1]; - if (advance_cursor<LocalResultType>(cursor1)) + if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor<LocalResultType>(cursor2)) + if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version); } else { auto &cursor = cursors[tmp_n - now.version - 1]; @@ -167,7 +164,7 @@ public: pq.pop(); - if (advance_cursor<LocalResultType>(cursor)) + if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version); } } @@ -175,7 +172,7 @@ public: return; } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/wss.h b/include/query/wss.h index 54620ca..d4e75f3 100644 --- a/include/query/wss.h +++ b/include/query/wss.h @@ -49,8 +49,8 @@ public: constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = false; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<R> LocalResultType; + typedef std::vector<R> ResultType; static LocalQuery *local_preproc(S *shard, Parameters *parms) { auto query = new LocalQuery(); @@ -130,8 +130,8 @@ public: } } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; if (query->sample_size == 0) { return result; @@ -139,25 +139,25 @@ public: for (size_t i = 0; i < query->sample_size; i++) { size_t idx = shard->get_weighted_sample(query->global_parms.rng); - if (!shard->get_record_at(idx)->is_deleted()) { - result.emplace_back(*shard->get_record_at(idx)); + if (!shard->get_record_at(idx)->is_deleted() && !shard->get_record_at(idx)->is_tombstone()) { + result.emplace_back(shard->get_record_at(idx)->rec); } } return result; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; for (size_t i = 0; i < query->sample_size; i++) { auto idx = gsl_rng_uniform_int(query->global_parms.rng, query->cutoff); auto rec = query->buffer->get(idx); auto test = gsl_rng_uniform(query->global_parms.rng) * query->max_weight; - if (test <= rec->rec.weight && !rec->is_deleted()) { - result.emplace_back(*rec); + if (test <= rec->rec.weight && !rec->is_deleted() && !rec->is_tombstone()) { + result.emplace_back(rec->rec); } } @@ -165,16 +165,16 @@ public: } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { - output.emplace_back(local_results[i][j].rec); + output.emplace_back(local_results[i][j]); } } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { if (output.size() < parms->sample_size) { |