From c867d59aaf61bb1a0839d34b56935f52c0fc6e00 Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Tue, 9 May 2023 15:13:35 -0400
Subject: Dynamic Extension unit tests + bugfixes

---
 include/framework/DynamicExtension.h |  30 +++----
 include/framework/InternalLevel.h    | 146 +++++++++++++++++------------------
 include/shard/WIRS.h                 |  21 ++---
 3 files changed, 100 insertions(+), 97 deletions(-)

(limited to 'include')

diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 93de24f..ed01e47 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -3,7 +3,7 @@
 #include
 #include
 #include
-#include
+#include
 #include "framework/MutableBuffer.h"
 #include "framework/InternalLevel.h"
@@ -42,7 +42,7 @@ static constexpr bool LSM_REJ_SAMPLE = false;
 // True for leveling, false for tiering
 static constexpr bool LSM_LEVELING = false;
-static constexpr bool DELETE_TAGGING = true;
+static constexpr bool DELETE_TAGGING = false;
 
 // TODO: Replace the constexpr bools above
 // with template parameters based on these
@@ -107,7 +107,7 @@ public:
     int append(const K& key, const V& val, W weight, bool tombstone, gsl_rng *rng) {
         // NOTE: single-threaded implementation only
         MutableBuffer *buffer;
-        while (!(buffer = buffer()))
+        while (!(buffer = get_buffer()))
             ;
 
         if (buffer->is_full()) {
@@ -138,8 +138,8 @@ public:
         shards.push_back({{-1, -1}, nullptr});
         states.push_back(nullptr);
 
-        std::vector shard_weights;
-        shard_weights.push_back(buffer_weight);
+        std::vector shard_weights;
+        shard_weights.push_back((double) buffer_weight);
 
         for (auto &level : m_levels) {
             level->get_shard_weights(shard_weights, shards, states, lower_key, upper_key);
@@ -153,10 +153,13 @@ public:
         }
 
         double tot_weight = std::accumulate(shard_weights.begin(), shard_weights.end(), 0);
-        for (auto& w: shard_weights) w /= tot_weight;
+        std::vector normalized_weights(shard_weights.size());
+        for (size_t i=0; i shard_samples(shard_weights.size(), 0);
@@ -214,13 +217,13 @@ public:
             shards[i].second->get_samples(states[i], results, lower_key, upper_key, shard_samples[i], rng);
 
-            for (size_t i=0; iis_tombstone() || is_deleted(results[i], shards[i].first, buffer, buffer_cutoff)) {
+            for (size_t j=0; jkey < lower_bound || record->key > upper_bound) {
             bounds_rejections++;
             return true;
-        } else if (is_deleted(record, shid, buffer, buffer, buffer_cutoff)) {
+        } else if (is_deleted(record, shid, buffer, buffer_cutoff)) {
             deletion_rejections++;
             return true;
         }
@@ -431,8 +434,7 @@ private:
         size_t new_shard_cnt = (LSM_LEVELING) ? 1 : m_scale_factor;
         new_idx = m_levels.size();
         if (new_idx > 0) {
-            auto res = m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count();
-            assert(res == 0);
+            assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
         }
 
         m_levels.emplace_back(new InternalLevel(new_idx, new_shard_cnt, DELETE_TAGGING));
@@ -453,7 +455,7 @@ private:
         merge_buffer_into_l0(buffer, rng);
         enforce_delete_maximum(0, rng);
 
-        buffer->tshardcate();
+        buffer->truncate();
         return;
     }
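The sampling hunks above switch from normalizing shard_weights in place to building a separate normalized_weights vector before the per-shard sample counts are drawn. What follows is a minimal sketch of that weight-proportional allocation step; distribute_samples is a hypothetical helper, and std::discrete_distribution stands in for the framework's gsl_rng/Alias machinery so the sketch is self-contained. Note that it seeds std::accumulate with 0.0 so the summation stays in floating point.

#include <cstddef>
#include <numeric>
#include <random>
#include <vector>

// Hypothetical helper (not part of the patch): split a sample budget across
// shards in proportion to their weights.
std::vector<size_t> distribute_samples(const std::vector<double> &shard_weights,
                                       size_t sample_sz, std::mt19937 &rng) {
    std::vector<size_t> shard_samples(shard_weights.size(), 0);

    // Seed the sum with 0.0 so accumulation happens in double, not int.
    double tot_weight = std::accumulate(shard_weights.begin(),
                                        shard_weights.end(), 0.0);
    if (shard_weights.empty() || tot_weight == 0.0) {
        return shard_samples;
    }

    // discrete_distribution normalizes the weights internally, which is the
    // same role the normalized_weights vector plays in the hunk above.
    std::discrete_distribution<size_t> pick(shard_weights.begin(),
                                            shard_weights.end());
    for (size_t i = 0; i < sample_sz; i++) {
        shard_samples[pick(rng)]++;
    }

    return shard_samples;
}

Drawing the counts this way keeps each shard's expected share of the sample proportional to its weight, which is what the normalization step in the patch is for.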
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index 5134584..727a382 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -28,37 +28,37 @@ private:
     struct InternalLevelStructure {
         InternalLevelStructure(size_t cap)
         : m_cap(cap)
-        , m_runs(new WIRS*[cap]{nullptr})
+        , m_shards(new WIRS*[cap]{nullptr})
         , m_bfs(new BloomFilter*[cap]{nullptr}) {}
 
         ~InternalLevelStructure() {
             for (size_t i = 0; i < m_cap; ++i) {
-                if (m_runs[i]) delete m_runs[i];
+                if (m_shards[i]) delete m_shards[i];
                 if (m_bfs[i]) delete m_bfs[i];
             }
 
-            delete[] m_runs;
+            delete[] m_shards;
             delete[] m_bfs;
         }
 
         size_t m_cap;
-        WIRS** m_runs;
+        WIRS** m_shards;
         BloomFilter** m_bfs;
     };
 
 public:
-    InternalLevel(ssize_t level_no, size_t run_cap, bool tagging)
-    : m_level_no(level_no), m_run_cnt(0)
-    , m_structure(new InternalLevelStructure(run_cap))
+    InternalLevel(ssize_t level_no, size_t shard_cap, bool tagging)
+    : m_level_no(level_no), m_shard_cnt(0)
+    , m_structure(new InternalLevelStructure(shard_cap))
     , m_tagging(tagging) {}
 
-    // Create a new memory level sharing the runs and repurposing it as previous level_no + 1
+    // Create a new memory level sharing the shards and repurposing it as previous level_no + 1
     // WARNING: for leveling only.
     InternalLevel(InternalLevel* level, bool tagging)
-    : m_level_no(level->m_level_no + 1), m_run_cnt(level->m_run_cnt)
+    : m_level_no(level->m_level_no + 1), m_shard_cnt(level->m_shard_cnt)
     , m_structure(level->m_structure)
     , m_tagging(tagging) {
-        assert(m_structure->m_cap == 1 && m_run_cnt == 1);
+        assert(m_structure->m_cap == 1 && m_shard_cnt == 1);
     }
@@ -69,73 +69,73 @@ public:
     static InternalLevel* merge_levels(InternalLevel* base_level, InternalLevel* new_level, bool tagging, const gsl_rng* rng) {
         assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
         auto res = new InternalLevel(base_level->m_level_no, 1, tagging);
-        res->m_run_cnt = 1;
+        res->m_shard_cnt = 1;
         res->m_structure->m_bfs[0] = new BloomFilter(BF_FPR, new_level->get_tombstone_count() + base_level->get_tombstone_count(), BF_HASH_FUNCS, rng);
 
-        WIRS* runs[2];
-        runs[0] = base_level->m_structure->m_runs[0];
-        runs[1] = new_level->m_structure->m_runs[0];
+        WIRS* shards[2];
+        shards[0] = base_level->m_structure->m_shards[0];
+        shards[1] = new_level->m_structure->m_shards[0];
 
-        res->m_structure->m_runs[0] = new WIRS(runs, 2, res->m_structure->m_bfs[0], tagging);
+        res->m_structure->m_shards[0] = new WIRS(shards, 2, res->m_structure->m_bfs[0], tagging);
         return res;
     }
 
     void append_mem_table(MutableBuffer* buffer, const gsl_rng* rng) {
-        assert(m_run_cnt < m_structure->m_cap);
-        m_structure->m_bfs[m_run_cnt] = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng);
-        m_structure->m_runs[m_run_cnt] = new WIRS(buffer, m_structure->m_bfs[m_run_cnt], m_tagging);
-        ++m_run_cnt;
+        assert(m_shard_cnt < m_structure->m_cap);
+        m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng);
+        m_structure->m_shards[m_shard_cnt] = new WIRS(buffer, m_structure->m_bfs[m_shard_cnt], m_tagging);
+        ++m_shard_cnt;
     }
 
-    void append_merged_runs(InternalLevel* level, const gsl_rng* rng) {
-        assert(m_run_cnt < m_structure->m_cap);
-        m_structure->m_bfs[m_run_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng);
-        m_structure->m_runs[m_run_cnt] = new WIRS(level->m_structure->m_runs, level->m_run_cnt, m_structure->m_bfs[m_run_cnt], m_tagging);
-        ++m_run_cnt;
+    void append_merged_shards(InternalLevel* level, const gsl_rng* rng) {
+        assert(m_shard_cnt < m_structure->m_cap);
+        m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng);
+        m_structure->m_shards[m_shard_cnt] = new WIRS(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt], m_tagging);
+        ++m_shard_cnt;
    }
 
-    WIRS *get_merged_run() {
-        WIRS *runs[m_run_cnt];
+    WIRS *get_merged_shard() {
+        WIRS *shards[m_shard_cnt];
 
-        for (size_t i=0; im_runs[i]) ? m_structure->m_runs[i] : nullptr;
+        for (size_t i=0; im_shards[i]) ? m_structure->m_shards[i] : nullptr;
         }
 
-        return new WIRS(runs, m_run_cnt, nullptr, m_tagging);
+        return new WIRS(shards, m_shard_cnt, nullptr, m_tagging);
     }
 
     // Append the sample range in-order.....
-    void get_run_weights(std::vector& weights, std::vector *>> &runs, std::vector& run_states, const K& low, const K& high) {
-        for (size_t i=0; im_runs[i]) {
-                auto run_state = m_structure->m_runs[i]->get_sample_run_state(low, high);
-                if (run_state->tot_weight > 0) {
-                    runs.push_back({{m_level_no, (ssize_t) i}, m_structure->m_runs[i]});
-                    weights.push_back(run_state->tot_weight);
-                    run_states.emplace_back(run_state);
+    void get_shard_weights(std::vector& weights, std::vector *>> &shards, std::vector& shard_states, const K& low, const K& high) {
+        for (size_t i=0; im_shards[i]) {
+                auto shard_state = m_structure->m_shards[i]->get_sample_shard_state(low, high);
+                if (shard_state->tot_weight > 0) {
+                    shards.push_back({{m_level_no, (ssize_t) i}, m_structure->m_shards[i]});
+                    weights.push_back(shard_state->tot_weight);
+                    shard_states.emplace_back(shard_state);
                 } else {
-                    delete run_state;
+                    WIRS::delete_state(shard_state);
                 }
             }
         }
     }
 
-    bool bf_rejection_check(size_t run_stop, const K& key) {
-        for (size_t i = 0; i < run_stop; ++i) {
+    bool bf_rejection_check(size_t shard_stop, const K& key) {
+        for (size_t i = 0; i < shard_stop; ++i) {
             if (m_structure->m_bfs[i] && m_structure->m_bfs[i]->lookup(key))
                 return true;
         }
         return false;
     }
 
-    bool check_tombstone(size_t run_stop, const K& key, const V& val) {
-        if (m_run_cnt == 0) return false;
+    bool check_tombstone(size_t shard_stop, const K& key, const V& val) {
+        if (m_shard_cnt == 0) return false;
 
-        for (int i = m_run_cnt - 1; i >= (ssize_t) run_stop; i--) {
-            if (m_structure->m_runs[i] && (m_structure->m_bfs[i]->lookup(key))
-                && m_structure->m_runs[i]->check_tombstone(key, val))
+        for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
+            if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(key))
+                && m_structure->m_shards[i]->check_tombstone(key, val))
                 return true;
         }
         return false;
@@ -143,7 +143,7 @@ public:
     bool delete_record(const K& key, const V& val) {
         for (size_t i = 0; i < m_structure->m_cap; ++i) {
-            if (m_structure->m_runs[i] && m_structure->m_runs[i]->delete_record(key, val)) {
+            if (m_structure->m_shards[i] && m_structure->m_shards[i]->delete_record(key, val)) {
                 return true;
             }
         }
@@ -151,22 +151,22 @@ public:
         return false;
     }
 
-    const Record* get_record_at(size_t run_no, size_t idx) {
-        return m_structure->m_runs[run_no]->get_record_at(idx);
+    const Record* get_record_at(size_t shard_no, size_t idx) {
+        return m_structure->m_shards[shard_no]->get_record_at(idx);
     }
 
-    WIRS* get_run(size_t idx) {
-        return m_structure->m_runs[idx];
+    WIRS* get_shard(size_t idx) {
+        return m_structure->m_shards[idx];
     }
 
-    size_t get_run_count() {
-        return m_run_cnt;
+    size_t get_shard_count() {
+        return m_shard_cnt;
     }
 
     size_t get_record_cnt() {
         size_t cnt = 0;
-        for (size_t i=0; im_runs[i]->get_record_count();
+        for (size_t i=0; im_shards[i]->get_record_count();
         }
 
         return cnt;
@@ -174,15 +174,15 @@ public:
     size_t get_tombstone_count() {
         size_t res = 0;
-        for (size_t i = 0; i < m_run_cnt; ++i) {
-            res += m_structure->m_runs[i]->get_tombstone_count();
+        for (size_t i = 0; i < m_shard_cnt; ++i) {
+            res += m_structure->m_shards[i]->get_tombstone_count();
         }
         return res;
     }
 
     size_t get_aux_memory_utilization() {
         size_t cnt = 0;
-        for (size_t i=0; im_bfs[i]) {
                 cnt += m_structure->m_bfs[i]->get_memory_utilization();
             }
@@ -193,9 +193,9 @@ public:
     size_t get_memory_utilization() {
         size_t cnt = 0;
-        for (size_t i=0; im_runs[i]) {
-                cnt += m_structure->m_runs[i]->get_memory_utilization();
+        for (size_t i=0; im_shards[i]) {
+                cnt += m_structure->m_shards[i]->get_memory_utilization();
             }
         }
@@ -205,10 +205,10 @@ public:
     double get_tombstone_prop() {
         size_t tscnt = 0;
         size_t reccnt = 0;
-        for (size_t i=0; im_runs[i]) {
-                tscnt += m_structure->m_runs[i]->get_tombstone_count();
-                reccnt += m_structure->m_runs[i]->get_record_count();
+        for (size_t i=0; im_shards[i]) {
+                tscnt += m_structure->m_shards[i]->get_tombstone_count();
+                reccnt += m_structure->m_shards[i]->get_record_count();
             }
         }
@@ -217,9 +217,9 @@ public:
     size_t get_rejection_count() {
         size_t rej_cnt = 0;
-        for (size_t i=0; im_runs[i]) {
-                rej_cnt += m_structure->m_runs[i]->get_rejection_count();
+        for (size_t i=0; im_shards[i]) {
+                rej_cnt += m_structure->m_shards[i]->get_rejection_count();
             }
         }
@@ -229,10 +229,10 @@ public:
     double get_rejection_rate() {
         size_t rej_cnt = 0;
         size_t attempt_cnt = 0;
-        for (size_t i=0; im_runs[i]) {
-                attempt_cnt += m_structure->m_runs[i]->get_ts_check_count();
-                rej_cnt += m_structure->m_runs[i]->get_rejection_count();
+        for (size_t i=0; im_shards[i]) {
+                attempt_cnt += m_structure->m_shards[i]->get_ts_check_count();
+                rej_cnt += m_structure->m_shards[i]->get_rejection_count();
             }
         }
@@ -248,8 +248,8 @@ public:
 private:
     ssize_t m_level_no;
-    size_t m_run_cnt;
-    size_t m_run_size_cap;
+    size_t m_shard_cnt;
+    size_t m_shard_size_cap;
     bool m_tagging;
     std::shared_ptr m_structure;
 };
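In the InternalLevel changes above, check_tombstone() walks a level's shards from newest to oldest and consults each shard's Bloom filter before probing the shard itself. Below is a compact sketch of that filter-gated lookup pattern; StubFilter and StubShard are illustrative stand-ins, not the framework's BloomFilter or WIRS classes.

#include <cstddef>
#include <set>
#include <utility>
#include <vector>

// Stand-ins for the framework's types; a real Bloom filter answers
// "maybe present" / "definitely absent" rather than exactly.
struct StubFilter {
    std::set<int> keys;
    bool lookup(int key) const { return keys.count(key) > 0; }
};

struct StubShard {
    StubFilter bf;                             // per-shard filter, as in the level
    std::set<std::pair<int, int>> tombstones;  // (key, value) tombstones
    bool check_tombstone(int key, int val) const {
        return tombstones.count({key, val}) > 0;
    }
};

// Mirrors the shape of InternalLevel::check_tombstone(): walk shards from
// newest (highest index) to oldest, probing a shard only when its filter
// reports the key may be present.
bool level_has_tombstone(const std::vector<StubShard> &shards, int key, int val) {
    for (size_t i = shards.size(); i-- > 0;) {
        if (shards[i].bf.lookup(key) && shards[i].check_tombstone(key, val)) {
            return true;
        }
    }
    return false;
}

Checking newer shards first tends to find a hit early, since a tombstone is always at least as recent as the record it deletes.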
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index c89a407..39337bf 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -22,7 +22,7 @@
 namespace de {
 
-thread_local size_t m_wirsrun_cancelations = 0;
+thread_local size_t wirs_cancelations = 0;
 
 template
 class WIRS {
@@ -63,7 +63,7 @@ public:
             if (!(base->is_tombstone()) && (base + 1) < stop) {
                 if (base->match(base + 1) && (base + 1)->is_tombstone()) {
                     base += 2;
-                    m_wirsrun_cancelations++;
+                    wirs_cancelations++;
                     continue;
                 }
             }
@@ -89,7 +89,7 @@ public:
         }
     }
 
-    WIRS(WIRS** runs, size_t len, BloomFilter* bf, bool tagging)
+    WIRS(WIRS** shards, size_t len, BloomFilter* bf, bool tagging)
     : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0), m_tagging(tagging), m_root(nullptr) {
         std::vector> cursors;
@@ -100,10 +100,10 @@ public:
         size_t attemp_reccnt = 0;
         for (size_t i = 0; i < len; ++i) {
-            if (runs[i]) {
-                auto base = runs[i]->sorted_output();
-                cursors.emplace_back(Cursor{base, base + runs[i]->get_record_count(), 0, runs[i]->get_record_count()});
-                attemp_reccnt += runs[i]->get_record_count();
+            if (shards[i]) {
+                auto base = shards[i]->sorted_output();
+                cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
+                attemp_reccnt += shards[i]->get_record_count();
                 pq.push(cursors[i].ptr, i);
             } else {
                 cursors.emplace_back(Cursor{nullptr, nullptr, 0, 0});
             }
@@ -200,7 +200,7 @@ public:
     // low - high -> decompose to a set of nodes.
     // Build Alias across the decomposed nodes.
-    WIRSState* get_sample_run_state(const K& lower_key, const K& upper_key) {
+    WIRSState* get_sample_shard_state(const K& lower_key, const K& upper_key) {
         WIRSState* res = new WIRSState();
 
         // Simulate a stack to unfold recursion.
@@ -238,7 +238,8 @@ public:
     // returns the number of records sampled
     // NOTE: This operation returns records strictly between the lower and upper bounds, not
    // including them.
-    size_t get_samples(WIRSState* run_state, std::vector> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
+    size_t get_samples(void* shard_state, std::vector> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
+        WIRSState *state = (WIRSState *) shard_state;
         if (sample_sz == 0) {
             return 0;
         }
@@ -249,7 +250,7 @@ public:
         do {
             ++attempts;
             // first level....
-            auto node = run_state->nodes[run_state->top_level_alias->get(rng)];
+            auto node = state->nodes[state->top_level_alias->get(rng)];
             // second level...
             auto fat_point = node->low + node->alias->get(rng);
             // third level...
--
cgit v1.2.3
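The WIRS changes above rename the merge-time counter to wirs_cancelations; it counts record/tombstone pairs dropped while the constructor walks its sorted inputs. A small sketch of that cancellation rule over a single sorted vector follows; Rec and cancel_tombstones are illustrative stand-ins rather than the shard's actual Record type and merge loop.

#include <cstddef>
#include <vector>

// Illustrative stand-in for the framework's Record type.
struct Rec {
    int key;
    int val;
    bool tombstone;
    bool match(const Rec &o) const { return key == o.key && val == o.val; }
};

// Drop each record that is immediately followed by its matching tombstone,
// counting the cancellations, which is the rule the merge loop above applies
// while walking its sorted inputs.
std::vector<Rec> cancel_tombstones(const std::vector<Rec> &sorted,
                                   size_t &cancelations) {
    std::vector<Rec> out;
    for (size_t i = 0; i < sorted.size();) {
        if (!sorted[i].tombstone && i + 1 < sorted.size() &&
            sorted[i].match(sorted[i + 1]) && sorted[i + 1].tombstone) {
            i += 2;  // skip the record and the tombstone that deletes it
            cancelations++;
            continue;
        }
        out.push_back(sorted[i]);
        i++;
    }
    return out;
}

Because the inputs are sorted with a record and its tombstone adjacent, a single forward pass is enough to cancel every such pair.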