summaryrefslogtreecommitdiffstats
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/framework/DynamicExtension.h21
-rw-r--r--include/framework/interface/Query.h184
-rw-r--r--include/framework/scheduling/Task.h2
-rw-r--r--include/query/irs.h41
-rw-r--r--include/query/knn.h42
-rw-r--r--include/query/pointlookup.h19
-rw-r--r--include/query/rangecount.h37
-rw-r--r--include/query/rangequery.h39
-rw-r--r--include/query/wss.h28
9 files changed, 194 insertions, 219 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 5a95679..719232e 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -202,7 +202,7 @@ public:
* @return A future, from which the query results can be retrieved upon
* query completion
*/
- std::future<std::vector<QueryResult>>
+ std::future<QueryResult>
query(Parameters &&parms) {
return schedule_query(std::move(parms));
}
@@ -628,26 +628,17 @@ private:
QueryType::distribute_query(parms, local_queries, buffer_query);
/* execute the local/buffer queries and combine the results into output */
- std::vector<QueryResult> output;
+ QueryResult output;
do {
- std::vector<std::vector<LocalResult>>
- query_results(shards.size() + 1);
+ std::vector<LocalResult> query_results(shards.size() + 1);
for (size_t i = 0; i < query_results.size(); i++) {
- std::vector<LocalResult> local_results;
- ShardID shid;
-
if (i == 0) { /* execute buffer query */
- local_results = QueryType::local_query_buffer(buffer_query);
- shid = INVALID_SHID;
+ query_results[i] = QueryType::local_query_buffer(buffer_query);
} else { /*execute local queries */
- local_results = QueryType::local_query(shards[i - 1].second,
+ query_results[i] = QueryType::local_query(shards[i - 1].second,
local_queries[i - 1]);
- shid = shards[i - 1].first;
}
- /* framework-level, automatic delete filtering */
- query_results[i] = std::move(local_results);
-
/* end query early if EARLY_ABORT is set and a result exists */
if constexpr (QueryType::EARLY_ABORT) {
if (query_results[i].size() > 0)
@@ -695,7 +686,7 @@ private:
m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
}
- std::future<std::vector<QueryResult>>
+ std::future<QueryResult>
schedule_query(Parameters &&query_parms) {
auto args =
new QueryArgs<ShardType, QueryType, DynamicExtension>();
diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h
index 1b64646..97a973d 100644
--- a/include/framework/interface/Query.h
+++ b/include/framework/interface/Query.h
@@ -12,18 +12,6 @@
namespace de {
-/*
- * FIXME: It would probably be best to absorb the std::vector into
- * this type too; this would allow user-defined collections for
- * intermediate results, which could allow for more merging
- * optimizations. However, this would require an alternative
- * approach to doing delete checks, so we'll leave it for now.
- */
-template <typename R>
-concept LocalResultInterface = requires(R res) {
- { res.is_deleted() } -> std::convertible_to<bool>;
- { res.is_tombstone() } -> std::convertible_to<bool>;
-};
/*
*
@@ -35,102 +23,100 @@ template <typename QUERY, typename SHARD,
typename PARAMETERS = typename QUERY::Parameters,
typename LOCAL = typename QUERY::LocalQuery,
typename LOCAL_BUFFER = typename QUERY::LocalQueryBuffer>
-concept QueryInterface = LocalResultInterface<LOCAL_RESULT> &&
+concept QueryInterface =
requires(PARAMETERS *parameters, LOCAL *local, LOCAL_BUFFER *buffer_query,
SHARD *shard, std::vector<LOCAL *> &local_queries,
- std::vector<std::vector<LOCAL_RESULT>> &local_results,
- std::vector<RESULT> &result,
+ std::vector<LOCAL_RESULT> &local_results, RESULT &result,
BufferView<typename SHARD::RECORD> *bv) {
+ /*
+ * Given a set of query parameters and a shard, return a local query
+ * object for that shard.
+ */
+ {
+ QUERY::local_preproc(shard, parameters)
+ } -> std::convertible_to<LOCAL *>;
- /*
- * Given a set of query parameters and a shard, return a local query
- * object for that shard.
- */
- { QUERY::local_preproc(shard, parameters) } -> std::convertible_to<LOCAL *>;
-
- /*
- * Given a set of query parameters and a buffer view, return a local
- * query object for the buffer.
- * NOTE: for interface reasons, the pointer to the buffer view MUST be
- * stored inside of the local query object. The future buffer
- * query routine will access the buffer by way of this pointer.
- */
- {
- QUERY::local_preproc_buffer(bv, parameters)
- } -> std::convertible_to<LOCAL_BUFFER *>;
+ /*
+ * Given a set of query parameters and a buffer view, return a local
+ * query object for the buffer.
+ * NOTE: for interface reasons, the pointer to the buffer view MUST be
+ * stored inside of the local query object. The future buffer
+ * query routine will access the buffer by way of this pointer.
+ */
+ {
+ QUERY::local_preproc_buffer(bv, parameters)
+ } -> std::convertible_to<LOCAL_BUFFER *>;
- /*
- * Given a full set of local queries, and the buffer query, make any
- * necessary adjustments to the local queries in-place, to account for
- * global information. If no additional processing is required, this
- * function can be left empty.
- */
- {QUERY::distribute_query(parameters, local_queries, buffer_query)};
+ /*
+ * Given a full set of local queries, and the buffer query, make any
+ * necessary adjustments to the local queries in-place, to account for
+ * global information. If no additional processing is required, this
+ * function can be left empty.
+ */
+ { QUERY::distribute_query(parameters, local_queries, buffer_query) };
- /*
- * Answer the local query, defined by `local` against `shard` and return
- * a vector of LOCAL_RESULT objects defining the query result.
- */
- {
- QUERY::local_query(shard, local)
- } -> std::convertible_to<std::vector<LOCAL_RESULT>>;
+ /*
+ * Answer the local query, defined by `local`, against `shard` and return
+ * a LOCAL_RESULT object defining the query result.
+ */
+ { QUERY::local_query(shard, local) } -> std::convertible_to<LOCAL_RESULT>;
- /*
- * Answer the local query defined by `local` against the buffer (which
- * should be accessed by a pointer inside of `local`) and return a vector
- * of LOCAL_RESULT objects defining the query result.
- */
- {
- QUERY::local_query_buffer(buffer_query)
- } -> std::convertible_to<std::vector<LOCAL_RESULT>>;
+ /*
+ * Answer the local query defined by `buffer_query` against the buffer
+ * (which should be accessed by a pointer inside of `buffer_query`) and
+ * return a LOCAL_RESULT object defining the query result.
+ */
+ {
+ QUERY::local_query_buffer(buffer_query)
+ } -> std::convertible_to<LOCAL_RESULT>;
- /*
- * Process the local results from the buffer and all of the shards,
- * stored in `local_results`, and insert the associated ResultType
- * objects into the `result` vector, which represents the final result
- * of the query. Updates to this vector are done in-place.
- */
- {QUERY::combine(local_results, parameters, result)};
+ /*
+ * Process the local results from the buffer and all of the shards,
+ * stored in `local_results`, and use them to build the final result of
+ * the query in `result`, an object of type RESULT. Updates to `result`
+ * are done in-place.
+ */
+ { QUERY::combine(local_results, parameters, result) };
- /*
- * Process the post-combine `result` vector of ResultType objects,
- * in the context of the global and local query parameters, to determine
- * if the query should be repeated. If so, make any necessary adjustments
- * to the local query objects and return True. Otherwise, return False.
- *
- * If no repetition is needed for a given problem type, simply return
- * False immediately and the query will end.
- */
- {
- QUERY::repeat(parameters, result, local_queries, buffer_query)
- } -> std::same_as<bool>;
+ /*
+ * Process the post-combine `result` object of type RESULT,
+ * in the context of the global and local query parameters, to determine
+ * if the query should be repeated. If so, make any necessary adjustments
+ * to the local query objects and return True. Otherwise, return False.
+ *
+ * If no repetition is needed for a given problem type, simply return
+ * False immediately and the query will end.
+ */
+ {
+ QUERY::repeat(parameters, result, local_queries, buffer_query)
+ } -> std::same_as<bool>;
- /*
- * If this flag is True, then the query will immediately stop and return
- * a result as soon as the first non-deleted LocalRecordType is found.
- * Otherwise, every Shard and the buffer will be queried and the results
- * merged, like normal.
- *
- * This is largely an optimization flag for use with point-lookup, or
- * other single-record result queries
- */
- { QUERY::EARLY_ABORT } -> std::convertible_to<bool>;
+ /*
+ * If this flag is True, then the query will immediately stop and return
+ * a result as soon as the first non-deleted LocalRecordType is found.
+ * Otherwise, every Shard and the buffer will be queried and the results
+ * merged, like normal.
+ *
+ * This is largely an optimization flag for use with point-lookup, or
+ * other single-record result queries
+ */
+ { QUERY::EARLY_ABORT } -> std::convertible_to<bool>;
- /*
- * If false, the built-in delete filtering that the framework can
- * apply to the local results, prior to calling combine, will be skipped.
- * This general filtering can be inefficient, particularly for tombstone
- * -based deletes, and so if a more efficient manual filtering can be
- * performed, it is worth setting this to True and doing that filtering
- * in the combine step.
- *
- * If deletes are not a consideration for your problem, it's also best
- * to turn this off, as it'll avoid the framework making an extra pass
- * over the local results prior to combining them.
- *
- * TODO: Temporarily disabling this, as we've dropped framework-level
- * delete filtering for the time being.
- */
- /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to<bool>; */
-};
+ /*
+ * If true, the built-in delete filtering that the framework can
+ * apply to the local results, prior to calling combine, will be skipped.
+ * This general filtering can be inefficient, particularly for tombstone
+ * -based deletes, and so if a more efficient manual filtering can be
+ * performed, it is worth setting this to True and doing that filtering
+ * in the combine step.
+ *
+ * If deletes are not a consideration for your problem, it's also best
+ * to turn this off, as it'll avoid the framework making an extra pass
+ * over the local results prior to combining them.
+ *
+ * TODO: Temporarily disabling this, as we've dropped framework-level
+ * delete filtering for the time being.
+ */
+ /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to<bool>; */
+ };
} // namespace de
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 6b6f040..34f053a 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -35,7 +35,7 @@ struct ReconstructionArgs {
};
template <ShardInterface S, QueryInterface<S> Q, typename DE> struct QueryArgs {
- std::promise<std::vector<typename Q::ResultType>> result_set;
+ std::promise<typename Q::ResultType> result_set;
typename Q::Parameters query_parms;
DE *extension;
};
diff --git a/include/query/irs.h b/include/query/irs.h
index 6dec850..ec6fa29 100644
--- a/include/query/irs.h
+++ b/include/query/irs.h
@@ -40,15 +40,15 @@ public:
BufferView<R> *buffer;
size_t cutoff;
- std::vector<Wrapped<R>> records;
+ std::vector<R> records;
std::unique_ptr<psudb::Alias> alias;
size_t sample_size;
Parameters global_parms;
};
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<R> LocalResultType;
+ typedef std::vector<R> ResultType;
constexpr static bool EARLY_ABORT = false;
constexpr static bool SKIP_DELETE_FILTER = false;
@@ -86,9 +86,11 @@ public:
}
for (size_t i = 0; i < query->cutoff; i++) {
- if ((query->buffer->get(i)->rec.key >= query->global_parms.lower_bound) &&
- (buffer->get(i)->rec.key <= query->global_parms.upper_bound)) {
- query->records.emplace_back(*(query->buffer->get(i)));
+ auto rec = buffer->get(i);
+ if (rec->rec.key >= query->global_parms.lower_bound &&
+ rec->rec.key <= query->global_parms.upper_bound &&
+ (!rec->is_deleted() && !rec->is_tombstone())) {
+ query->records.emplace_back(query->buffer->get(i)->rec);
}
}
@@ -163,10 +165,10 @@ public:
}
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
auto sample_sz = query->sample_size;
- std::vector<LocalResultType> result_set;
+ LocalResultType result_set;
if (sample_sz == 0 || query->lower_idx == shard->get_record_count()) {
return result_set;
@@ -180,15 +182,19 @@ public:
(range_length > 0)
? gsl_rng_uniform_int(query->global_parms.rng, range_length)
: 0;
- result_set.emplace_back(*shard->get_record_at(query->lower_idx + idx));
+ auto wrec = shard->get_record_at(query->lower_idx + idx);
+
+ if (!wrec->is_deleted() && !wrec->is_tombstone()) {
+ result_set.emplace_back(wrec->rec);
+ }
} while (attempts < sample_sz);
return result_set;
}
- static std::vector<LocalResultType>
+ static LocalResultType
local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
+ LocalResultType result;
result.reserve(query->sample_size);
if constexpr (REJECTION) {
@@ -197,8 +203,9 @@ public:
auto rec = query->buffer->get(idx);
if (rec->rec.key >= query->global_parms.lower_bound &&
- rec->rec.key <= query->global_parms.upper_bound) {
- result.emplace_back(*rec);
+ rec->rec.key <= query->global_parms.upper_bound &&
+ (!rec->is_deleted() && !rec->is_tombstone())) {
+ result.emplace_back(rec->rec);
}
}
@@ -215,16 +222,16 @@ public:
}
static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
for (size_t i = 0; i < local_results.size(); i++) {
for (size_t j = 0; j < local_results[i].size(); j++) {
- output.emplace_back(local_results[i][j].rec);
+ output.emplace_back(local_results[i][j]);
}
}
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
if (output.size() < parms->sample_size) {
diff --git a/include/query/knn.h b/include/query/knn.h
index 87ea10a..91a032c 100644
--- a/include/query/knn.h
+++ b/include/query/knn.h
@@ -39,8 +39,8 @@ public:
Parameters global_parms;
};
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<const Wrapped<R> *> LocalResultType;
+ typedef std::vector<R> ResultType;
constexpr static bool EARLY_ABORT = false;
constexpr static bool SKIP_DELETE_FILTER = true;
@@ -66,8 +66,8 @@ public:
return;
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> results;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType results;
Wrapped<R> wrec;
wrec.rec = query->global_parms.point;
@@ -79,17 +79,16 @@ public:
shard->search(query->global_parms.point, query->global_parms.k, pq);
while (pq.size() > 0) {
- results.emplace_back(*pq.peek().data);
+ results.emplace_back(pq.peek().data);
pq.pop();
}
return results;
}
- static std::vector<LocalResultType>
- local_query_buffer(LocalQueryBuffer *query) {
+ static LocalResultType local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> results;
+ LocalResultType results;
Wrapped<R> wrec;
wrec.rec = query->global_parms.point;
@@ -118,41 +117,44 @@ public:
}
while (pq.size() > 0) {
- results.emplace_back(*(pq.peek().data));
+ results.emplace_back(pq.peek().data);
pq.pop();
}
- return std::move(results);
+ return results;
}
- static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ static void combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
+
+ Wrapped<R> wrec;
+ wrec.rec = parms->point;
+ wrec.header = 0;
+ PriorityQueue<Wrapped<R>, DistCmpMax<Wrapped<R>>> pq(parms->k, &wrec);
- PriorityQueue<R, DistCmpMax<R>> pq(parms->k, &(parms->point));
for (size_t i = 0; i < local_results.size(); i++) {
for (size_t j = 0; j < local_results[i].size(); j++) {
if (pq.size() < parms->k) {
- pq.push(&local_results[i][j].rec);
+ pq.push(local_results[i][j]);
} else {
- double head_dist = pq.peek().data->calc_distance(parms->point);
- double cur_dist = local_results[i][j].rec.calc_distance(parms->point);
+ double head_dist = pq.peek().data->rec.calc_distance(wrec.rec);
+ double cur_dist = local_results[i][j]->rec.calc_distance(wrec.rec);
if (cur_dist < head_dist) {
pq.pop();
- pq.push(&local_results[i][j].rec);
+ pq.push(local_results[i][j]);
}
}
}
}
while (pq.size() > 0) {
- output.emplace_back(*pq.peek().data);
+ output.emplace_back(pq.peek().data->rec);
pq.pop();
}
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
return false;
diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h
index f3788de..65cffa7 100644
--- a/include/query/pointlookup.h
+++ b/include/query/pointlookup.h
@@ -38,8 +38,8 @@ public:
Parameters global_parms;
};
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<Wrapped<R>> LocalResultType;
+ typedef std::vector<R> ResultType;
constexpr static bool EARLY_ABORT = true;
constexpr static bool SKIP_DELETE_FILTER = true;
@@ -65,8 +65,8 @@ public:
return;
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType result;
auto r = shard->point_lookup({query->global_parms.search_key, 0});
@@ -77,9 +77,8 @@ public:
return result;
}
- static std::vector<LocalResultType>
- local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query_buffer(LocalQueryBuffer *query) {
+ LocalResultType result;
for (size_t i = 0; i < query->buffer->get_record_count(); i++) {
auto rec = query->buffer->get(i);
@@ -95,8 +94,8 @@ public:
static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
for (auto r : local_results) {
if (r.size() > 0) {
if (r[0].is_deleted() || r[0].is_tombstone()) {
@@ -109,7 +108,7 @@ public:
}
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
return false;
diff --git a/include/query/rangecount.h b/include/query/rangecount.h
index 68d304d..0898473 100644
--- a/include/query/rangecount.h
+++ b/include/query/rangecount.h
@@ -16,7 +16,7 @@
namespace de {
namespace rc {
-template <ShardInterface S, bool FORCE_SCAN = true> class Query {
+template <ShardInterface S> class Query {
typedef typename S::RECORD R;
public:
@@ -75,8 +75,8 @@ public:
return;
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType result = {0, 0};
/*
* if the returned index is one past the end of the
@@ -88,8 +88,6 @@ public:
}
auto ptr = shard->get_record_at(query->start_idx);
- size_t reccnt = 0;
- size_t tscnt = 0;
/*
* roll the pointer forward to the first record that is
@@ -104,53 +102,48 @@ public:
ptr->rec.key <= query->global_parms.upper_bound) {
if (!ptr->is_deleted()) {
- reccnt++;
+ result.record_count++;
if (ptr->is_tombstone()) {
- tscnt++;
+ result.tombstone_count++;
}
}
ptr++;
}
- result.push_back({reccnt, tscnt});
return result;
}
- static std::vector<LocalResultType>
+ static LocalResultType
local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
- size_t reccnt = 0;
- size_t tscnt = 0;
+ LocalResultType result = {0, 0};
for (size_t i = 0; i < query->buffer->get_record_count(); i++) {
auto rec = query->buffer->get(i);
if (rec->rec.key >= query->global_parms.lower_bound &&
rec->rec.key <= query->global_parms.upper_bound) {
if (!rec->is_deleted()) {
- reccnt++;
+ result.record_count++;
if (rec->is_tombstone()) {
- tscnt++;
+ result.tombstone_count++;
}
}
}
}
- result.push_back({reccnt, tscnt});
-
return result;
}
static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
size_t reccnt = 0;
size_t tscnt = 0;
for (auto &local_result : local_results) {
- reccnt += local_result[0].record_count;
- tscnt += local_result[0].tombstone_count;
+ reccnt += local_result.record_count;
+ tscnt += local_result.tombstone_count;
}
/* if more tombstones than results, clamp the output at 0 */
@@ -158,10 +151,10 @@ public:
tscnt = reccnt;
}
- output.push_back({reccnt - tscnt});
+ output = reccnt - tscnt;
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
return false;
diff --git a/include/query/rangequery.h b/include/query/rangequery.h
index e7be39c..d9e7db8 100644
--- a/include/query/rangequery.h
+++ b/include/query/rangequery.h
@@ -38,8 +38,8 @@ public:
Parameters global_parms;
};
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<Wrapped<R>> LocalResultType;
+ typedef std::vector<R> ResultType;
constexpr static bool EARLY_ABORT = false;
constexpr static bool SKIP_DELETE_FILTER = true;
@@ -69,8 +69,8 @@ public:
return;
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType result;
/*
* if the returned index is one past the end of the
@@ -101,10 +101,9 @@ public:
return result;
}
- static std::vector<LocalResultType>
- local_query_buffer(LocalQueryBuffer *query) {
+ static LocalResultType local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
+ LocalResultType result;
for (size_t i = 0; i < query->buffer->get_record_count(); i++) {
auto rec = query->buffer->get(i);
if (rec->rec.key >= query->global_parms.lower_bound &&
@@ -116,26 +115,25 @@ public:
return result;
}
- static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
- std::vector<Cursor<LocalResultType>> cursors;
+ static void combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
+ std::vector<Cursor<Wrapped<R>>> cursors;
cursors.reserve(local_results.size());
- psudb::PriorityQueue<LocalResultType> pq(local_results.size());
+ psudb::PriorityQueue<Wrapped<R>> pq(local_results.size());
size_t total = 0;
size_t tmp_n = local_results.size();
for (size_t i = 0; i < tmp_n; ++i)
if (local_results[i].size() > 0) {
auto base = local_results[i].data();
- cursors.emplace_back(Cursor<LocalResultType>{
+ cursors.emplace_back(Cursor<Wrapped<R>>{
base, base + local_results[i].size(), 0, local_results[i].size()});
assert(i == cursors.size() - 1);
total += local_results[i].size();
pq.push(cursors[i].ptr, tmp_n - i - 1);
} else {
- cursors.emplace_back(Cursor<LocalResultType>{nullptr, nullptr, 0, 0});
+ cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
}
if (total == 0) {
@@ -146,9 +144,8 @@ public:
while (pq.size()) {
auto now = pq.peek();
- auto next = pq.size() > 1
- ? pq.peek(1)
- : psudb::queue_record<LocalResultType>{nullptr, 0};
+ auto next = pq.size() > 1 ? pq.peek(1)
+ : psudb::queue_record<Wrapped<R>>{nullptr, 0};
if (!now.data->is_tombstone() && next.data != nullptr &&
now.data->rec == next.data->rec && next.data->is_tombstone()) {
@@ -156,9 +153,9 @@ public:
pq.pop();
auto &cursor1 = cursors[tmp_n - now.version - 1];
auto &cursor2 = cursors[tmp_n - next.version - 1];
- if (advance_cursor<LocalResultType>(cursor1))
+ if (advance_cursor<Wrapped<R>>(cursor1))
pq.push(cursor1.ptr, now.version);
- if (advance_cursor<LocalResultType>(cursor2))
+ if (advance_cursor<Wrapped<R>>(cursor2))
pq.push(cursor2.ptr, next.version);
} else {
auto &cursor = cursors[tmp_n - now.version - 1];
@@ -167,7 +164,7 @@ public:
pq.pop();
- if (advance_cursor<LocalResultType>(cursor))
+ if (advance_cursor<Wrapped<R>>(cursor))
pq.push(cursor.ptr, now.version);
}
}
@@ -175,7 +172,7 @@ public:
return;
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
return false;
diff --git a/include/query/wss.h b/include/query/wss.h
index 54620ca..d4e75f3 100644
--- a/include/query/wss.h
+++ b/include/query/wss.h
@@ -49,8 +49,8 @@ public:
constexpr static bool EARLY_ABORT = false;
constexpr static bool SKIP_DELETE_FILTER = false;
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<R> LocalResultType;
+ typedef std::vector<R> ResultType;
static LocalQuery *local_preproc(S *shard, Parameters *parms) {
auto query = new LocalQuery();
@@ -130,8 +130,8 @@ public:
}
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType result;
if (query->sample_size == 0) {
return result;
@@ -139,25 +139,25 @@ public:
for (size_t i = 0; i < query->sample_size; i++) {
size_t idx = shard->get_weighted_sample(query->global_parms.rng);
- if (!shard->get_record_at(idx)->is_deleted()) {
- result.emplace_back(*shard->get_record_at(idx));
+ if (!shard->get_record_at(idx)->is_deleted() && !shard->get_record_at(idx)->is_tombstone()) {
+ result.emplace_back(shard->get_record_at(idx)->rec);
}
}
return result;
}
- static std::vector<LocalResultType>
+ static LocalResultType
local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
+ LocalResultType result;
for (size_t i = 0; i < query->sample_size; i++) {
auto idx = gsl_rng_uniform_int(query->global_parms.rng, query->cutoff);
auto rec = query->buffer->get(idx);
auto test = gsl_rng_uniform(query->global_parms.rng) * query->max_weight;
- if (test <= rec->rec.weight && !rec->is_deleted()) {
- result.emplace_back(*rec);
+ if (test <= rec->rec.weight && !rec->is_deleted() && !rec->is_tombstone()) {
+ result.emplace_back(rec->rec);
}
}
@@ -165,16 +165,16 @@ public:
}
static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
for (size_t i = 0; i < local_results.size(); i++) {
for (size_t j = 0; j < local_results[i].size(); j++) {
- output.emplace_back(local_results[i][j].rec);
+ output.emplace_back(local_results[i][j]);
}
}
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
if (output.size() < parms->sample_size) {