From 08d6c84b9d69b500c964a8ff66e726e1f01f2095 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 22 May 2023 13:25:20 -0400 Subject: Progress towards generalization of shard interface --- include/framework/InternalLevel.h | 162 ++++++++++---------------------------- 1 file changed, 40 insertions(+), 122 deletions(-) (limited to 'include/framework/InternalLevel.h') diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index 18b7de3..70da821 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -16,85 +16,52 @@ #include "util/bf_config.h" #include "framework/ShardInterface.h" #include "framework/MutableBuffer.h" +#include "framework/QueryInterface.h" #include "ds/BloomFilter.h" namespace de { -template +template class InternalLevel { - static const size_t REJECTION_TRIGGER_THRESHOLD = 1024; - - typedef decltype(R::key) K; - typedef decltype(R::value) V; - //typedef WIRS S; - -private: - struct InternalLevelStructure { - InternalLevelStructure(size_t cap) - : m_cap(cap) - , m_shards(new S*[cap]{nullptr}) - , m_bfs(new BloomFilter*[cap]{nullptr}) {} - - ~InternalLevelStructure() { - for (size_t i = 0; i < m_cap; ++i) { - if (m_shards[i]) delete m_shards[i]; - if (m_bfs[i]) delete m_bfs[i]; - } - - delete[] m_shards; - delete[] m_bfs; - } - - size_t m_cap; - S** m_shards; - BloomFilter** m_bfs; - }; - public: InternalLevel(ssize_t level_no, size_t shard_cap) - : m_level_no(level_no), m_shard_cnt(0) - , m_structure(new InternalLevelStructure(shard_cap)) {} + : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector(shard_cap, nullptr)) + {} // Create a new memory level sharing the shards and repurposing it as previous level_no + 1 // WARNING: for leveling only. InternalLevel(InternalLevel* level) : m_level_no(level->m_level_no + 1), m_shard_cnt(level->m_shard_cnt) - , m_structure(level->m_structure) { - assert(m_structure->m_cap == 1 && m_shard_cnt == 1); + , m_shards(level->m_shards) { + assert(m_shard_cnt == 1 && m_shards.size() == 1); } ~InternalLevel() {} // WARNING: for leveling only. // assuming the base level is the level new level is merging into. (base_level is larger.) - static InternalLevel* merge_levels(InternalLevel* base_level, InternalLevel* new_level, const gsl_rng* rng) { + static InternalLevel* merge_levels(InternalLevel* base_level, InternalLevel* new_level) { assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); auto res = new InternalLevel(base_level->m_level_no, 1); res->m_shard_cnt = 1; - res->m_structure->m_bfs[0] = - new BloomFilter(BF_FPR, - new_level->get_tombstone_count() + base_level->get_tombstone_count(), - BF_HASH_FUNCS, rng); S* shards[2]; - shards[0] = base_level->m_structure->m_shards[0]; - shards[1] = new_level->m_structure->m_shards[0]; + shards[0] = base_level->m_shards[0]; + shards[1] = new_level->m_shards[0]; - res->m_structure->m_shards[0] = new S(shards, 2, res->m_structure->m_bfs[0]); + res->m_shards[0] = new S(shards, 2); return res; } - void append_mem_table(MutableBuffer* buffer, const gsl_rng* rng) { - assert(m_shard_cnt < m_structure->m_cap); - m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng); - m_structure->m_shards[m_shard_cnt] = new S(buffer, m_structure->m_bfs[m_shard_cnt]); + void append_buffer(MutableBuffer* buffer) { + assert(m_shard_cnt < m_shards.size()); + m_shards[m_shard_cnt] = new S(buffer); ++m_shard_cnt; } - void append_merged_shards(InternalLevel* level, const gsl_rng* rng) { - assert(m_shard_cnt < m_structure->m_cap); - m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng); - m_structure->m_shards[m_shard_cnt] = new S(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt]); + void append_merged_shards(InternalLevel* level) { + assert(m_shard_cnt < m_shards.size()); + m_shards[m_shard_cnt] = new S(level->m_shards, level->m_shard_cnt); ++m_shard_cnt; } @@ -102,50 +69,36 @@ public: S *shards[m_shard_cnt]; for (size_t i=0; im_shards[i]) ? m_structure->m_shards[i] : nullptr; + shards[i] = m_shards[i]; } - return new S(shards, m_shard_cnt, nullptr); + return new S(shards, m_shard_cnt); } // Append the sample range in-order..... - void get_query_states(std::vector& weights, std::vector> &shards, std::vector& shard_states, void *query_parms) { + void get_query_states(std::vector> &shards, std::vector& shard_states, void *query_parms) { for (size_t i=0; im_shards[i]) { - auto shard_state = m_structure->m_shards[i]->get_query_state(query_parms); - if (shard_state->tot_weight > 0) { - shards.push_back({{m_level_no, (ssize_t) i}, m_structure->m_shards[i]}); - weights.push_back(shard_state->tot_weight); - shard_states.emplace_back(shard_state); - } else { - S::delete_state(shard_state); - } + if (m_shards[i]) { + auto shard_state = Q::get_query_state(m_shards[i], query_parms); + shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i]}); + shard_states.emplace_back(shard_state); } } } - bool bf_rejection_check(size_t shard_stop, const K& key) { - for (size_t i = 0; i < shard_stop; ++i) { - if (m_structure->m_bfs[i] && m_structure->m_bfs[i]->lookup(key)) - return true; - } - return false; - } - bool check_tombstone(size_t shard_stop, const R& rec) { if (m_shard_cnt == 0) return false; for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) { - if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(rec.key)) - && m_structure->m_shards[i]->check_tombstone(rec)) + if (m_shards[i] && m_shards[i]->check_tombstone(rec)) return true; } return false; } - bool delete_record(const K& key, const V& val) { - for (size_t i = 0; i < m_structure->m_cap; ++i) { - if (m_structure->m_shards[i] && m_structure->m_shards[i]->delete_record(key, val)) { + bool delete_record(const R &rec) { + for (size_t i = 0; i < m_shards.size(); ++i) { + if (m_shards[i] && m_shards[i]->delete_record(rec)) { return true; } } @@ -153,12 +106,8 @@ public: return false; } - const R* get_record_at(size_t shard_no, size_t idx) { - return m_structure->m_shards[shard_no]->get_record_at(idx); - } - S* get_shard(size_t idx) { - return m_structure->m_shards[idx]; + return m_shards[idx]; } size_t get_shard_count() { @@ -168,7 +117,7 @@ public: size_t get_record_cnt() { size_t cnt = 0; for (size_t i=0; im_shards[i]->get_record_count(); + cnt += m_shards[i]->get_record_count(); } return cnt; @@ -177,27 +126,25 @@ public: size_t get_tombstone_count() { size_t res = 0; for (size_t i = 0; i < m_shard_cnt; ++i) { - res += m_structure->m_shards[i]->get_tombstone_count(); + res += m_shards[i]->get_tombstone_count(); } return res; } - size_t get_aux_memory_utilization() { + size_t get_aux_memory_usage() { size_t cnt = 0; for (size_t i=0; im_bfs[i]) { - cnt += m_structure->m_bfs[i]->get_memory_utilization(); - } + cnt += m_shards[i]->get_aux_memory_usage(); } return cnt; } - size_t get_memory_utilization() { + size_t get_memory_usage() { size_t cnt = 0; for (size_t i=0; im_shards[i]) { - cnt += m_structure->m_shards[i]->get_memory_utilization(); + if (m_shards[i]) { + cnt += m_shards[i]->get_memory_usage(); } } @@ -208,51 +155,22 @@ public: size_t tscnt = 0; size_t reccnt = 0; for (size_t i=0; im_shards[i]) { - tscnt += m_structure->m_shards[i]->get_tombstone_count(); - reccnt += m_structure->m_shards[i]->get_record_count(); + if (m_shards[i]) { + tscnt += m_shards[i]->get_tombstone_count(); + reccnt += m_shards[i]->get_record_count(); } } return (double) tscnt / (double) (tscnt + reccnt); } - size_t get_rejection_count() { - size_t rej_cnt = 0; - for (size_t i=0; im_shards[i]) { - rej_cnt += m_structure->m_shards[i]->get_rejection_count(); - } - } - - return rej_cnt; - } - - double get_rejection_rate() { - size_t rej_cnt = 0; - size_t attempt_cnt = 0; - for (size_t i=0; im_shards[i]) { - attempt_cnt += m_structure->m_shards[i]->get_ts_check_count(); - rej_cnt += m_structure->m_shards[i]->get_rejection_count(); - } - } - - if (attempt_cnt == 0) return 0; - - // the rejection rate is considered 0 until we exceed an - // absolute threshold of rejections. - if (rej_cnt <= REJECTION_TRIGGER_THRESHOLD) return 0; - - return (double) rej_cnt / (double) attempt_cnt; - } - private: ssize_t m_level_no; size_t m_shard_cnt; size_t m_shard_size_cap; - std::shared_ptr m_structure; + + std::shared_ptr> m_shards; }; } -- cgit v1.2.3