diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-22 13:25:20 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-22 13:25:20 -0400 |
| commit | 08d6c84b9d69b500c964a8ff66e726e1f01f2095 (patch) | |
| tree | 8bc156b54383de8a4a347e463901dcb7bdd1da10 /include/shard/WIRS.h | |
| parent | 6fd50506d2e50d2faf2478a2883a2ef1b4840a78 (diff) | |
| download | dynamic-extension-08d6c84b9d69b500c964a8ff66e726e1f01f2095.tar.gz | |
Progress towards generalization of shard interface
Diffstat (limited to 'include/shard/WIRS.h')
| -rw-r--r-- | include/shard/WIRS.h | 75 |
1 files changed, 29 insertions, 46 deletions
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h index 9f37396..42dbcfd 100644 --- a/include/shard/WIRS.h +++ b/include/shard/WIRS.h @@ -18,6 +18,7 @@ #include "util/Cursor.h" #include "ds/Alias.h" #include "ds/BloomFilter.h" +#include "util/bf_config.h" #include "util/Record.h" #include "framework/MutableBuffer.h" @@ -25,6 +26,11 @@ namespace de { thread_local size_t wirs_cancelations = 0; +template <WeightedRecordInterface R> +struct wirs_query_parms { + decltype(R::key) lower_bound; + decltype(R::key) upper_bound; +}; template <WeightedRecordInterface R> class WIRS { @@ -54,12 +60,8 @@ private: }; public: - struct wirs_query_parms { - K lower_bound; - K upper_bound; - }; - WIRS(MutableBuffer<R>* buffer, BloomFilter* bf) + WIRS(MutableBuffer<R>* buffer) : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0), m_root(nullptr) { @@ -67,6 +69,8 @@ public: assert(alloc_size % CACHELINE_SIZE == 0); m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_bf = new BloomFilter<K>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS); + size_t offset = 0; m_reccnt = 0; auto base = buffer->sorted_output(); @@ -88,9 +92,9 @@ public: m_data[m_reccnt++] = *base; m_total_weight+= base->weight; - if (bf && base->is_tombstone()) { + if (m_bf && base->is_tombstone()) { m_tombstone_cnt++; - bf->insert(base->key); + m_bf->insert(base->key); } base++; @@ -101,7 +105,7 @@ public: } } - WIRS(WIRS** shards, size_t len, BloomFilter* bf) + WIRS(WIRS** shards, size_t len) : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0), m_root(nullptr) { std::vector<Cursor<R>> cursors; @@ -110,18 +114,22 @@ public: PriorityQueue<R> pq(len); size_t attemp_reccnt = 0; + size_t tombstone_count = 0; for (size_t i = 0; i < len; ++i) { if (shards[i]) { auto base = shards[i]->sorted_output(); cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); attemp_reccnt += shards[i]->get_record_count(); + tombstone_count += shards[i]->get_tombstone_count(); pq.push(cursors[i].ptr, i); } else { cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0}); } } + m_bf = new BloomFilter<K>(BF_FPR, tombstone_count, BF_HASH_FUNCS); + size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); @@ -142,9 +150,9 @@ public: if (!cursor.ptr->is_deleted()) { m_data[m_reccnt++] = *cursor.ptr; m_total_weight += cursor.ptr->weight; - if (bf && cursor.ptr->is_tombstone()) { + if (m_bf && cursor.ptr->is_tombstone()) { ++m_tombstone_cnt; - if (bf) bf->insert(cursor.ptr->key); + if (m_bf) m_bf->insert(cursor.ptr->key); } } pq.pop(); @@ -164,6 +172,8 @@ public: if (m_alias[i]) delete m_alias[i]; } + if (m_bf) delete m_bf; + free_tree(m_root); } @@ -202,12 +212,13 @@ public: return m_data + idx; } + /* // low - high -> decompose to a set of nodes. // Build Alias across the decomposed nodes. WIRSState<R>* get_query_state(void *query_parameters) { auto res = new WIRSState(); - K lower_key = ((wirs_query_parms *) query_parameters)->lower_bound; - K upper_key = ((wirs_query_parms *) query_parameters)->upper_bound; + K lower_key = ((wirs_query_parms<R> *) query_parameters)->lower_bound; + K upper_key = ((wirs_query_parms<R> *) query_parameters)->upper_bound; // Simulate a stack to unfold recursion. double tot_weight = 0.0; @@ -240,40 +251,7 @@ public: WIRSState<R> *s = (WIRSState<R> *) state; delete s; } - - // returns the number of records sampled - // NOTE: This operation returns records strictly between the lower and upper bounds, not - // including them. - size_t get_samples(void* shard_state, std::vector<R> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) { - WIRSState<R> *state = (WIRSState<R> *) shard_state; - if (sample_sz == 0) { - return 0; - } - - // k -> sampling: three levels. 1. select a node -> select a fat point -> select a record. - size_t cnt = 0; - size_t attempts = 0; - do { - ++attempts; - // first level.... - auto node = state->nodes[state->top_level_alias->get(rng)]; - // second level... - auto fat_point = node->low + node->alias->get(rng); - // third level... - size_t rec_offset = fat_point * m_group_size + m_alias[fat_point]->get(rng); - auto record = m_data + rec_offset; - - // bounds rejection - if (lower_key > record->key || upper_key < record->key) { - continue; - } - - result_set.emplace_back(*record); - cnt++; - } while (attempts < sample_sz); - - return cnt; - } + */ size_t get_lower_bound(const K& key) const { size_t min = 0; @@ -294,6 +272,10 @@ public: } bool check_tombstone(const R& rec) { + if(!m_bf.lookup(rec.key)) { + return false; + } + m_ts_check_cnt++; size_t idx = get_lower_bound(rec.key); if (idx >= m_reccnt) { @@ -414,6 +396,7 @@ private: size_t m_group_size; size_t m_ts_check_cnt; size_t m_deleted_cnt; + BloomFilter<K> m_bf; // The number of rejections caused by tombstones // in this WIRS. |