summaryrefslogtreecommitdiffstats
path: root/include/query
diff options
context:
space:
mode:
authorDouglas B. Rumbaugh <doug@douglasrumbaugh.com>2024-12-06 16:54:05 -0500
committerDouglas B. Rumbaugh <doug@douglasrumbaugh.com>2024-12-06 16:54:05 -0500
commit9876d74e503df64eb9e82e540ca41fcf593ebf64 (patch)
tree3512690afa8c04f702bd8de500bf8f41b05571c4 /include/query
parente2b81a2d311470d503edae93e68e82791f6bb17c (diff)
downloaddynamic-extension-9876d74e503df64eb9e82e540ca41fcf593ebf64.tar.gz
Adjusted query result interfacesHEADmaster
Now, the vector<> is part of the user-defined type, not required by the framework. This should allow for more flexibility in either using alternative containers, or for more sensible implementations of queries with single value results (like range count).
Diffstat (limited to 'include/query')
-rw-r--r--include/query/irs.h41
-rw-r--r--include/query/knn.h42
-rw-r--r--include/query/pointlookup.h19
-rw-r--r--include/query/rangecount.h37
-rw-r--r--include/query/rangequery.h39
-rw-r--r--include/query/wss.h28
6 files changed, 102 insertions, 104 deletions
diff --git a/include/query/irs.h b/include/query/irs.h
index 6dec850..ec6fa29 100644
--- a/include/query/irs.h
+++ b/include/query/irs.h
@@ -40,15 +40,15 @@ public:
BufferView<R> *buffer;
size_t cutoff;
- std::vector<Wrapped<R>> records;
+ std::vector<R> records;
std::unique_ptr<psudb::Alias> alias;
size_t sample_size;
Parameters global_parms;
};
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<R> LocalResultType;
+ typedef std::vector<R> ResultType;
constexpr static bool EARLY_ABORT = false;
constexpr static bool SKIP_DELETE_FILTER = false;
@@ -86,9 +86,11 @@ public:
}
for (size_t i = 0; i < query->cutoff; i++) {
- if ((query->buffer->get(i)->rec.key >= query->global_parms.lower_bound) &&
- (buffer->get(i)->rec.key <= query->global_parms.upper_bound)) {
- query->records.emplace_back(*(query->buffer->get(i)));
+ auto rec = buffer->get(i);
+ if (rec->rec.key >= query->global_parms.lower_bound &&
+ rec->rec.key <= query->global_parms.upper_bound &&
+ (!rec->is_deleted() && !rec->is_tombstone())) {
+ query->records.emplace_back(query->buffer->get(i)->rec);
}
}
@@ -163,10 +165,10 @@ public:
}
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
auto sample_sz = query->sample_size;
- std::vector<LocalResultType> result_set;
+ LocalResultType result_set;
if (sample_sz == 0 || query->lower_idx == shard->get_record_count()) {
return result_set;
@@ -180,15 +182,19 @@ public:
(range_length > 0)
? gsl_rng_uniform_int(query->global_parms.rng, range_length)
: 0;
- result_set.emplace_back(*shard->get_record_at(query->lower_idx + idx));
+ auto wrec = shard->get_record_at(query->lower_idx + idx);
+
+ if (!wrec->is_deleted() && !wrec->is_tombstone()) {
+ result_set.emplace_back(wrec->rec);
+ }
} while (attempts < sample_sz);
return result_set;
}
- static std::vector<LocalResultType>
+ static LocalResultType
local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
+ LocalResultType result;
result.reserve(query->sample_size);
if constexpr (REJECTION) {
@@ -197,8 +203,9 @@ public:
auto rec = query->buffer->get(idx);
if (rec->rec.key >= query->global_parms.lower_bound &&
- rec->rec.key <= query->global_parms.upper_bound) {
- result.emplace_back(*rec);
+ rec->rec.key <= query->global_parms.upper_bound &&
+ (!rec->is_deleted() && !rec->is_tombstone())) {
+ result.emplace_back(rec->rec);
}
}
@@ -215,16 +222,16 @@ public:
}
static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
for (size_t i = 0; i < local_results.size(); i++) {
for (size_t j = 0; j < local_results[i].size(); j++) {
- output.emplace_back(local_results[i][j].rec);
+ output.emplace_back(local_results[i][j]);
}
}
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
if (output.size() < parms->sample_size) {
diff --git a/include/query/knn.h b/include/query/knn.h
index 87ea10a..91a032c 100644
--- a/include/query/knn.h
+++ b/include/query/knn.h
@@ -39,8 +39,8 @@ public:
Parameters global_parms;
};
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<const Wrapped<R> *> LocalResultType;
+ typedef std::vector<R> ResultType;
constexpr static bool EARLY_ABORT = false;
constexpr static bool SKIP_DELETE_FILTER = true;
@@ -66,8 +66,8 @@ public:
return;
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> results;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType results;
Wrapped<R> wrec;
wrec.rec = query->global_parms.point;
@@ -79,17 +79,16 @@ public:
shard->search(query->global_parms.point, query->global_parms.k, pq);
while (pq.size() > 0) {
- results.emplace_back(*pq.peek().data);
+ results.emplace_back(pq.peek().data);
pq.pop();
}
return results;
}
- static std::vector<LocalResultType>
- local_query_buffer(LocalQueryBuffer *query) {
+ static LocalResultType local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> results;
+ LocalResultType results;
Wrapped<R> wrec;
wrec.rec = query->global_parms.point;
@@ -118,41 +117,44 @@ public:
}
while (pq.size() > 0) {
- results.emplace_back(*(pq.peek().data));
+ results.emplace_back(pq.peek().data);
pq.pop();
}
- return std::move(results);
+ return results;
}
- static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ static void combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
+
+ Wrapped<R> wrec;
+ wrec.rec = parms->point;
+ wrec.header = 0;
+ PriorityQueue<Wrapped<R>, DistCmpMax<Wrapped<R>>> pq(parms->k, &wrec);
- PriorityQueue<R, DistCmpMax<R>> pq(parms->k, &(parms->point));
for (size_t i = 0; i < local_results.size(); i++) {
for (size_t j = 0; j < local_results[i].size(); j++) {
if (pq.size() < parms->k) {
- pq.push(&local_results[i][j].rec);
+ pq.push(local_results[i][j]);
} else {
- double head_dist = pq.peek().data->calc_distance(parms->point);
- double cur_dist = local_results[i][j].rec.calc_distance(parms->point);
+ double head_dist = pq.peek().data->rec.calc_distance(wrec.rec);
+ double cur_dist = local_results[i][j]->rec.calc_distance(wrec.rec);
if (cur_dist < head_dist) {
pq.pop();
- pq.push(&local_results[i][j].rec);
+ pq.push(local_results[i][j]);
}
}
}
}
while (pq.size() > 0) {
- output.emplace_back(*pq.peek().data);
+ output.emplace_back(pq.peek().data->rec);
pq.pop();
}
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
return false;
diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h
index f3788de..65cffa7 100644
--- a/include/query/pointlookup.h
+++ b/include/query/pointlookup.h
@@ -38,8 +38,8 @@ public:
Parameters global_parms;
};
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<Wrapped<R>> LocalResultType;
+ typedef std::vector<R> ResultType;
constexpr static bool EARLY_ABORT = true;
constexpr static bool SKIP_DELETE_FILTER = true;
@@ -65,8 +65,8 @@ public:
return;
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType result;
auto r = shard->point_lookup({query->global_parms.search_key, 0});
@@ -77,9 +77,8 @@ public:
return result;
}
- static std::vector<LocalResultType>
- local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query_buffer(LocalQueryBuffer *query) {
+ LocalResultType result;
for (size_t i = 0; i < query->buffer->get_record_count(); i++) {
auto rec = query->buffer->get(i);
@@ -95,8 +94,8 @@ public:
static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
for (auto r : local_results) {
if (r.size() > 0) {
if (r[0].is_deleted() || r[0].is_tombstone()) {
@@ -109,7 +108,7 @@ public:
}
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
return false;
diff --git a/include/query/rangecount.h b/include/query/rangecount.h
index 68d304d..0898473 100644
--- a/include/query/rangecount.h
+++ b/include/query/rangecount.h
@@ -16,7 +16,7 @@
namespace de {
namespace rc {
-template <ShardInterface S, bool FORCE_SCAN = true> class Query {
+template <ShardInterface S> class Query {
typedef typename S::RECORD R;
public:
@@ -75,8 +75,8 @@ public:
return;
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType result = {0, 0};
/*
* if the returned index is one past the end of the
@@ -88,8 +88,6 @@ public:
}
auto ptr = shard->get_record_at(query->start_idx);
- size_t reccnt = 0;
- size_t tscnt = 0;
/*
* roll the pointer forward to the first record that is
@@ -104,53 +102,48 @@ public:
ptr->rec.key <= query->global_parms.upper_bound) {
if (!ptr->is_deleted()) {
- reccnt++;
+ result.record_count++;
if (ptr->is_tombstone()) {
- tscnt++;
+ result.tombstone_count++;
}
}
ptr++;
}
- result.push_back({reccnt, tscnt});
return result;
}
- static std::vector<LocalResultType>
+ static LocalResultType
local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
- size_t reccnt = 0;
- size_t tscnt = 0;
+ LocalResultType result = {0, 0};
for (size_t i = 0; i < query->buffer->get_record_count(); i++) {
auto rec = query->buffer->get(i);
if (rec->rec.key >= query->global_parms.lower_bound &&
rec->rec.key <= query->global_parms.upper_bound) {
if (!rec->is_deleted()) {
- reccnt++;
+ result.record_count++;
if (rec->is_tombstone()) {
- tscnt++;
+ result.tombstone_count++;
}
}
}
}
- result.push_back({reccnt, tscnt});
-
return result;
}
static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
size_t reccnt = 0;
size_t tscnt = 0;
for (auto &local_result : local_results) {
- reccnt += local_result[0].record_count;
- tscnt += local_result[0].tombstone_count;
+ reccnt += local_result.record_count;
+ tscnt += local_result.tombstone_count;
}
/* if more tombstones than results, clamp the output at 0 */
@@ -158,10 +151,10 @@ public:
tscnt = reccnt;
}
- output.push_back({reccnt - tscnt});
+ output = reccnt - tscnt;
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
return false;
diff --git a/include/query/rangequery.h b/include/query/rangequery.h
index e7be39c..d9e7db8 100644
--- a/include/query/rangequery.h
+++ b/include/query/rangequery.h
@@ -38,8 +38,8 @@ public:
Parameters global_parms;
};
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<Wrapped<R>> LocalResultType;
+ typedef std::vector<R> ResultType;
constexpr static bool EARLY_ABORT = false;
constexpr static bool SKIP_DELETE_FILTER = true;
@@ -69,8 +69,8 @@ public:
return;
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType result;
/*
* if the returned index is one past the end of the
@@ -101,10 +101,9 @@ public:
return result;
}
- static std::vector<LocalResultType>
- local_query_buffer(LocalQueryBuffer *query) {
+ static LocalResultType local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
+ LocalResultType result;
for (size_t i = 0; i < query->buffer->get_record_count(); i++) {
auto rec = query->buffer->get(i);
if (rec->rec.key >= query->global_parms.lower_bound &&
@@ -116,26 +115,25 @@ public:
return result;
}
- static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
- std::vector<Cursor<LocalResultType>> cursors;
+ static void combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
+ std::vector<Cursor<Wrapped<R>>> cursors;
cursors.reserve(local_results.size());
- psudb::PriorityQueue<LocalResultType> pq(local_results.size());
+ psudb::PriorityQueue<Wrapped<R>> pq(local_results.size());
size_t total = 0;
size_t tmp_n = local_results.size();
for (size_t i = 0; i < tmp_n; ++i)
if (local_results[i].size() > 0) {
auto base = local_results[i].data();
- cursors.emplace_back(Cursor<LocalResultType>{
+ cursors.emplace_back(Cursor<Wrapped<R>>{
base, base + local_results[i].size(), 0, local_results[i].size()});
assert(i == cursors.size() - 1);
total += local_results[i].size();
pq.push(cursors[i].ptr, tmp_n - i - 1);
} else {
- cursors.emplace_back(Cursor<LocalResultType>{nullptr, nullptr, 0, 0});
+ cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
}
if (total == 0) {
@@ -146,9 +144,8 @@ public:
while (pq.size()) {
auto now = pq.peek();
- auto next = pq.size() > 1
- ? pq.peek(1)
- : psudb::queue_record<LocalResultType>{nullptr, 0};
+ auto next = pq.size() > 1 ? pq.peek(1)
+ : psudb::queue_record<Wrapped<R>>{nullptr, 0};
if (!now.data->is_tombstone() && next.data != nullptr &&
now.data->rec == next.data->rec && next.data->is_tombstone()) {
@@ -156,9 +153,9 @@ public:
pq.pop();
auto &cursor1 = cursors[tmp_n - now.version - 1];
auto &cursor2 = cursors[tmp_n - next.version - 1];
- if (advance_cursor<LocalResultType>(cursor1))
+ if (advance_cursor<Wrapped<R>>(cursor1))
pq.push(cursor1.ptr, now.version);
- if (advance_cursor<LocalResultType>(cursor2))
+ if (advance_cursor<Wrapped<R>>(cursor2))
pq.push(cursor2.ptr, next.version);
} else {
auto &cursor = cursors[tmp_n - now.version - 1];
@@ -167,7 +164,7 @@ public:
pq.pop();
- if (advance_cursor<LocalResultType>(cursor))
+ if (advance_cursor<Wrapped<R>>(cursor))
pq.push(cursor.ptr, now.version);
}
}
@@ -175,7 +172,7 @@ public:
return;
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
return false;
diff --git a/include/query/wss.h b/include/query/wss.h
index 54620ca..d4e75f3 100644
--- a/include/query/wss.h
+++ b/include/query/wss.h
@@ -49,8 +49,8 @@ public:
constexpr static bool EARLY_ABORT = false;
constexpr static bool SKIP_DELETE_FILTER = false;
- typedef Wrapped<R> LocalResultType;
- typedef R ResultType;
+ typedef std::vector<R> LocalResultType;
+ typedef std::vector<R> ResultType;
static LocalQuery *local_preproc(S *shard, Parameters *parms) {
auto query = new LocalQuery();
@@ -130,8 +130,8 @@ public:
}
}
- static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) {
- std::vector<LocalResultType> result;
+ static LocalResultType local_query(S *shard, LocalQuery *query) {
+ LocalResultType result;
if (query->sample_size == 0) {
return result;
@@ -139,25 +139,25 @@ public:
for (size_t i = 0; i < query->sample_size; i++) {
size_t idx = shard->get_weighted_sample(query->global_parms.rng);
- if (!shard->get_record_at(idx)->is_deleted()) {
- result.emplace_back(*shard->get_record_at(idx));
+ if (!shard->get_record_at(idx)->is_deleted() && !shard->get_record_at(idx)->is_tombstone()) {
+ result.emplace_back(shard->get_record_at(idx)->rec);
}
}
return result;
}
- static std::vector<LocalResultType>
+ static LocalResultType
local_query_buffer(LocalQueryBuffer *query) {
- std::vector<LocalResultType> result;
+ LocalResultType result;
for (size_t i = 0; i < query->sample_size; i++) {
auto idx = gsl_rng_uniform_int(query->global_parms.rng, query->cutoff);
auto rec = query->buffer->get(idx);
auto test = gsl_rng_uniform(query->global_parms.rng) * query->max_weight;
- if (test <= rec->rec.weight && !rec->is_deleted()) {
- result.emplace_back(*rec);
+ if (test <= rec->rec.weight && !rec->is_deleted() && !rec->is_tombstone()) {
+ result.emplace_back(rec->rec);
}
}
@@ -165,16 +165,16 @@ public:
}
static void
- combine(std::vector<std::vector<LocalResultType>> const &local_results,
- Parameters *parms, std::vector<ResultType> &output) {
+ combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
for (size_t i = 0; i < local_results.size(); i++) {
for (size_t j = 0; j < local_results[i].size(); j++) {
- output.emplace_back(local_results[i][j].rec);
+ output.emplace_back(local_results[i][j]);
}
}
}
- static bool repeat(Parameters *parms, std::vector<ResultType> &output,
+ static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
if (output.size() < parms->sample_size) {