diff options
Diffstat (limited to 'include/shard')
| -rw-r--r-- | include/shard/MemISAM.h | 64 | ||||
| -rw-r--r-- | include/shard/WIRS.h | 75 |
2 files changed, 50 insertions, 89 deletions
diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h index d1f3bb3..55699be 100644 --- a/include/shard/MemISAM.h +++ b/include/shard/MemISAM.h @@ -15,6 +15,7 @@ #include <memory> #include "framework/MutableBuffer.h" +#include "util/bf_config.h" #include "ds/PriorityQueue.h" #include "util/Cursor.h" #include "util/timer.h" @@ -43,37 +44,11 @@ constexpr static size_t inmem_isam_node_keyskip = sizeof(K) * inmem_isam_fanout; static_assert(sizeof(InMemISAMNode) == inmem_isam_node_size, "node size does not match"); public: - MemISAM(std::string data_fname, size_t record_cnt, size_t tombstone_cnt, BloomFilter *bf) - : m_reccnt(record_cnt), m_tombstone_cnt(tombstone_cnt), m_deleted_cnt(0) { - - // read the stored data file the file - size_t alloc_size = (record_cnt * sizeof(R)) + (CACHELINE_SIZE - (record_cnt * sizeof(R)) % CACHELINE_SIZE); - assert(alloc_size % CACHELINE_SIZE == 0); - m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, alloc_size); - - FILE *file = fopen(data_fname.c_str(), "rb"); - assert(file); - auto res = fread(m_data, sizeof(R), m_reccnt, file); - assert (res == m_reccnt); - fclose(file); - - // We can't really persist the internal structure, as it uses - // pointers, which are invalidated by the move. So we'll just - // rebuild it. - this->build_internal_levels(); - - // rebuild the bloom filter - for (size_t i=0; i<m_reccnt; i++) { - auto rec = this->get_record_at(i); - if (rec->is_tombstone()) { - bf->insert(rec->key); - } - } - } - - MemISAM(MutableBuffer<R>* buffer, BloomFilter* bf) + MemISAM(MutableBuffer<R>* buffer) :m_reccnt(0), m_tombstone_cnt(0), m_isam_nodes(nullptr), m_deleted_cnt(0) { + m_bf = new BloomFilter<K>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS); + size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); @@ -104,9 +79,9 @@ public: //Masking off the ts. base->header &= 1; m_data[m_reccnt++] = *base; - if (bf && base->is_tombstone()) { + if (m_bf && base->is_tombstone()) { ++m_tombstone_cnt; - bf->insert(base->key); + m_bf->insert(base->key); } base++; @@ -124,26 +99,30 @@ public: //fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time); } - MemISAM(MemISAM** runs, size_t len, BloomFilter* bf) - :m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_isam_nodes(nullptr) { + MemISAM(MemISAM** runs, size_t len) + : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_isam_nodes(nullptr) { std::vector<Cursor<R>> cursors; cursors.reserve(len); PriorityQueue<R> pq(len); size_t attemp_reccnt = 0; + size_t tombstone_count = 0; for (size_t i = 0; i < len; ++i) { if (runs[i]) { auto base = runs[i]->sorted_output(); cursors.emplace_back(Cursor{base, base + runs[i]->get_record_count(), 0, runs[i]->get_record_count()}); attemp_reccnt += runs[i]->get_record_count(); + tombstone_count += runs[i]->get_tombstone_count(); pq.push(cursors[i].ptr, i); } else { cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0}); } } + m_bf = new BloomFilter<K>(BF_FPR, tombstone_count, BF_HASH_FUNCS); + size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); @@ -167,7 +146,7 @@ public: m_data[m_reccnt++] = *cursor.ptr; if (cursor.ptr->is_tombstone()) { ++m_tombstone_cnt; - bf->insert(cursor.ptr->key); + m_bf->insert(cursor.ptr->key); } } pq.pop(); @@ -184,6 +163,7 @@ public: ~MemISAM() { if (m_data) free(m_data); if (m_isam_nodes) free(m_isam_nodes); + if (m_bf) delete m_bf; } R* sorted_output() const { @@ -259,7 +239,11 @@ public: return pos - m_data; } - bool check_tombstone(const K& key, const V& val) const { + bool check_tombstone(const R& rec) const{ + if (!m_bf->lookup(rec.key)) { + return false; + } + size_t idx = get_lower_bound(key); if (idx >= m_reccnt) { return false; @@ -271,17 +255,10 @@ public: return *ptr == R {key, val} && ptr->is_tombstone(); } - size_t get_memory_utilization() { + size_t get_memory_usage() { return m_reccnt * sizeof(R) + m_internal_node_cnt * inmem_isam_node_size; } - void persist_to_file(std::string data_fname) { - FILE *file = fopen(data_fname.c_str(), "wb"); - assert(file); - fwrite(m_data, sizeof(R), m_reccnt, file); - fclose(file); - } - private: void build_internal_levels() { size_t n_leaf_nodes = m_reccnt / inmem_isam_leaf_fanout + (m_reccnt % inmem_isam_leaf_fanout != 0); @@ -349,6 +326,7 @@ private: // Members: sorted data, internal ISAM levels, reccnt; R* m_data; + BloomFilter<K> *m_bf; InMemISAMNode* m_isam_nodes; InMemISAMNode* m_root; size_t m_reccnt; diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h index 9f37396..42dbcfd 100644 --- a/include/shard/WIRS.h +++ b/include/shard/WIRS.h @@ -18,6 +18,7 @@ #include "util/Cursor.h" #include "ds/Alias.h" #include "ds/BloomFilter.h" +#include "util/bf_config.h" #include "util/Record.h" #include "framework/MutableBuffer.h" @@ -25,6 +26,11 @@ namespace de { thread_local size_t wirs_cancelations = 0; +template <WeightedRecordInterface R> +struct wirs_query_parms { + decltype(R::key) lower_bound; + decltype(R::key) upper_bound; +}; template <WeightedRecordInterface R> class WIRS { @@ -54,12 +60,8 @@ private: }; public: - struct wirs_query_parms { - K lower_bound; - K upper_bound; - }; - WIRS(MutableBuffer<R>* buffer, BloomFilter* bf) + WIRS(MutableBuffer<R>* buffer) : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0), m_root(nullptr) { @@ -67,6 +69,8 @@ public: assert(alloc_size % CACHELINE_SIZE == 0); m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_bf = new BloomFilter<K>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS); + size_t offset = 0; m_reccnt = 0; auto base = buffer->sorted_output(); @@ -88,9 +92,9 @@ public: m_data[m_reccnt++] = *base; m_total_weight+= base->weight; - if (bf && base->is_tombstone()) { + if (m_bf && base->is_tombstone()) { m_tombstone_cnt++; - bf->insert(base->key); + m_bf->insert(base->key); } base++; @@ -101,7 +105,7 @@ public: } } - WIRS(WIRS** shards, size_t len, BloomFilter* bf) + WIRS(WIRS** shards, size_t len) : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0), m_root(nullptr) { std::vector<Cursor<R>> cursors; @@ -110,18 +114,22 @@ public: PriorityQueue<R> pq(len); size_t attemp_reccnt = 0; + size_t tombstone_count = 0; for (size_t i = 0; i < len; ++i) { if (shards[i]) { auto base = shards[i]->sorted_output(); cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); attemp_reccnt += shards[i]->get_record_count(); + tombstone_count += shards[i]->get_tombstone_count(); pq.push(cursors[i].ptr, i); } else { cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0}); } } + m_bf = new BloomFilter<K>(BF_FPR, tombstone_count, BF_HASH_FUNCS); + size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); @@ -142,9 +150,9 @@ public: if (!cursor.ptr->is_deleted()) { m_data[m_reccnt++] = *cursor.ptr; m_total_weight += cursor.ptr->weight; - if (bf && cursor.ptr->is_tombstone()) { + if (m_bf && cursor.ptr->is_tombstone()) { ++m_tombstone_cnt; - if (bf) bf->insert(cursor.ptr->key); + if (m_bf) m_bf->insert(cursor.ptr->key); } } pq.pop(); @@ -164,6 +172,8 @@ public: if (m_alias[i]) delete m_alias[i]; } + if (m_bf) delete m_bf; + free_tree(m_root); } @@ -202,12 +212,13 @@ public: return m_data + idx; } + /* // low - high -> decompose to a set of nodes. // Build Alias across the decomposed nodes. WIRSState<R>* get_query_state(void *query_parameters) { auto res = new WIRSState(); - K lower_key = ((wirs_query_parms *) query_parameters)->lower_bound; - K upper_key = ((wirs_query_parms *) query_parameters)->upper_bound; + K lower_key = ((wirs_query_parms<R> *) query_parameters)->lower_bound; + K upper_key = ((wirs_query_parms<R> *) query_parameters)->upper_bound; // Simulate a stack to unfold recursion. double tot_weight = 0.0; @@ -240,40 +251,7 @@ public: WIRSState<R> *s = (WIRSState<R> *) state; delete s; } - - // returns the number of records sampled - // NOTE: This operation returns records strictly between the lower and upper bounds, not - // including them. - size_t get_samples(void* shard_state, std::vector<R> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) { - WIRSState<R> *state = (WIRSState<R> *) shard_state; - if (sample_sz == 0) { - return 0; - } - - // k -> sampling: three levels. 1. select a node -> select a fat point -> select a record. - size_t cnt = 0; - size_t attempts = 0; - do { - ++attempts; - // first level.... - auto node = state->nodes[state->top_level_alias->get(rng)]; - // second level... - auto fat_point = node->low + node->alias->get(rng); - // third level... - size_t rec_offset = fat_point * m_group_size + m_alias[fat_point]->get(rng); - auto record = m_data + rec_offset; - - // bounds rejection - if (lower_key > record->key || upper_key < record->key) { - continue; - } - - result_set.emplace_back(*record); - cnt++; - } while (attempts < sample_sz); - - return cnt; - } + */ size_t get_lower_bound(const K& key) const { size_t min = 0; @@ -294,6 +272,10 @@ public: } bool check_tombstone(const R& rec) { + if(!m_bf.lookup(rec.key)) { + return false; + } + m_ts_check_cnt++; size_t idx = get_lower_bound(rec.key); if (idx >= m_reccnt) { @@ -414,6 +396,7 @@ private: size_t m_group_size; size_t m_ts_check_cnt; size_t m_deleted_cnt; + BloomFilter<K> m_bf; // The number of rejections caused by tombstones // in this WIRS. |