diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-17 16:00:20 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-17 16:00:20 -0400 |
| commit | 6fd50506d2e50d2faf2478a2883a2ef1b4840a78 (patch) | |
| tree | eb29955b435a6e2e6196b9c89de15c6b8885c4f3 | |
| parent | 75a8418b580234521b5fa23340bee959c357acf9 (diff) | |
| download | dynamic-extension-6fd50506d2e50d2faf2478a2883a2ef1b4840a78.tar.gz | |
Started implementing shard interface (not finished yet)
| -rw-r--r-- | include/framework/DynamicExtension.h | 41 | ||||
| -rw-r--r-- | include/framework/InternalLevel.h | 33 | ||||
| -rw-r--r-- | include/framework/ShardInterface.h | 27 | ||||
| -rw-r--r-- | include/shard/WIRS.h | 53 | ||||
| -rw-r--r-- | tests/dynamic_extension_tests.cpp | 21 | ||||
| -rw-r--r-- | tests/internal_level_tests.cpp | 10 | ||||
| -rw-r--r-- | tests/wirs_tests.cpp | 5 |
7 files changed, 106 insertions, 84 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index fd42c5f..53b55b1 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -16,6 +16,9 @@ #include "framework/MutableBuffer.h" #include "framework/InternalLevel.h" +#include "framework/ShardInterface.h" + +#include "shard/WIRS.h" #include "ds/Alias.h" #include "util/timer.h" @@ -68,9 +71,8 @@ enum class DeletePolicy { typedef ssize_t level_index; -template <RecordInterface R> +template <RecordInterface R, ShardInterface S> class DynamicExtension { - typedef WIRS<R> Shard; typedef decltype(R::key) K; typedef decltype(R::value) V; typedef decltype(R::weight) W; @@ -143,7 +145,7 @@ public: // Get the shard weights for each level. Index 0 is the buffer, // represented by nullptr. - std::vector<std::pair<ShardID, Shard*>> shards; + std::vector<std::pair<ShardID, S*>> shards; std::vector<void*> states; shards.push_back({{-1, -1}, nullptr}); states.push_back(nullptr); @@ -151,13 +153,15 @@ public: std::vector<W> shard_weights; shard_weights.push_back((double) buffer_weight); + WIRS<R>::wirs_query_parms parms = {lower_key, upper_key}; + for (auto &level : m_levels) { - level->get_shard_weights(shard_weights, shards, states, lower_key, upper_key); + level->get_query_states(shard_weights, shards, states, &parms); } if (shard_weights.size() == 1 && shard_weights[0] == 0) { if (buffer_alias) delete buffer_alias; - for (auto& x: states) Shard::delete_state(x); + for (auto& x: states) S::delete_query_state(x); sampling_bailouts++; return; // no records in the sampling range } @@ -243,7 +247,7 @@ public: } while (sample_idx < sample_sz); if (buffer_alias) delete buffer_alias; - for (auto& x: states) Shard::delete_state(x); + for (auto& x: states) S::delete_query_state(x); enforce_rejection_rate_maximum(rng); } @@ -348,8 +352,8 @@ public: } - Shard *create_ssi() { - std::vector<Shard *> shards; + S *create_ssi() { + std::vector<S *> shards; if (m_levels.size() > 0) { for (int i=m_levels.size() - 1; i>= 0; i--) { @@ -359,9 +363,9 @@ public: } } - shards.emplace_back(new Shard(get_buffer(), nullptr)); + shards.emplace_back(new S(get_buffer(), nullptr)); - Shard *shards_array[shards.size()]; + S *shards_array[shards.size()]; size_t j = 0; for (size_t i=0; i<shards.size(); i++) { @@ -370,7 +374,7 @@ public: } } - Shard *flattened = new Shard(shards_array, j, nullptr); + S *flattened = new S(shards_array, j, nullptr); for (auto shard : shards) { delete shard; @@ -390,7 +394,7 @@ private: double m_max_delete_prop; double m_max_rejection_rate; - std::vector<InternalLevel<R> *> m_levels; + std::vector<InternalLevel<R, S> *> m_levels; level_index m_last_level_idx; @@ -446,7 +450,7 @@ private: if (new_idx > 0) { assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); } - m_levels.emplace_back(new InternalLevel<R>(new_idx, new_shard_cnt)); + m_levels.emplace_back(new InternalLevel<R, S>(new_idx, new_shard_cnt)); m_last_level_idx++; return new_idx; @@ -526,14 +530,14 @@ private: // merging two memory levels if (LSM_LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel<R>::merge_levels(m_levels[base_level], m_levels[incoming_level], rng); + m_levels[base_level] = InternalLevel<R, S>::merge_levels(m_levels[base_level], m_levels[incoming_level], rng); mark_as_unused(tmp); } else { m_levels[base_level]->append_merged_shards(m_levels[incoming_level], rng); } mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel<R>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); + m_levels[incoming_level] = new InternalLevel<R, S>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); } @@ -542,9 +546,9 @@ private: if (LSM_LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0]; - auto temp_level = new InternalLevel<R>(0, 1); + auto temp_level = new InternalLevel<R, S>(0, 1); temp_level->append_mem_table(buffer, rng); - auto new_level = InternalLevel<R>::merge_levels(old_level, temp_level, rng); + auto new_level = InternalLevel<R, S>::merge_levels(old_level, temp_level, rng); m_levels[0] = new_level; delete temp_level; @@ -560,7 +564,7 @@ private: * level may not be able to immediately be deleted, depending upon who * else is using it. */ - inline void mark_as_unused(InternalLevel<R> *level) { + inline void mark_as_unused(InternalLevel<R, S> *level) { delete level; } @@ -609,6 +613,7 @@ private: * no guarantees about which buffer will be accessed if level_index is -1. */ inline size_t get_level_record_count(level_index idx, MutableBuffer<R> *buffer=nullptr) { + assert(idx >= -1); if (idx == -1) { return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count(); diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index 19bfe9f..18b7de3 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -14,25 +14,26 @@ #include "util/types.h" #include "util/bf_config.h" -#include "shard/WIRS.h" +#include "framework/ShardInterface.h" +#include "framework/MutableBuffer.h" #include "ds/BloomFilter.h" namespace de { -template <RecordInterface R> +template <RecordInterface R, ShardInterface S> class InternalLevel { static const size_t REJECTION_TRIGGER_THRESHOLD = 1024; typedef decltype(R::key) K; typedef decltype(R::value) V; - typedef WIRS<R> Shard; + //typedef WIRS<R> S; private: struct InternalLevelStructure { InternalLevelStructure(size_t cap) : m_cap(cap) - , m_shards(new Shard*[cap]{nullptr}) + , m_shards(new S*[cap]{nullptr}) , m_bfs(new BloomFilter*[cap]{nullptr}) {} ~InternalLevelStructure() { @@ -46,7 +47,7 @@ private: } size_t m_cap; - Shard** m_shards; + S** m_shards; BloomFilter** m_bfs; }; @@ -75,49 +76,49 @@ public: new BloomFilter(BF_FPR, new_level->get_tombstone_count() + base_level->get_tombstone_count(), BF_HASH_FUNCS, rng); - Shard* shards[2]; + S* shards[2]; shards[0] = base_level->m_structure->m_shards[0]; shards[1] = new_level->m_structure->m_shards[0]; - res->m_structure->m_shards[0] = new Shard(shards, 2, res->m_structure->m_bfs[0]); + res->m_structure->m_shards[0] = new S(shards, 2, res->m_structure->m_bfs[0]); return res; } void append_mem_table(MutableBuffer<R>* buffer, const gsl_rng* rng) { assert(m_shard_cnt < m_structure->m_cap); m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng); - m_structure->m_shards[m_shard_cnt] = new Shard(buffer, m_structure->m_bfs[m_shard_cnt]); + m_structure->m_shards[m_shard_cnt] = new S(buffer, m_structure->m_bfs[m_shard_cnt]); ++m_shard_cnt; } void append_merged_shards(InternalLevel* level, const gsl_rng* rng) { assert(m_shard_cnt < m_structure->m_cap); m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng); - m_structure->m_shards[m_shard_cnt] = new Shard(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt]); + m_structure->m_shards[m_shard_cnt] = new S(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt]); ++m_shard_cnt; } - Shard *get_merged_shard() { - Shard *shards[m_shard_cnt]; + S *get_merged_shard() { + S *shards[m_shard_cnt]; for (size_t i=0; i<m_shard_cnt; i++) { shards[i] = (m_structure->m_shards[i]) ? m_structure->m_shards[i] : nullptr; } - return new Shard(shards, m_shard_cnt, nullptr); + return new S(shards, m_shard_cnt, nullptr); } // Append the sample range in-order..... - void get_shard_weights(std::vector<uint64_t>& weights, std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, const K& low, const K& high) { + void get_query_states(std::vector<uint64_t>& weights, std::vector<std::pair<ShardID, S *>> &shards, std::vector<void*>& shard_states, void *query_parms) { for (size_t i=0; i<m_shard_cnt; i++) { if (m_structure->m_shards[i]) { - auto shard_state = m_structure->m_shards[i]->get_sample_shard_state(low, high); + auto shard_state = m_structure->m_shards[i]->get_query_state(query_parms); if (shard_state->tot_weight > 0) { shards.push_back({{m_level_no, (ssize_t) i}, m_structure->m_shards[i]}); weights.push_back(shard_state->tot_weight); shard_states.emplace_back(shard_state); } else { - Shard::delete_state(shard_state); + S::delete_state(shard_state); } } } @@ -156,7 +157,7 @@ public: return m_structure->m_shards[shard_no]->get_record_at(idx); } - Shard* get_shard(size_t idx) { + S* get_shard(size_t idx) { return m_structure->m_shards[idx]; } diff --git a/include/framework/ShardInterface.h b/include/framework/ShardInterface.h new file mode 100644 index 0000000..5d07a99 --- /dev/null +++ b/include/framework/ShardInterface.h @@ -0,0 +1,27 @@ +/* + * include/shard/ShardInterface.h + * + * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include <concepts> + +#include "util/types.h" + +template <typename S> +concept ShardInterface = requires(S s, void *p) { + //s.point_lookup(); + //s.tombstone_lookup(); + //s.delete_record(); + + {s.get_query_state(p)} -> std::convertible_to<void*>; + {s.delete_query_state(p)}; + + {s.get_record_count()} -> std::convertible_to<size_t>; + {s.get_tombstone_count()} -> std::convertible_to<size_t>; + {s.get_memory_usage()} -> std::convertible_to<size_t>; +}; diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h index 2572caf..9f37396 100644 --- a/include/shard/WIRS.h +++ b/include/shard/WIRS.h @@ -25,6 +25,7 @@ namespace de { thread_local size_t wirs_cancelations = 0; + template <WeightedRecordInterface R> class WIRS { private: @@ -53,6 +54,11 @@ private: }; public: + struct wirs_query_parms { + K lower_bound; + K upper_bound; + }; + WIRS(MutableBuffer<R>* buffer, BloomFilter* bf) : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0), m_root(nullptr) { @@ -178,14 +184,6 @@ public: return false; } - void free_tree(struct wirs_node<R>* node) { - if (node) { - delete node->alias; - free_tree(node->left); - free_tree(node->right); - delete node; - } - } R* sorted_output() const { return m_data; @@ -206,8 +204,10 @@ public: // low - high -> decompose to a set of nodes. // Build Alias across the decomposed nodes. - WIRSState<R>* get_sample_shard_state(const K& lower_key, const K& upper_key) { + WIRSState<R>* get_query_state(void *query_parameters) { auto res = new WIRSState(); + K lower_key = ((wirs_query_parms *) query_parameters)->lower_bound; + K upper_key = ((wirs_query_parms *) query_parameters)->upper_bound; // Simulate a stack to unfold recursion. double tot_weight = 0.0; @@ -236,7 +236,7 @@ public: return res; } - static void delete_state(void *state) { + static void delete_query_state(void *state) { WIRSState<R> *s = (WIRSState<R> *) state; delete s; } @@ -293,26 +293,6 @@ public: return min; } - /* - bool check_delete(K key, V val) { - size_t idx = get_lower_bound(key); - if (idx >= m_reccnt) { - return false; - } - - auto ptr = m_data + get_lower_bound(key); - - while (ptr < m_data + m_reccnt && *ptr < R {key, val}) { - ptr ++; - } - - bool result = (m_tagging) ? ptr->is_deleted() - : *ptr == R {key, val} && ptr->is_tombstone(); - m_rejection_cnt += result; - return result; - } - */ - bool check_tombstone(const R& rec) { m_ts_check_cnt++; size_t idx = get_lower_bound(rec.key); @@ -332,12 +312,10 @@ public: return result; } - - size_t get_memory_utilization() { + size_t get_memory_usage() { return 0; } - size_t get_rejection_count() { return m_rejection_cnt; } @@ -418,6 +396,15 @@ private: m_root = construct_wirs_node(weights, 0, n_groups-1); } + void free_tree(struct wirs_node<R>* node) { + if (node) { + delete node->alias; + free_tree(node->left); + free_tree(node->right); + delete node; + } + } + R* m_data; std::vector<Alias *> m_alias; wirs_node<R>* m_root; diff --git a/tests/dynamic_extension_tests.cpp b/tests/dynamic_extension_tests.cpp index 3638b76..422e904 100644 --- a/tests/dynamic_extension_tests.cpp +++ b/tests/dynamic_extension_tests.cpp @@ -15,13 +15,14 @@ #include "testing.h" #include "framework/DynamicExtension.h" +#include "shard/WIRS.h" #include <check.h> using namespace de; START_TEST(t_create) { - auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, 1, 1, g_rng); ck_assert_ptr_nonnull(ext_wirs); @@ -35,7 +36,7 @@ END_TEST START_TEST(t_append) { - auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, 1, 1, g_rng); uint64_t key = 0; uint32_t val = 0; @@ -56,7 +57,7 @@ END_TEST START_TEST(t_append_with_mem_merges) { - auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, 1, 1, g_rng); uint64_t key = 0; uint32_t val = 0; @@ -77,7 +78,7 @@ END_TEST START_TEST(t_range_sample_memtable) { - auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, 1, 1, g_rng); uint64_t key = 0; uint32_t val = 0; @@ -112,7 +113,7 @@ END_TEST START_TEST(t_range_sample_memlevels) { - auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, 1, 1, g_rng); uint64_t key = 0; uint32_t val = 0; @@ -146,7 +147,7 @@ END_TEST START_TEST(t_range_sample_weighted) { - auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, 1, 1, g_rng); size_t n = 10000; std::vector<uint64_t> keys; @@ -217,7 +218,7 @@ END_TEST START_TEST(t_tombstone_merging_01) { size_t reccnt = 100000; - auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, .01, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, .01, 1, g_rng); std::set<std::pair<uint64_t, uint32_t>> records; std::set<std::pair<uint64_t, uint32_t>> to_delete; @@ -265,8 +266,8 @@ START_TEST(t_tombstone_merging_01) } END_TEST -DynamicExtension<WRec> *create_test_tree(size_t reccnt, size_t memlevel_cnt) { - auto ext_wirs = new DynamicExtension<WRec>(1000, 1000, 2, 1, 1, g_rng); +DynamicExtension<WRec, WIRS<WRec>> *create_test_tree(size_t reccnt, size_t memlevel_cnt) { + auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(1000, 1000, 2, 1, 1, g_rng); std::set<WRec> records; std::set<WRec> to_delete; @@ -309,7 +310,7 @@ DynamicExtension<WRec> *create_test_tree(size_t reccnt, size_t memlevel_cnt) { START_TEST(t_sorted_array) { size_t reccnt = 100000; - auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, 1, 1, g_rng); std::set<std::pair<uint64_t, uint32_t>> records; std::set<std::pair<uint64_t, uint32_t>> to_delete; diff --git a/tests/internal_level_tests.cpp b/tests/internal_level_tests.cpp index 74d29f0..6803112 100644 --- a/tests/internal_level_tests.cpp +++ b/tests/internal_level_tests.cpp @@ -23,16 +23,16 @@ START_TEST(t_memlevel_merge) auto tbl1 = create_test_mbuffer<WRec>(100); auto tbl2 = create_test_mbuffer<WRec>(100); - auto base_level = new InternalLevel<WRec>(1, 1); + auto base_level = new InternalLevel<WRec, WIRS<WRec>>(1, 1); base_level->append_mem_table(tbl1, g_rng); ck_assert_int_eq(base_level->get_record_cnt(), 100); - auto merging_level = new InternalLevel<WRec>(0, 1); + auto merging_level = new InternalLevel<WRec, WIRS<WRec>>(0, 1); merging_level->append_mem_table(tbl2, g_rng); ck_assert_int_eq(merging_level->get_record_cnt(), 100); auto old_level = base_level; - base_level = InternalLevel<WRec>::merge_levels(old_level, merging_level, g_rng); + base_level = InternalLevel<WRec, WIRS<WRec>>::merge_levels(old_level, merging_level, g_rng); delete old_level; delete merging_level; @@ -44,11 +44,11 @@ START_TEST(t_memlevel_merge) } -InternalLevel<WRec> *create_test_memlevel(size_t reccnt) { +InternalLevel<WRec, WIRS<WRec>> *create_test_memlevel(size_t reccnt) { auto tbl1 = create_test_mbuffer<WRec>(reccnt/2); auto tbl2 = create_test_mbuffer<WRec>(reccnt/2); - auto base_level = new InternalLevel<WRec>(1, 2); + auto base_level = new InternalLevel<WRec, WIRS<WRec>>(1, 2); base_level->append_mem_table(tbl1, g_rng); base_level->append_mem_table(tbl2, g_rng); diff --git a/tests/wirs_tests.cpp b/tests/wirs_tests.cpp index 828362d..c7edec9 100644 --- a/tests/wirs_tests.cpp +++ b/tests/wirs_tests.cpp @@ -185,7 +185,8 @@ START_TEST(t_weighted_sampling) results.reserve(k); size_t cnt[3] = {0}; for (size_t i=0; i<1000; i++) { - auto state = shard->get_sample_shard_state(lower_key, upper_key); + WIRS<WRec>::wirs_query_parms parms = {lower_key, upper_key}; + auto state = shard->get_query_state(&parms); shard->get_samples(state, results, lower_key, upper_key, k, g_rng); @@ -193,7 +194,7 @@ START_TEST(t_weighted_sampling) cnt[results[j].key - 1]++; } - WIRS<WRec>::delete_state(state); + WIRS<WRec>::delete_query_state(state); } ck_assert(roughly_equal(cnt[0] / 1000, (double) k/4.0, k, .05)); |