summaryrefslogtreecommitdiffstats
path: root/include/query
diff options
context:
space:
mode:
Diffstat (limited to 'include/query')
-rw-r--r--include/query/irs.h41
-rw-r--r--include/query/knn.h42
-rw-r--r--include/query/pointlookup.h19
-rw-r--r--include/query/rangecount.h37
-rw-r--r--include/query/rangequery.h39
-rw-r--r--include/query/wss.h28
6 files changed, 102 insertions, 104 deletions
diff --git a/include/query/irs.h b/include/query/irs.h
index 6dec850..ec6fa29 100644
--- a/include/query/irs.h
+++ b/include/query/irs.h
@@ -40,15 +40,15 @@ public:
BufferView<R> *buffer;
size_t cutoff;
- std::vector<Wrapped<R>> records;
+ std::vector<R> records;
std::unique_ptr<psudb::Alias> alias;
size_t sample_size;
Parameters global_parms;
};
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<R> LocalResultType;
+ typedef std::vector<R> ResultType;
constexpr static bool EARLY_ABORT = false;
constexpr static bool SKIP_DELETE_FILTER = false;
@@ -86,9 +86,11 @@ public:
}
for (size_t i = 0; i < query->cutoff; i++) {
- if ((query->buffer->get(i)->rec.key >= query->global_parms.lower_bound) &&
- (buffer->get(i)->rec.key <= query->global_parms.upper_bound)) {
- query->records.emplace_back(*(query->buffer->get(i)));
+ auto rec = buffer->get(i);
+ if (rec->rec.key >= query->global_parms.lower_bound &&
+ rec->rec.key <= query->global_parms.upper_bound &&
+ (!rec->is_deleted() && !rec->is_tombstone())) {
+ query->records.emplace_back(query->buffer->get(i)->rec);
}
}
@@ -163,10 +165,10 @@ public:
}
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
auto sample_sz = query->sample_size;
- std::vector<LocalResultType> result_set;
+ LocalResultType result_set;
if (sample_sz == 0 || query->lower_idx == shard->get_record_count()) {
return result_set;
@@ -180,15 +182,19 @@ public:
(range_length > 0)
? gsl_rng_uniform_int(query->global_parms.rng, range_length)
: 0;
- result_set.emplace_back(*shard->get_record_at(query->lower_idx + idx));
+ auto wrec = shard->get_record_at(query->lower_idx + idx);
+
+ if (!wrec->is_deleted() && !wrec->is_tombstone()) {
+ result_set.emplace_back(wrec->rec);
+ }
} while (attempts < sample_sz);
return result_set;
}
- static std::vector<LocalResultType>
+ static LocalResultType
local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
+ LocalResultType result;
result.reserve(query->sample_size);
if constexpr (REJECTION) {
@@ -197,8 +203,9 @@ public:
auto rec = query->buffer->get(idx);
if (rec->rec.key >= query->global_parms.lower_bound &&
- rec->rec.key <= query->global_parms.upper_bound) {
- result.emplace_back(*rec);
+ rec->rec.key <= query->global_parms.upper_bound &&
+ (!rec->is_deleted() && !rec->is_tombstone())) {
+ result.emplace_back(rec->rec);
}
}
@@ -215,16 +222,16 @@ public:
}
static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
for (size_t i = 0; i < local_results.size(); i++) {
for (size_t j = 0; j < local_results[i].size(); j++) {
- output.emplace_back(local_results[i][j].rec);
+ output.emplace_back(local_results[i][j]);
}
}
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
if (output.size() < parms->sample_size) {
diff --git a/include/query/knn.h b/include/query/knn.h
index 87ea10a..91a032c 100644
--- a/include/query/knn.h
+++ b/include/query/knn.h
@@ -39,8 +39,8 @@ public:
Parameters global_parms;
};
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<const Wrapped<R> *> LocalResultType;
+ typedef std::vector<R> ResultType;
constexpr static bool EARLY_ABORT = false;
constexpr static bool SKIP_DELETE_FILTER = true;
@@ -66,8 +66,8 @@ public:
return;
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> results;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType results;
Wrapped<R> wrec;
wrec.rec = query->global_parms.point;
@@ -79,17 +79,16 @@ public:
shard->search(query->global_parms.point, query->global_parms.k, pq);
while (pq.size() > 0) {
- results.emplace_back(*pq.peek().data);
+ results.emplace_back(pq.peek().data);
pq.pop();
}
return results;
}
- static std::vector<LocalResultType>
- local_query_buffer(LocalQueryBuffer *query) {
+ static LocalResultType local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> results;
+ LocalResultType results;
Wrapped<R> wrec;
wrec.rec = query->global_parms.point;
@@ -118,41 +117,44 @@ public:
}
while (pq.size() > 0) {
- results.emplace_back(*(pq.peek().data));
+ results.emplace_back(pq.peek().data);
pq.pop();
}
- return std::move(results);
+ return results;
}
- static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ static void combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
+
+ Wrapped<R> wrec;
+ wrec.rec = parms->point;
+ wrec.header = 0;
+ PriorityQueue<Wrapped<R>, DistCmpMax<Wrapped<R>>> pq(parms->k, &wrec);
- PriorityQueue<R, DistCmpMax<R>> pq(parms->k, &(parms->point));
for (size_t i = 0; i < local_results.size(); i++) {
for (size_t j = 0; j < local_results[i].size(); j++) {
if (pq.size() < parms->k) {
- pq.push(&local_results[i][j].rec);
+ pq.push(local_results[i][j]);
} else {
- double head_dist = pq.peek().data->calc_distance(parms->point);
- double cur_dist = local_results[i][j].rec.calc_distance(parms->point);
+ double head_dist = pq.peek().data->rec.calc_distance(wrec.rec);
+ double cur_dist = local_results[i][j]->rec.calc_distance(wrec.rec);
if (cur_dist < head_dist) {
pq.pop();
- pq.push(&local_results[i][j].rec);
+ pq.push(local_results[i][j]);
}
}
}
}
while (pq.size() > 0) {
- output.emplace_back(*pq.peek().data);
+ output.emplace_back(pq.peek().data->rec);
pq.pop();
}
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
return false;
diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h
index f3788de..65cffa7 100644
--- a/include/query/pointlookup.h
+++ b/include/query/pointlookup.h
@@ -38,8 +38,8 @@ public:
Parameters global_parms;
};
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<Wrapped<R>> LocalResultType;
+ typedef std::vector<R> ResultType;
constexpr static bool EARLY_ABORT = true;
constexpr static bool SKIP_DELETE_FILTER = true;
@@ -65,8 +65,8 @@ public:
return;
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType result;
auto r = shard->point_lookup({query->global_parms.search_key, 0});
@@ -77,9 +77,8 @@ public:
return result;
}
- static std::vector<LocalResultType>
- local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query_buffer(LocalQueryBuffer *query) {
+ LocalResultType result;
for (size_t i = 0; i < query->buffer->get_record_count(); i++) {
auto rec = query->buffer->get(i);
@@ -95,8 +94,8 @@ public:
static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
for (auto r : local_results) {
if (r.size() > 0) {
if (r[0].is_deleted() || r[0].is_tombstone()) {
@@ -109,7 +108,7 @@ public:
}
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
return false;
diff --git a/include/query/rangecount.h b/include/query/rangecount.h
index 68d304d..0898473 100644
--- a/include/query/rangecount.h
+++ b/include/query/rangecount.h
@@ -16,7 +16,7 @@
namespace de {
namespace rc {
-template <ShardInterface S, bool FORCE_SCAN = true> class Query {
+template <ShardInterface S> class Query {
typedef typename S::RECORD R;
public:
@@ -75,8 +75,8 @@ public:
return;
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType result = {0, 0};
/*
* if the returned index is one past the end of the
@@ -88,8 +88,6 @@ public:
}
auto ptr = shard->get_record_at(query->start_idx);
- size_t reccnt = 0;
- size_t tscnt = 0;
/*
* roll the pointer forward to the first record that is
@@ -104,53 +102,48 @@ public:
ptr->rec.key <= query->global_parms.upper_bound) {
if (!ptr->is_deleted()) {
- reccnt++;
+ result.record_count++;
if (ptr->is_tombstone()) {
- tscnt++;
+ result.tombstone_count++;
}
}
ptr++;
}
- result.push_back({reccnt, tscnt});
return result;
}
- static std::vector<LocalResultType>
+ static LocalResultType
local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
- size_t reccnt = 0;
- size_t tscnt = 0;
+ LocalResultType result = {0, 0};
for (size_t i = 0; i < query->buffer->get_record_count(); i++) {
auto rec = query->buffer->get(i);
if (rec->rec.key >= query->global_parms.lower_bound &&
rec->rec.key <= query->global_parms.upper_bound) {
if (!rec->is_deleted()) {
- reccnt++;
+ result.record_count++;
if (rec->is_tombstone()) {
- tscnt++;
+ result.tombstone_count++;
}
}
}
}
- result.push_back({reccnt, tscnt});
-
return result;
}
static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
size_t reccnt = 0;
size_t tscnt = 0;
for (auto &local_result : local_results) {
- reccnt += local_result[0].record_count;
- tscnt += local_result[0].tombstone_count;
+ reccnt += local_result.record_count;
+ tscnt += local_result.tombstone_count;
}
/* if more tombstones than results, clamp the output at 0 */
@@ -158,10 +151,10 @@ public:
tscnt = reccnt;
}
- output.push_back({reccnt - tscnt});
+ output = reccnt - tscnt;
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
return false;
diff --git a/include/query/rangequery.h b/include/query/rangequery.h
index e7be39c..d9e7db8 100644
--- a/include/query/rangequery.h
+++ b/include/query/rangequery.h
@@ -38,8 +38,8 @@ public:
Parameters global_parms;
};
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<Wrapped<R>> LocalResultType;
+ typedef std::vector<R> ResultType;
constexpr static bool EARLY_ABORT = false;
constexpr static bool SKIP_DELETE_FILTER = true;
@@ -69,8 +69,8 @@ public:
return;
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType result;
/*
* if the returned index is one past the end of the
@@ -101,10 +101,9 @@ public:
return result;
}
- static std::vector<LocalResultType>
- local_query_buffer(LocalQueryBuffer *query) {
+ static LocalResultType local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
+ LocalResultType result;
for (size_t i = 0; i < query->buffer->get_record_count(); i++) {
auto rec = query->buffer->get(i);
if (rec->rec.key >= query->global_parms.lower_bound &&
@@ -116,26 +115,25 @@ public:
return result;
}
- static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
- std::vector<Cursor<LocalResultType>> cursors;
+ static void combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
+ std::vector<Cursor<Wrapped<R>>> cursors;
cursors.reserve(local_results.size());
- psudb::PriorityQueue<LocalResultType> pq(local_results.size());
+ psudb::PriorityQueue<Wrapped<R>> pq(local_results.size());
size_t total = 0;
size_t tmp_n = local_results.size();
for (size_t i = 0; i < tmp_n; ++i)
if (local_results[i].size() > 0) {
auto base = local_results[i].data();
- cursors.emplace_back(Cursor<LocalResultType>{
+ cursors.emplace_back(Cursor<Wrapped<R>>{
base, base + local_results[i].size(), 0, local_results[i].size()});
assert(i == cursors.size() - 1);
total += local_results[i].size();
pq.push(cursors[i].ptr, tmp_n - i - 1);
} else {
- cursors.emplace_back(Cursor<LocalResultType>{nullptr, nullptr, 0, 0});
+ cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
}
if (total == 0) {
@@ -146,9 +144,8 @@ public:
while (pq.size()) {
auto now = pq.peek();
- auto next = pq.size() > 1
- ? pq.peek(1)
- : psudb::queue_record<LocalResultType>{nullptr, 0};
+ auto next = pq.size() > 1 ? pq.peek(1)
+ : psudb::queue_record<Wrapped<R>>{nullptr, 0};
if (!now.data->is_tombstone() && next.data != nullptr &&
now.data->rec == next.data->rec && next.data->is_tombstone()) {
@@ -156,9 +153,9 @@ public:
pq.pop();
auto &cursor1 = cursors[tmp_n - now.version - 1];
auto &cursor2 = cursors[tmp_n - next.version - 1];
- if (advance_cursor<LocalResultType>(cursor1))
+ if (advance_cursor<Wrapped<R>>(cursor1))
pq.push(cursor1.ptr, now.version);
- if (advance_cursor<LocalResultType>(cursor2))
+ if (advance_cursor<Wrapped<R>>(cursor2))
pq.push(cursor2.ptr, next.version);
} else {
auto &cursor = cursors[tmp_n - now.version - 1];
@@ -167,7 +164,7 @@ public:
pq.pop();
- if (advance_cursor<LocalResultType>(cursor))
+ if (advance_cursor<Wrapped<R>>(cursor))
pq.push(cursor.ptr, now.version);
}
}
@@ -175,7 +172,7 @@ public:
return;
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
return false;
diff --git a/include/query/wss.h b/include/query/wss.h
index 54620ca..d4e75f3 100644
--- a/include/query/wss.h
+++ b/include/query/wss.h
@@ -49,8 +49,8 @@ public:
constexpr static bool EARLY_ABORT = false;
constexpr static bool SKIP_DELETE_FILTER = false;
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<R> LocalResultType;
+ typedef std::vector<R> ResultType;
static LocalQuery *local_preproc(S *shard, Parameters *parms) {
auto query = new LocalQuery();
@@ -130,8 +130,8 @@ public:
}
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType result;
if (query->sample_size == 0) {
return result;
@@ -139,25 +139,25 @@ public:
for (size_t i = 0; i < query->sample_size; i++) {
size_t idx = shard->get_weighted_sample(query->global_parms.rng);
- if (!shard->get_record_at(idx)->is_deleted()) {
- result.emplace_back(*shard->get_record_at(idx));
+ if (!shard->get_record_at(idx)->is_deleted() && !shard->get_record_at(idx)->is_tombstone()) {
+ result.emplace_back(shard->get_record_at(idx)->rec);
}
}
return result;
}
- static std::vector<LocalResultType>
+ static LocalResultType
local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
+ LocalResultType result;
for (size_t i = 0; i < query->sample_size; i++) {
auto idx = gsl_rng_uniform_int(query->global_parms.rng, query->cutoff);
auto rec = query->buffer->get(idx);
auto test = gsl_rng_uniform(query->global_parms.rng) * query->max_weight;
- if (test <= rec->rec.weight && !rec->is_deleted()) {
- result.emplace_back(*rec);
+ if (test <= rec->rec.weight && !rec->is_deleted() && !rec->is_tombstone()) {
+ result.emplace_back(rec->rec);
}
}
@@ -165,16 +165,16 @@ public:
}
static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
for (size_t i = 0; i < local_results.size(); i++) {
for (size_t j = 0; j < local_results[i].size(); j++) {
- output.emplace_back(local_results[i][j].rec);
+ output.emplace_back(local_results[i][j]);
}
}
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
if (output.size() < parms->sample_size) {