path: root/include/shard/WIRS.h
author    Douglas Rumbaugh <dbr4@psu.edu>    2023-05-22 13:25:20 -0400
committer Douglas Rumbaugh <dbr4@psu.edu>    2023-05-22 13:25:20 -0400
commit    08d6c84b9d69b500c964a8ff66e726e1f01f2095 (patch)
tree      8bc156b54383de8a4a347e463901dcb7bdd1da10 /include/shard/WIRS.h
parent    6fd50506d2e50d2faf2478a2883a2ef1b4840a78 (diff)
download  dynamic-extension-08d6c84b9d69b500c964a8ff66e726e1f01f2095.tar.gz
Progress towards generalization of shard interface
Diffstat (limited to 'include/shard/WIRS.h')
-rw-r--r--  include/shard/WIRS.h  75
1 file changed, 29 insertions, 46 deletions
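The main interface change: a WIRS shard now allocates and owns its Bloom filter, sized from the tombstone count of its input, instead of receiving a BloomFilter* from the caller. A minimal caller-side sketch of the new constructor signatures follows; the helper names and structure are illustrative, not taken from this repository.

    #include <cstddef>
    #include "shard/WIRS.h"
    #include "framework/MutableBuffer.h"

    // Hypothetical flush path: only the buffer is passed; the shard sizes its
    // own filter from buffer->get_tombstone_count().
    template <de::WeightedRecordInterface R>
    de::WIRS<R> *flush_to_shard(de::MutableBuffer<R> *buffer) {
        return new de::WIRS<R>(buffer);
    }

    // Hypothetical merge path: the shard sums tombstone counts across its
    // inputs and sizes its own filter accordingly; no external filter argument.
    template <de::WeightedRecordInterface R>
    de::WIRS<R> *merge_shards(de::WIRS<R> **shards, std::size_t len) {
        return new de::WIRS<R>(shards, len);
    }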
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 9f37396..42dbcfd 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -18,6 +18,7 @@
#include "util/Cursor.h"
#include "ds/Alias.h"
#include "ds/BloomFilter.h"
+#include "util/bf_config.h"
#include "util/Record.h"
#include "framework/MutableBuffer.h"
@@ -25,6 +26,11 @@ namespace de {
thread_local size_t wirs_cancelations = 0;
+template <WeightedRecordInterface R>
+struct wirs_query_parms {
+ decltype(R::key) lower_bound;
+ decltype(R::key) upper_bound;
+};
template <WeightedRecordInterface R>
class WIRS {
@@ -54,12 +60,8 @@ private:
};
public:
- struct wirs_query_parms {
- K lower_bound;
- K upper_bound;
- };
- WIRS(MutableBuffer<R>* buffer, BloomFilter* bf)
+ WIRS(MutableBuffer<R>* buffer)
: m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0),
m_ts_check_cnt(0), m_root(nullptr) {
@@ -67,6 +69,8 @@ public:
assert(alloc_size % CACHELINE_SIZE == 0);
m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_bf = new BloomFilter<K>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS);
+
size_t offset = 0;
m_reccnt = 0;
auto base = buffer->sorted_output();
@@ -88,9 +92,9 @@ public:
m_data[m_reccnt++] = *base;
m_total_weight+= base->weight;
- if (bf && base->is_tombstone()) {
+ if (m_bf && base->is_tombstone()) {
m_tombstone_cnt++;
- bf->insert(base->key);
+ m_bf->insert(base->key);
}
base++;
@@ -101,7 +105,7 @@ public:
}
}
- WIRS(WIRS** shards, size_t len, BloomFilter* bf)
+ WIRS(WIRS** shards, size_t len)
: m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0),
m_root(nullptr) {
std::vector<Cursor<R>> cursors;
@@ -110,18 +114,22 @@ public:
PriorityQueue<R> pq(len);
size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
for (size_t i = 0; i < len; ++i) {
if (shards[i]) {
auto base = shards[i]->sorted_output();
cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
attemp_reccnt += shards[i]->get_record_count();
+ tombstone_count += shards[i]->get_tombstone_count();
pq.push(cursors[i].ptr, i);
} else {
cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0});
}
}
+ m_bf = new BloomFilter<K>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+
size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
@@ -142,9 +150,9 @@ public:
if (!cursor.ptr->is_deleted()) {
m_data[m_reccnt++] = *cursor.ptr;
m_total_weight += cursor.ptr->weight;
- if (bf && cursor.ptr->is_tombstone()) {
+ if (m_bf && cursor.ptr->is_tombstone()) {
++m_tombstone_cnt;
- if (bf) bf->insert(cursor.ptr->key);
+ if (m_bf) m_bf->insert(cursor.ptr->key);
}
}
pq.pop();
@@ -164,6 +172,8 @@ public:
if (m_alias[i]) delete m_alias[i];
}
+ if (m_bf) delete m_bf;
+
free_tree(m_root);
}
@@ -202,12 +212,13 @@ public:
return m_data + idx;
}
+ /*
// low - high -> decompose to a set of nodes.
// Build Alias across the decomposed nodes.
WIRSState<R>* get_query_state(void *query_parameters) {
auto res = new WIRSState();
- K lower_key = ((wirs_query_parms *) query_parameters)->lower_bound;
- K upper_key = ((wirs_query_parms *) query_parameters)->upper_bound;
+ K lower_key = ((wirs_query_parms<R> *) query_parameters)->lower_bound;
+ K upper_key = ((wirs_query_parms<R> *) query_parameters)->upper_bound;
// Simulate a stack to unfold recursion.
double tot_weight = 0.0;
@@ -240,40 +251,7 @@ public:
WIRSState<R> *s = (WIRSState<R> *) state;
delete s;
}
-
- // returns the number of records sampled
- // NOTE: This operation returns records strictly between the lower and upper bounds, not
- // including them.
- size_t get_samples(void* shard_state, std::vector<R> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
- WIRSState<R> *state = (WIRSState<R> *) shard_state;
- if (sample_sz == 0) {
- return 0;
- }
-
- // k -> sampling: three levels. 1. select a node -> select a fat point -> select a record.
- size_t cnt = 0;
- size_t attempts = 0;
- do {
- ++attempts;
- // first level....
- auto node = state->nodes[state->top_level_alias->get(rng)];
- // second level...
- auto fat_point = node->low + node->alias->get(rng);
- // third level...
- size_t rec_offset = fat_point * m_group_size + m_alias[fat_point]->get(rng);
- auto record = m_data + rec_offset;
-
- // bounds rejection
- if (lower_key > record->key || upper_key < record->key) {
- continue;
- }
-
- result_set.emplace_back(*record);
- cnt++;
- } while (attempts < sample_sz);
-
- return cnt;
- }
+ */
size_t get_lower_bound(const K& key) const {
size_t min = 0;
@@ -294,6 +272,10 @@ public:
}
bool check_tombstone(const R& rec) {
+ if (!m_bf->lookup(rec.key)) {
+ return false;
+ }
+
m_ts_check_cnt++;
size_t idx = get_lower_bound(rec.key);
if (idx >= m_reccnt) {
@@ -414,6 +396,7 @@ private:
size_t m_group_size;
size_t m_ts_check_cnt;
size_t m_deleted_cnt;
+ BloomFilter<K> *m_bf;
// The number of rejections caused by tombstones
// in this WIRS.
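To round out the interface picture, a second sketch covering the other two touch points in this diff: the templated wirs_query_parms<R>, whose bounds are now typed from R::key rather than the removed shard-local K alias, and the Bloom-filter fast path added to check_tombstone(). The helper functions are hypothetical; only wirs_query_parms, check_tombstone(), and the WeightedRecordInterface concept come from this header.

    #include "shard/WIRS.h"

    // Hypothetical helper: build the record-typed query bounds.
    template <de::WeightedRecordInterface R>
    de::wirs_query_parms<R> make_range(decltype(R::key) lo, decltype(R::key) hi) {
        return de::wirs_query_parms<R>{lo, hi};
    }

    // Hypothetical delete-path check: check_tombstone() now probes the
    // shard-owned Bloom filter first, so most keys with no tombstone skip
    // the binary search over the sorted record array.
    template <de::WeightedRecordInterface R>
    bool has_tombstone(de::WIRS<R> &shard, const R &rec) {
        return shard.check_tombstone(rec);
    }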