diff options
| author | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-12-06 16:54:05 -0500 |
|---|---|---|
| committer | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-12-06 16:54:05 -0500 |
| commit | 9876d74e503df64eb9e82e540ca41fcf593ebf64 (patch) | |
| tree | 3512690afa8c04f702bd8de500bf8f41b05571c4 /include | |
| parent | e2b81a2d311470d503edae93e68e82791f6bb17c (diff) | |
| download | dynamic-extension-9876d74e503df64eb9e82e540ca41fcf593ebf64.tar.gz | |
Now, the vector<> is part of the user-defined type, not required by the
framework. This should allow for more flexibility in either using
alternative containers, or for more sensible implementations of queries
with single value results (like range count).
Diffstat (limited to 'include')
| -rw-r--r-- | include/framework/DynamicExtension.h | 21 | ||||
| -rw-r--r-- | include/framework/interface/Query.h | 184 | ||||
| -rw-r--r-- | include/framework/scheduling/Task.h | 2 | ||||
| -rw-r--r-- | include/query/irs.h | 41 | ||||
| -rw-r--r-- | include/query/knn.h | 42 | ||||
| -rw-r--r-- | include/query/pointlookup.h | 19 | ||||
| -rw-r--r-- | include/query/rangecount.h | 37 | ||||
| -rw-r--r-- | include/query/rangequery.h | 39 | ||||
| -rw-r--r-- | include/query/wss.h | 28 |
9 files changed, 194 insertions, 219 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 5a95679..719232e 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -202,7 +202,7 @@ public: * @return A future, from which the query results can be retrieved upon * query completion */ - std::future<std::vector<QueryResult>> + std::future<QueryResult> query(Parameters &&parms) { return schedule_query(std::move(parms)); } @@ -628,26 +628,17 @@ private: QueryType::distribute_query(parms, local_queries, buffer_query); /* execute the local/buffer queries and combine the results into output */ - std::vector<QueryResult> output; + QueryResult output; do { - std::vector<std::vector<LocalResult>> - query_results(shards.size() + 1); + std::vector<LocalResult> query_results(shards.size() + 1); for (size_t i = 0; i < query_results.size(); i++) { - std::vector<LocalResult> local_results; - ShardID shid; - if (i == 0) { /* execute buffer query */ - local_results = QueryType::local_query_buffer(buffer_query); - shid = INVALID_SHID; + query_results[i] = QueryType::local_query_buffer(buffer_query); } else { /*execute local queries */ - local_results = QueryType::local_query(shards[i - 1].second, + query_results[i] = QueryType::local_query(shards[i - 1].second, local_queries[i - 1]); - shid = shards[i - 1].first; } - /* framework-level, automatic delete filtering */ - query_results[i] = std::move(local_results); - /* end query early if EARLY_ABORT is set and a result exists */ if constexpr (QueryType::EARLY_ABORT) { if (query_results[i].size() > 0) @@ -695,7 +686,7 @@ private: m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); } - std::future<std::vector<QueryResult>> + std::future<QueryResult> schedule_query(Parameters &&query_parms) { auto args = new QueryArgs<ShardType, QueryType, DynamicExtension>(); diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h index 1b64646..97a973d 100644 --- a/include/framework/interface/Query.h +++ b/include/framework/interface/Query.h @@ -12,18 +12,6 @@ namespace de { -/* - * FIXME: It would probably be best to absorb the std::vector into - * this type too; this would allow user-defined collections for - * intermediate results, which could allow for more merging - * optimizations. However, this would require an alternative - * approach to doing delete checks, so we'll leave it for now. - */ -template <typename R> -concept LocalResultInterface = requires(R res) { - { res.is_deleted() } -> std::convertible_to<bool>; - { res.is_tombstone() } -> std::convertible_to<bool>; -}; /* * @@ -35,102 +23,100 @@ template <typename QUERY, typename SHARD, typename PARAMETERS = typename QUERY::Parameters, typename LOCAL = typename QUERY::LocalQuery, typename LOCAL_BUFFER = typename QUERY::LocalQueryBuffer> -concept QueryInterface = LocalResultInterface<LOCAL_RESULT> && +concept QueryInterface = requires(PARAMETERS *parameters, LOCAL *local, LOCAL_BUFFER *buffer_query, SHARD *shard, std::vector<LOCAL *> &local_queries, - std::vector<std::vector<LOCAL_RESULT>> &local_results, - std::vector<RESULT> &result, + std::vector<LOCAL_RESULT> &local_results, RESULT &result, BufferView<typename SHARD::RECORD> *bv) { + /* + * Given a set of query parameters and a shard, return a local query + * object for that shard. + */ + { + QUERY::local_preproc(shard, parameters) + } -> std::convertible_to<LOCAL *>; - /* - * Given a set of query parameters and a shard, return a local query - * object for that shard. - */ - { QUERY::local_preproc(shard, parameters) } -> std::convertible_to<LOCAL *>; - - /* - * Given a set of query parameters and a buffer view, return a local - * query object for the buffer. - * NOTE: for interface reasons, the pointer to the buffer view MUST be - * stored inside of the local query object. The future buffer - * query routine will access the buffer by way of this pointer. - */ - { - QUERY::local_preproc_buffer(bv, parameters) - } -> std::convertible_to<LOCAL_BUFFER *>; + /* + * Given a set of query parameters and a buffer view, return a local + * query object for the buffer. + * NOTE: for interface reasons, the pointer to the buffer view MUST be + * stored inside of the local query object. The future buffer + * query routine will access the buffer by way of this pointer. + */ + { + QUERY::local_preproc_buffer(bv, parameters) + } -> std::convertible_to<LOCAL_BUFFER *>; - /* - * Given a full set of local queries, and the buffer query, make any - * necessary adjustments to the local queries in-place, to account for - * global information. If no additional processing is required, this - * function can be left empty. - */ - {QUERY::distribute_query(parameters, local_queries, buffer_query)}; + /* + * Given a full set of local queries, and the buffer query, make any + * necessary adjustments to the local queries in-place, to account for + * global information. If no additional processing is required, this + * function can be left empty. + */ + { QUERY::distribute_query(parameters, local_queries, buffer_query) }; - /* - * Answer the local query, defined by `local` against `shard` and return - * a vector of LOCAL_RESULT objects defining the query result. - */ - { - QUERY::local_query(shard, local) - } -> std::convertible_to<std::vector<LOCAL_RESULT>>; + /* + * Answer the local query, defined by `local` against `shard` and return + * a vector of LOCAL_RESULT objects defining the query result. + */ + { QUERY::local_query(shard, local) } -> std::convertible_to<LOCAL_RESULT>; - /* - * Answer the local query defined by `local` against the buffer (which - * should be accessed by a pointer inside of `local`) and return a vector - * of LOCAL_RESULT objects defining the query result. - */ - { - QUERY::local_query_buffer(buffer_query) - } -> std::convertible_to<std::vector<LOCAL_RESULT>>; + /* + * Answer the local query defined by `local` against the buffer (which + * should be accessed by a pointer inside of `local`) and return a vector + * of LOCAL_RESULT objects defining the query result. + */ + { + QUERY::local_query_buffer(buffer_query) + } -> std::convertible_to<LOCAL_RESULT>; - /* - * Process the local results from the buffer and all of the shards, - * stored in `local_results`, and insert the associated ResultType - * objects into the `result` vector, which represents the final result - * of the query. Updates to this vector are done in-place. - */ - {QUERY::combine(local_results, parameters, result)}; + /* + * Process the local results from the buffer and all of the shards, + * stored in `local_results`, and insert the associated ResultType + * objects into the `result` vector, which represents the final result + * of the query. Updates to this vector are done in-place. + */ + { QUERY::combine(local_results, parameters, result) }; - /* - * Process the post-combine `result` vector of ResultType objects, - * in the context of the global and local query parameters, to determine - * if the query should be repeated. If so, make any necessary adjustments - * to the local query objects and return True. Otherwise, return False. - * - * If no repetition is needed for a given problem type, simply return - * False immediately and the query will end. - */ - { - QUERY::repeat(parameters, result, local_queries, buffer_query) - } -> std::same_as<bool>; + /* + * Process the post-combine `result` vector of ResultType objects, + * in the context of the global and local query parameters, to determine + * if the query should be repeated. If so, make any necessary adjustments + * to the local query objects and return True. Otherwise, return False. + * + * If no repetition is needed for a given problem type, simply return + * False immediately and the query will end. + */ + { + QUERY::repeat(parameters, result, local_queries, buffer_query) + } -> std::same_as<bool>; - /* - * If this flag is True, then the query will immediately stop and return - * a result as soon as the first non-deleted LocalRecordType is found. - * Otherwise, every Shard and the buffer will be queried and the results - * merged, like normal. - * - * This is largely an optimization flag for use with point-lookup, or - * other single-record result queries - */ - { QUERY::EARLY_ABORT } -> std::convertible_to<bool>; + /* + * If this flag is True, then the query will immediately stop and return + * a result as soon as the first non-deleted LocalRecordType is found. + * Otherwise, every Shard and the buffer will be queried and the results + * merged, like normal. + * + * This is largely an optimization flag for use with point-lookup, or + * other single-record result queries + */ + { QUERY::EARLY_ABORT } -> std::convertible_to<bool>; - /* - * If false, the built-in delete filtering that the framework can - * apply to the local results, prior to calling combine, will be skipped. - * This general filtering can be inefficient, particularly for tombstone - * -based deletes, and so if a more efficient manual filtering can be - * performed, it is worth setting this to True and doing that filtering - * in the combine step. - * - * If deletes are not a consideration for your problem, it's also best - * to turn this off, as it'll avoid the framework making an extra pass - * over the local results prior to combining them. - * - * TODO: Temporarily disabling this, as we've dropped framework-level - * delete filtering for the time being. - */ - /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to<bool>; */ -}; + /* + * If false, the built-in delete filtering that the framework can + * apply to the local results, prior to calling combine, will be skipped. + * This general filtering can be inefficient, particularly for tombstone + * -based deletes, and so if a more efficient manual filtering can be + * performed, it is worth setting this to True and doing that filtering + * in the combine step. + * + * If deletes are not a consideration for your problem, it's also best + * to turn this off, as it'll avoid the framework making an extra pass + * over the local results prior to combining them. + * + * TODO: Temporarily disabling this, as we've dropped framework-level + * delete filtering for the time being. + */ + /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to<bool>; */ + }; } // namespace de diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 6b6f040..34f053a 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -35,7 +35,7 @@ struct ReconstructionArgs { }; template <ShardInterface S, QueryInterface<S> Q, typename DE> struct QueryArgs { - std::promise<std::vector<typename Q::ResultType>> result_set; + std::promise<typename Q::ResultType> result_set; typename Q::Parameters query_parms; DE *extension; }; diff --git a/include/query/irs.h b/include/query/irs.h index 6dec850..ec6fa29 100644 --- a/include/query/irs.h +++ b/include/query/irs.h @@ -40,15 +40,15 @@ public: BufferView<R> *buffer; size_t cutoff; - std::vector<Wrapped<R>> records; + std::vector<R> records; std::unique_ptr<psudb::Alias> alias; size_t sample_size; Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<R> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = false; @@ -86,9 +86,11 @@ public: } for (size_t i = 0; i < query->cutoff; i++) { - if ((query->buffer->get(i)->rec.key >= query->global_parms.lower_bound) && - (buffer->get(i)->rec.key <= query->global_parms.upper_bound)) { - query->records.emplace_back(*(query->buffer->get(i))); + auto rec = buffer->get(i); + if (rec->rec.key >= query->global_parms.lower_bound && + rec->rec.key <= query->global_parms.upper_bound && + (!rec->is_deleted() && !rec->is_tombstone())) { + query->records.emplace_back(query->buffer->get(i)->rec); } } @@ -163,10 +165,10 @@ public: } } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { + static LocalResultType local_query(S *shard, LocalQuery *query) { auto sample_sz = query->sample_size; - std::vector<LocalResultType> result_set; + LocalResultType result_set; if (sample_sz == 0 || query->lower_idx == shard->get_record_count()) { return result_set; @@ -180,15 +182,19 @@ public: (range_length > 0) ? gsl_rng_uniform_int(query->global_parms.rng, range_length) : 0; - result_set.emplace_back(*shard->get_record_at(query->lower_idx + idx)); + auto wrec = shard->get_record_at(query->lower_idx + idx); + + if (!wrec->is_deleted() && !wrec->is_tombstone()) { + result_set.emplace_back(wrec->rec); + } } while (attempts < sample_sz); return result_set; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; result.reserve(query->sample_size); if constexpr (REJECTION) { @@ -197,8 +203,9 @@ public: auto rec = query->buffer->get(idx); if (rec->rec.key >= query->global_parms.lower_bound && - rec->rec.key <= query->global_parms.upper_bound) { - result.emplace_back(*rec); + rec->rec.key <= query->global_parms.upper_bound && + (!rec->is_deleted() && !rec->is_tombstone())) { + result.emplace_back(rec->rec); } } @@ -215,16 +222,16 @@ public: } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { - output.emplace_back(local_results[i][j].rec); + output.emplace_back(local_results[i][j]); } } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { if (output.size() < parms->sample_size) { diff --git a/include/query/knn.h b/include/query/knn.h index 87ea10a..91a032c 100644 --- a/include/query/knn.h +++ b/include/query/knn.h @@ -39,8 +39,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<const Wrapped<R> *> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = true; @@ -66,8 +66,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> results; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType results; Wrapped<R> wrec; wrec.rec = query->global_parms.point; @@ -79,17 +79,16 @@ public: shard->search(query->global_parms.point, query->global_parms.k, pq); while (pq.size() > 0) { - results.emplace_back(*pq.peek().data); + results.emplace_back(pq.peek().data); pq.pop(); } return results; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> results; + LocalResultType results; Wrapped<R> wrec; wrec.rec = query->global_parms.point; @@ -118,41 +117,44 @@ public: } while (pq.size() > 0) { - results.emplace_back(*(pq.peek().data)); + results.emplace_back(pq.peek().data); pq.pop(); } - return std::move(results); + return results; } - static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + static void combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { + + Wrapped<R> wrec; + wrec.rec = parms->point; + wrec.header = 0; + PriorityQueue<Wrapped<R>, DistCmpMax<Wrapped<R>>> pq(parms->k, &wrec); - PriorityQueue<R, DistCmpMax<R>> pq(parms->k, &(parms->point)); for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { if (pq.size() < parms->k) { - pq.push(&local_results[i][j].rec); + pq.push(local_results[i][j]); } else { - double head_dist = pq.peek().data->calc_distance(parms->point); - double cur_dist = local_results[i][j].rec.calc_distance(parms->point); + double head_dist = pq.peek().data->rec.calc_distance(wrec.rec); + double cur_dist = local_results[i][j]->rec.calc_distance(wrec.rec); if (cur_dist < head_dist) { pq.pop(); - pq.push(&local_results[i][j].rec); + pq.push(local_results[i][j]); } } } } while (pq.size() > 0) { - output.emplace_back(*pq.peek().data); + output.emplace_back(pq.peek().data->rec); pq.pop(); } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h index f3788de..65cffa7 100644 --- a/include/query/pointlookup.h +++ b/include/query/pointlookup.h @@ -38,8 +38,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<Wrapped<R>> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = true; constexpr static bool SKIP_DELETE_FILTER = true; @@ -65,8 +65,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; auto r = shard->point_lookup({query->global_parms.search_key, 0}); @@ -77,9 +77,8 @@ public: return result; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { + LocalResultType result; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); @@ -95,8 +94,8 @@ public: static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (auto r : local_results) { if (r.size() > 0) { if (r[0].is_deleted() || r[0].is_tombstone()) { @@ -109,7 +108,7 @@ public: } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 68d304d..0898473 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -16,7 +16,7 @@ namespace de { namespace rc { -template <ShardInterface S, bool FORCE_SCAN = true> class Query { +template <ShardInterface S> class Query { typedef typename S::RECORD R; public: @@ -75,8 +75,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result = {0, 0}; /* * if the returned index is one past the end of the @@ -88,8 +88,6 @@ public: } auto ptr = shard->get_record_at(query->start_idx); - size_t reccnt = 0; - size_t tscnt = 0; /* * roll the pointer forward to the first record that is @@ -104,53 +102,48 @@ public: ptr->rec.key <= query->global_parms.upper_bound) { if (!ptr->is_deleted()) { - reccnt++; + result.record_count++; if (ptr->is_tombstone()) { - tscnt++; + result.tombstone_count++; } } ptr++; } - result.push_back({reccnt, tscnt}); return result; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; - size_t reccnt = 0; - size_t tscnt = 0; + LocalResultType result = {0, 0}; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); if (rec->rec.key >= query->global_parms.lower_bound && rec->rec.key <= query->global_parms.upper_bound) { if (!rec->is_deleted()) { - reccnt++; + result.record_count++; if (rec->is_tombstone()) { - tscnt++; + result.tombstone_count++; } } } } - result.push_back({reccnt, tscnt}); - return result; } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { size_t reccnt = 0; size_t tscnt = 0; for (auto &local_result : local_results) { - reccnt += local_result[0].record_count; - tscnt += local_result[0].tombstone_count; + reccnt += local_result.record_count; + tscnt += local_result.tombstone_count; } /* if more tombstones than results, clamp the output at 0 */ @@ -158,10 +151,10 @@ public: tscnt = reccnt; } - output.push_back({reccnt - tscnt}); + output = reccnt - tscnt; } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/rangequery.h b/include/query/rangequery.h index e7be39c..d9e7db8 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -38,8 +38,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<Wrapped<R>> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = true; @@ -69,8 +69,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; /* * if the returned index is one past the end of the @@ -101,10 +101,9 @@ public: return result; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); if (rec->rec.key >= query->global_parms.lower_bound && @@ -116,26 +115,25 @@ public: return result; } - static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { - std::vector<Cursor<LocalResultType>> cursors; + static void combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { + std::vector<Cursor<Wrapped<R>>> cursors; cursors.reserve(local_results.size()); - psudb::PriorityQueue<LocalResultType> pq(local_results.size()); + psudb::PriorityQueue<Wrapped<R>> pq(local_results.size()); size_t total = 0; size_t tmp_n = local_results.size(); for (size_t i = 0; i < tmp_n; ++i) if (local_results[i].size() > 0) { auto base = local_results[i].data(); - cursors.emplace_back(Cursor<LocalResultType>{ + cursors.emplace_back(Cursor<Wrapped<R>>{ base, base + local_results[i].size(), 0, local_results[i].size()}); assert(i == cursors.size() - 1); total += local_results[i].size(); pq.push(cursors[i].ptr, tmp_n - i - 1); } else { - cursors.emplace_back(Cursor<LocalResultType>{nullptr, nullptr, 0, 0}); + cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); } if (total == 0) { @@ -146,9 +144,8 @@ public: while (pq.size()) { auto now = pq.peek(); - auto next = pq.size() > 1 - ? pq.peek(1) - : psudb::queue_record<LocalResultType>{nullptr, 0}; + auto next = pq.size() > 1 ? pq.peek(1) + : psudb::queue_record<Wrapped<R>>{nullptr, 0}; if (!now.data->is_tombstone() && next.data != nullptr && now.data->rec == next.data->rec && next.data->is_tombstone()) { @@ -156,9 +153,9 @@ public: pq.pop(); auto &cursor1 = cursors[tmp_n - now.version - 1]; auto &cursor2 = cursors[tmp_n - next.version - 1]; - if (advance_cursor<LocalResultType>(cursor1)) + if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor<LocalResultType>(cursor2)) + if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version); } else { auto &cursor = cursors[tmp_n - now.version - 1]; @@ -167,7 +164,7 @@ public: pq.pop(); - if (advance_cursor<LocalResultType>(cursor)) + if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version); } } @@ -175,7 +172,7 @@ public: return; } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/wss.h b/include/query/wss.h index 54620ca..d4e75f3 100644 --- a/include/query/wss.h +++ b/include/query/wss.h @@ -49,8 +49,8 @@ public: constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = false; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<R> LocalResultType; + typedef std::vector<R> ResultType; static LocalQuery *local_preproc(S *shard, Parameters *parms) { auto query = new LocalQuery(); @@ -130,8 +130,8 @@ public: } } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; if (query->sample_size == 0) { return result; @@ -139,25 +139,25 @@ public: for (size_t i = 0; i < query->sample_size; i++) { size_t idx = shard->get_weighted_sample(query->global_parms.rng); - if (!shard->get_record_at(idx)->is_deleted()) { - result.emplace_back(*shard->get_record_at(idx)); + if (!shard->get_record_at(idx)->is_deleted() && !shard->get_record_at(idx)->is_tombstone()) { + result.emplace_back(shard->get_record_at(idx)->rec); } } return result; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; for (size_t i = 0; i < query->sample_size; i++) { auto idx = gsl_rng_uniform_int(query->global_parms.rng, query->cutoff); auto rec = query->buffer->get(idx); auto test = gsl_rng_uniform(query->global_parms.rng) * query->max_weight; - if (test <= rec->rec.weight && !rec->is_deleted()) { - result.emplace_back(*rec); + if (test <= rec->rec.weight && !rec->is_deleted() && !rec->is_tombstone()) { + result.emplace_back(rec->rec); } } @@ -165,16 +165,16 @@ public: } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { - output.emplace_back(local_results[i][j].rec); + output.emplace_back(local_results[i][j]); } } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { if (output.size() < parms->sample_size) { |