From b00682429988f17152e7573ffeffa1cecfdd3d3a Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 29 May 2023 12:33:58 -0400 Subject: Tests and bugfixes for framework --- CMakeLists.txt | 6 +-- include/ds/BloomFilter.h | 2 +- include/framework/DynamicExtension.h | 30 ++++++------ include/framework/InternalLevel.h | 54 +++++++++++----------- include/framework/MutableBuffer.h | 22 +++++---- include/framework/QueryInterface.h | 5 ++ include/framework/RecordInterface.h | 4 +- include/shard/WIRS.h | 39 +++++++++++++--- tests/dynamic_extension_tests.cpp | 88 +++++++++++++++++++++--------------- tests/internal_level_tests.cpp | 33 ++++++++------ tests/mutable_buffer_tests.cpp | 47 +++++++------------ tests/testing.h | 16 +++---- tests/wirs_tests.cpp | 78 +++++++++++++------------------- 13 files changed, 225 insertions(+), 199 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7be4085..0ceceda 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -49,9 +49,9 @@ if (tests) target_link_libraries(dynamic_extension_tests PUBLIC gsl check subunit pthread) target_include_directories(dynamic_extension_tests PRIVATE include) - add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp) - target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread) - target_include_directories(memisam_tests PRIVATE include) + # add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp) + #target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread) + #target_include_directories(memisam_tests PRIVATE include) endif() # Benchmark build instructions diff --git a/include/ds/BloomFilter.h b/include/ds/BloomFilter.h index d55a7af..3d218cd 100644 --- a/include/ds/BloomFilter.h +++ b/include/ds/BloomFilter.h @@ -14,7 +14,7 @@ #include "ds/BitArray.h" #include "util/hash.h" #include "util/base.h" -#include "util/Record.h" +#include "framework/RecordInterface.h" namespace de { diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index a70dda4..68f85e2 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -77,7 +77,7 @@ template > Shard; typedef S Shard; - typedef MutableBuffer Buffer; + typedef MutableBuffer> Buffer; public: DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop) @@ -149,7 +149,7 @@ public: // Execute the query for each shard for (size_t i=0; i *> m_levels; + std::vector, S, Q> *> m_levels; Buffer *get_buffer() { return m_buffer; } - int internal_append(R &rec, bool ts) { + int internal_append(const R &rec, bool ts) { Buffer *buffer; while (!(buffer = get_buffer())) ; @@ -274,10 +274,12 @@ private: merge_buffer(); } - return buffer->append(rec, ts); + WrappedRecord wrec = static_cast>(rec); + + return buffer->append(wrec, ts); } - std::vector post_process(std::vector records, ShardID shid, Buffer *buffer) { + std::vector filter_deletes(std::vector records, ShardID shid, Buffer *buffer) { std::vector processed_records; processed_records.reserve(records.size()); @@ -335,7 +337,7 @@ private: if (new_idx > 0) { assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); } - m_levels.emplace_back(new InternalLevel(new_idx, new_shard_cnt)); + m_levels.emplace_back(new InternalLevel, Shard, Q>(new_idx, new_shard_cnt)); return new_idx; } @@ -414,14 +416,14 @@ private: // merging two memory levels if (LSM_LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel::merge_levels(m_levels[base_level], m_levels[incoming_level]); + m_levels[base_level] = InternalLevel, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); mark_as_unused(tmp); } else { m_levels[base_level]->append_merged_shards(m_levels[incoming_level]); } mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); + m_levels[incoming_level] = new InternalLevel, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); } @@ -430,9 +432,9 @@ private: if (LSM_LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0]; - auto temp_level = new InternalLevel(0, 1); + auto temp_level = new InternalLevel, Shard, Q>(0, 1); temp_level->append_buffer(buffer); - auto new_level = InternalLevel::merge_levels(old_level, temp_level); + auto new_level = InternalLevel, Shard, Q>::merge_levels(old_level, temp_level); m_levels[0] = new_level; delete temp_level; @@ -448,7 +450,7 @@ private: * level may not be able to immediately be deleted, depending upon who * else is using it. */ - inline void mark_as_unused(InternalLevel *level) { + inline void mark_as_unused(InternalLevel, Shard, Q> *level) { delete level; } diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index 6986a61..c489063 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -28,7 +28,7 @@ class InternalLevel { typedef MutableBuffer Buffer; public: InternalLevel(ssize_t level_no, size_t shard_cap) - : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector(shard_cap, nullptr)) + : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector(shard_cap, nullptr)) {} // Create a new memory level sharing the shards and repurposing it as previous level_no + 1 @@ -36,7 +36,7 @@ public: InternalLevel(InternalLevel* level) : m_level_no(level->m_level_no + 1), m_shard_cnt(level->m_shard_cnt) , m_shards(level->m_shards) { - assert(m_shard_cnt == 1 && m_shards.size() == 1); + assert(m_shard_cnt == 1 && m_shards->size() == 1); } ~InternalLevel() {} @@ -48,22 +48,22 @@ public: auto res = new InternalLevel(base_level->m_level_no, 1); res->m_shard_cnt = 1; Shard* shards[2]; - shards[0] = base_level->m_shards[0]; - shards[1] = new_level->m_shards[0]; + shards[0] = (*base_level->m_shards)[0]; + shards[1] = (*new_level->m_shards)[0]; - res->m_shards[0] = new S(shards, 2); + (*res->m_shards)[0] = new S(shards, 2); return res; } void append_buffer(Buffer* buffer) { - assert(m_shard_cnt < m_shards.size()); - m_shards[m_shard_cnt] = new S(buffer); + assert(m_shard_cnt < m_shards->size()); + (*m_shards)[m_shard_cnt] = new S(buffer); ++m_shard_cnt; } void append_merged_shards(InternalLevel* level) { - assert(m_shard_cnt < m_shards.size()); - m_shards[m_shard_cnt] = new S(level->m_shards, level->m_shard_cnt); + assert(m_shard_cnt < m_shards->size()); + (*m_shards)[m_shard_cnt] = new S(level->m_shards->data(), level->m_shard_cnt); ++m_shard_cnt; } @@ -71,7 +71,7 @@ public: Shard *shards[m_shard_cnt]; for (size_t i=0; i> &shards, std::vector& shard_states, void *query_parms) { for (size_t i=0; i= (ssize_t) shard_stop; i--) { - if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec, true); + if ((*m_shards)[i]) { + auto res = (*m_shards)[i]->point_lookup(rec, true); if (res && res->is_tombstone()) { return true; } @@ -105,9 +105,9 @@ public: bool delete_record(const R &rec) { if (m_shard_cnt == 0) return false; - for (size_t i = 0; i < m_shards.size(); ++i) { - if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec); + for (size_t i = 0; i < (*m_shards)->size(); ++i) { + if ((*m_shards)[i]) { + auto res = (*m_shards)[i]->point_lookup(rec); if (res) { res->set_delete(); } @@ -118,7 +118,7 @@ public: } Shard* get_shard(size_t idx) { - return m_shards[idx]; + return (*m_shards)[idx]; } size_t get_shard_count() { @@ -128,7 +128,7 @@ public: size_t get_record_cnt() { size_t cnt = 0; for (size_t i=0; iget_record_count(); + cnt += (*m_shards)[i]->get_record_count(); } return cnt; @@ -137,7 +137,7 @@ public: size_t get_tombstone_count() { size_t res = 0; for (size_t i = 0; i < m_shard_cnt; ++i) { - res += m_shards[i]->get_tombstone_count(); + res += (*m_shards)[i]->get_tombstone_count(); } return res; } @@ -145,7 +145,7 @@ public: size_t get_aux_memory_usage() { size_t cnt = 0; for (size_t i=0; iget_aux_memory_usage(); + cnt += (*m_shards)[i]->get_aux_memory_usage(); } return cnt; @@ -154,8 +154,8 @@ public: size_t get_memory_usage() { size_t cnt = 0; for (size_t i=0; iget_memory_usage(); + if ((*m_shards)[i]) { + cnt += (*m_shards)[i]->get_memory_usage(); } } @@ -166,9 +166,9 @@ public: size_t tscnt = 0; size_t reccnt = 0; for (size_t i=0; iget_tombstone_count(); - reccnt += m_shards[i]->get_record_count(); + if ((*m_shards)[i]) { + tscnt += (*m_shards)[i]->get_tombstone_count(); + reccnt += (*m_shards[i])->get_record_count(); } } diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h index 3643a89..c154001 100644 --- a/include/framework/MutableBuffer.h +++ b/include/framework/MutableBuffer.h @@ -28,14 +28,14 @@ namespace de { template class MutableBuffer { - typedef WrappedRecord WRec; + //typedef WrappedRecord R; public: MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap) : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0) , m_tombstonecnt(0), m_weight(0), m_max_weight(0) { - auto len = capacity * sizeof(WRec); + auto len = capacity * sizeof(R); size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE)); - m_data = (WRec*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); + m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); m_tombstone_filter = nullptr; if (max_tombstone_cap > 0) { m_tombstone_filter = new BloomFilter(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS); @@ -54,12 +54,10 @@ public: int32_t pos = 0; if ((pos = try_advance_tail()) == -1) return 0; - WRec wrec; - wrec.rec = rec; + R new_rec = rec; + if (tombstone) new_rec.set_tombstone(); - if (tombstone) wrec.set_tombstone(); - - m_data[pos] = wrec; + m_data[pos] = new_rec; m_data[pos].header |= (pos << 2); if (tombstone) { @@ -68,7 +66,7 @@ public: } if constexpr (WeightedRecordInterface) { - m_weight.fetch_add(rec.weight); + m_weight.fetch_add(new_rec.weight); double old = m_max_weight.load(); while (old < rec.weight) { m_max_weight.compare_exchange_strong(old, rec.weight); @@ -125,7 +123,7 @@ public: auto offset = 0; while (offset < m_reccnt.load()) { - if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) { + if (m_data[offset] == rec && m_data[offset].is_tombstone()) { return true; } offset++;; @@ -149,6 +147,10 @@ public: return m_weight.load(); } + R *get_data() { + return m_data; + } + private: int32_t try_advance_tail() { size_t new_tail = m_reccnt.fetch_add(1); diff --git a/include/framework/QueryInterface.h b/include/framework/QueryInterface.h index eafeeb0..886bdc8 100644 --- a/include/framework/QueryInterface.h +++ b/include/framework/QueryInterface.h @@ -13,10 +13,15 @@ template concept QueryInterface = requires(Q q, void *p) { + +/* {q.get_query_state(p, p)} -> std::convertible_to; {q.get_buffer_query_state(p, p)}; {q.query(p, p)}; {q.buffer_query(p, p)}; {q.merge()}; {q.delete_query_state(p)}; +*/ + + {Q::delete_query_state(std::declval())} -> std::same_as; }; diff --git a/include/framework/RecordInterface.h b/include/framework/RecordInterface.h index 8afd90a..c8e622f 100644 --- a/include/framework/RecordInterface.h +++ b/include/framework/RecordInterface.h @@ -26,8 +26,8 @@ concept RecordInterface = requires(R r, R s) { }; template -struct WrappedRecord { - R rec; +struct WrappedRecord : R { + //R rec; uint32_t header; inline void set_delete() { diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h index 4c7563f..5b610c7 100644 --- a/include/shard/WIRS.h +++ b/include/shard/WIRS.h @@ -23,6 +23,8 @@ #include "util/bf_config.h" #include "framework/MutableBuffer.h" #include "framework/RecordInterface.h" +#include "framework/ShardInterface.h" +#include "framework/QueryInterface.h" namespace de { @@ -34,8 +36,6 @@ struct wirs_query_parms { decltype(R::key) upper_bound; }; -class InternalLevel; - template class WIRSQuery; @@ -60,7 +60,6 @@ struct WIRSState { template class WIRS { - friend class InternalLevel; private: typedef decltype(R::key) K; @@ -82,9 +81,11 @@ public: size_t offset = 0; m_reccnt = 0; - auto base = buffer->sorted_output(); + auto base = buffer->get_data(); auto stop = base + buffer->get_record_count(); + std::sort(base, stop, memtable_record_cmp); + while (base < stop) { if (!(base->is_tombstone()) && (base + 1) < stop) { if (*base == *(base + 1) && (base + 1)->is_tombstone()) { @@ -186,7 +187,7 @@ public: } R *point_lookup(R &rec, bool filter=false) { - if (filter && !m_bf.lookup(rec.key)) { + if (filter && !m_bf->lookup(rec.key)) { return nullptr; } @@ -292,6 +293,29 @@ private: m_root = construct_wirs_node(weights, 0, n_groups-1); } + struct wirs_node* construct_wirs_node(const std::vector& weights, size_t low, size_t high) { + if (low == high) { + return new wirs_node{nullptr, nullptr, low, high, weights[low], new Alias({1.0})}; + } else if (low > high) return nullptr; + + std::vector node_weights; + W sum = 0; + for (size_t i = low; i < high; ++i) { + node_weights.emplace_back(weights[i]); + sum += weights[i]; + } + + for (auto& w: node_weights) + if (sum) w /= sum; + else w = 1.0 / node_weights.size(); + + + size_t mid = (low + high) / 2; + return new wirs_node{construct_wirs_node(weights, low, mid), + construct_wirs_node(weights, mid + 1, high), + low, high, sum, new Alias(node_weights)}; + } + void free_tree(struct wirs_node* node) { if (node) { delete node->alias; @@ -308,7 +332,7 @@ private: size_t m_reccnt; size_t m_tombstone_cnt; size_t m_group_size; - BloomFilter m_bf; + BloomFilter *m_bf; }; @@ -394,7 +418,8 @@ public: return output; } - static void delete_query_state(wirs_query_parms *parameters) { + static void delete_query_state(void *parm) { + wirs_query_parms *parameters = parm; delete parameters; } diff --git a/tests/dynamic_extension_tests.cpp b/tests/dynamic_extension_tests.cpp index 422e904..b74ab38 100644 --- a/tests/dynamic_extension_tests.cpp +++ b/tests/dynamic_extension_tests.cpp @@ -20,9 +20,11 @@ #include using namespace de; +typedef DynamicExtension>, WIRSQuery>> DE; + START_TEST(t_create) { - auto ext_wirs = new DynamicExtension>(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DE(100, 2, 1); ck_assert_ptr_nonnull(ext_wirs); @@ -34,15 +36,15 @@ START_TEST(t_create) END_TEST -START_TEST(t_append) +START_TEST(t_insert) { - auto ext_wirs = new DynamicExtension>(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DE(100, 2, 1); uint64_t key = 0; uint32_t val = 0; for (size_t i=0; i<100; i++) { WRec r = {key, val, 1}; - ck_assert_int_eq(ext_wirs->append(r, g_rng), 1); + ck_assert_int_eq(ext_wirs->insert(r), 1); key++; val++; } @@ -55,15 +57,15 @@ START_TEST(t_append) END_TEST -START_TEST(t_append_with_mem_merges) +START_TEST(t_insert_with_mem_merges) { - auto ext_wirs = new DynamicExtension>(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DE(100, 2, 1); uint64_t key = 0; uint32_t val = 0; for (size_t i=0; i<300; i++) { WRec r = {key, val, 1}; - ck_assert_int_eq(ext_wirs->append(r, g_rng), 1); + ck_assert_int_eq(ext_wirs->insert(r), 1); key++; val++; } @@ -76,15 +78,16 @@ START_TEST(t_append_with_mem_merges) END_TEST +/* START_TEST(t_range_sample_memtable) { - auto ext_wirs = new DynamicExtension>(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DE(100, 2, 1); uint64_t key = 0; uint32_t val = 0; for (size_t i=0; i<100; i++) { WRec r = {key, val, 1}; - ck_assert_int_eq(ext_wirs->append(r, g_rng), 1); + ck_assert_int_eq(ext_wirs->insert(r), 1); key++; val++; } @@ -96,7 +99,7 @@ START_TEST(t_range_sample_memtable) char *util_buf = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE); WRec sample_set[100]; - ext_wirs->range_sample(sample_set, lower_bound, upper_bound, 100, g_rng); + ext_wirs->range_sample(sample_set, lower_bound, upper_bound, 100); for(size_t i=0; i<100; i++) { ck_assert_int_le(sample_set[i].key, upper_bound); @@ -113,13 +116,13 @@ END_TEST START_TEST(t_range_sample_memlevels) { - auto ext_wirs = new DynamicExtension>(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DE(100, 2, 1); uint64_t key = 0; uint32_t val = 0; for (size_t i=0; i<300; i++) { WRec r = {key, val, 1}; - ck_assert_int_eq(ext_wirs->append(r, g_rng), 1); + ck_assert_int_eq(ext_wirs->insert(r), 1); key++; val++; } @@ -131,7 +134,7 @@ START_TEST(t_range_sample_memlevels) char *util_buf = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE); WRec sample_set[100]; - ext_wirs->range_sample(sample_set, lower_bound, upper_bound, 100, g_rng); + ext_wirs->range_sample(sample_set, lower_bound, upper_bound, 100); for(size_t i=0; i<100; i++) { ck_assert_int_le(sample_set[i].key, upper_bound); @@ -147,7 +150,7 @@ END_TEST START_TEST(t_range_sample_weighted) { - auto ext_wirs = new DynamicExtension>(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DE(100, 2, 1); size_t n = 10000; std::vector keys; @@ -184,7 +187,7 @@ START_TEST(t_range_sample_weighted) } WRec r = {keys[i], (uint32_t) i, weight, 0}; - ext_wirs->append(r, g_rng); + ext_wirs->insert(r); } size_t k = 1000; uint64_t lower_key = 0; @@ -196,7 +199,7 @@ START_TEST(t_range_sample_weighted) size_t cnt[3] = {0}; for (size_t i=0; i<1000; i++) { - ext_wirs->range_sample(buffer, lower_key, upper_key, k, g_rng); + ext_wirs->range_sample(buffer, lower_key, upper_key, k); for (size_t j=0; j>(100, 100, 2, .01, 1, g_rng); + auto ext_wirs = new DE(100, 2, .01); + + auto rng = gsl_rng_alloc(gsl_rng_mt19937); std::set> records; std::set> to_delete; @@ -237,23 +243,23 @@ START_TEST(t_tombstone_merging_01) size_t cnt=0; for (auto rec : records) { WRec r = {rec.first, rec.second, 1, 0}; - ck_assert_int_eq(ext_wirs->append(r, g_rng), 1); + ck_assert_int_eq(ext_wirs->insert(r), 1); - if (gsl_rng_uniform(g_rng) < 0.05 && !to_delete.empty()) { + if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) { std::vector> del_vec; std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); for (size_t i=0; iappend(dr, g_rng); + ext_wirs->insert(dr); deletes++; to_delete.erase(del_vec[i]); deleted.insert(del_vec[i]); } } - if (gsl_rng_uniform(g_rng) < 0.25 && deleted.find(rec) == deleted.end()) { + if (gsl_rng_uniform(rng) < 0.25 && deleted.find(rec) == deleted.end()) { to_delete.insert(rec); } @@ -262,12 +268,15 @@ START_TEST(t_tombstone_merging_01) ck_assert(ext_wirs->validate_tombstone_proportion()); + gsl_rng_free(rng); delete ext_wirs; } END_TEST -DynamicExtension> *create_test_tree(size_t reccnt, size_t memlevel_cnt) { - auto ext_wirs = new DynamicExtension>(1000, 1000, 2, 1, 1, g_rng); +DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) { + auto rng = gsl_rng_alloc(gsl_rng_mt19937); + + auto ext_wirs = new DE(1000, 2, 1); std::set records; std::set to_delete; @@ -284,33 +293,37 @@ DynamicExtension> *create_test_tree(size_t reccnt, size_t memle size_t deletes = 0; for (auto rec : records) { - ck_assert_int_eq(ext_wirs->append(rec, g_rng), 1); + ck_assert_int_eq(ext_wirs->insert(rec), 1); - if (gsl_rng_uniform(g_rng) < 0.05 && !to_delete.empty()) { + if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) { std::vector del_vec; std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); for (size_t i=0; iappend(del_vec[i], g_rng); + ext_wirs->insert(del_vec[i]); deletes++; to_delete.erase(del_vec[i]); deleted.insert(del_vec[i]); } } - if (gsl_rng_uniform(g_rng) < 0.25 && deleted.find(rec) == deleted.end()) { + if (gsl_rng_uniform(rng) < 0.25 && deleted.find(rec) == deleted.end()) { to_delete.insert(rec); } } + gsl_rng_free(rng); + return ext_wirs; } START_TEST(t_sorted_array) { + auto rng = gsl_rng_alloc(gsl_rng_mt19937); + size_t reccnt = 100000; - auto ext_wirs = new DynamicExtension>(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DE(100, 2, 1); std::set> records; std::set> to_delete; @@ -328,9 +341,9 @@ START_TEST(t_sorted_array) size_t deletes = 0; for (auto rec : records) { WRec r = {rec.first, rec.second, 1}; - ck_assert_int_eq(ext_wirs->append(r, g_rng), 1); + ck_assert_int_eq(ext_wirs->insert(r), 1); - if (gsl_rng_uniform(g_rng) < 0.05 && !to_delete.empty()) { + if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) { std::vector> del_vec; std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); @@ -338,14 +351,14 @@ START_TEST(t_sorted_array) WRec dr = {del_vec[i].first, del_vec[i].second, 1}; dr.set_tombstone(); - ext_wirs->append(dr , g_rng); + ext_wirs->insert(dr ); deletes++; to_delete.erase(del_vec[i]); deleted.insert(del_vec[i]); } } - if (gsl_rng_uniform(g_rng) < 0.25 && deleted.find(rec) == deleted.end()) { + if (gsl_rng_uniform(rng) < 0.25 && deleted.find(rec) == deleted.end()) { to_delete.insert(rec); } } @@ -360,6 +373,7 @@ START_TEST(t_sorted_array) prev_key = k; } + gsl_rng_free(rng); delete flat; delete ext_wirs; } @@ -374,17 +388,19 @@ Suite *unit_testing() tcase_add_test(create, t_create); suite_add_tcase(unit, create); - TCase *append = tcase_create("de::DynamicExtension::append Testing"); - tcase_add_test(append, t_append); - tcase_add_test(append, t_append_with_mem_merges); - suite_add_tcase(unit, append); + TCase *insert = tcase_create("de::DynamicExtension::insert Testing"); + tcase_add_test(insert, t_insert); + tcase_add_test(insert, t_insert_with_mem_merges); + suite_add_tcase(unit, insert); TCase *sampling = tcase_create("de::DynamicExtension::range_sample Testing"); + /* tcase_add_test(sampling, t_range_sample_memtable); tcase_add_test(sampling, t_range_sample_memlevels); tcase_add_test(sampling, t_range_sample_weighted); suite_add_tcase(unit, sampling); + */ TCase *ts = tcase_create("de::DynamicExtension::tombstone_compaction Testing"); tcase_add_test(ts, t_tombstone_merging_01); diff --git a/tests/internal_level_tests.cpp b/tests/internal_level_tests.cpp index 6803112..42c9044 100644 --- a/tests/internal_level_tests.cpp +++ b/tests/internal_level_tests.cpp @@ -11,28 +11,33 @@ */ #include "shard/WIRS.h" #include "framework/InternalLevel.h" -#include "util/bf_config.h" +#include "framework/RecordInterface.h" +#include "framework/QueryInterface.h" +#include "framework/ShardInterface.h" + #include "testing.h" #include using namespace de; +typedef InternalLevel, WIRS>, WIRSQuery>> ILevel; + START_TEST(t_memlevel_merge) { - auto tbl1 = create_test_mbuffer(100); - auto tbl2 = create_test_mbuffer(100); + auto tbl1 = create_test_mbuffer(100); + auto tbl2 = create_test_mbuffer(100); - auto base_level = new InternalLevel>(1, 1); - base_level->append_mem_table(tbl1, g_rng); + auto base_level = new ILevel(1, 1); + base_level->append_buffer(tbl1); ck_assert_int_eq(base_level->get_record_cnt(), 100); - auto merging_level = new InternalLevel>(0, 1); - merging_level->append_mem_table(tbl2, g_rng); + auto merging_level = new ILevel(0, 1); + merging_level->append_buffer(tbl2); ck_assert_int_eq(merging_level->get_record_cnt(), 100); auto old_level = base_level; - base_level = InternalLevel>::merge_levels(old_level, merging_level, g_rng); + base_level = ILevel::merge_levels(old_level, merging_level); delete old_level; delete merging_level; @@ -44,13 +49,13 @@ START_TEST(t_memlevel_merge) } -InternalLevel> *create_test_memlevel(size_t reccnt) { - auto tbl1 = create_test_mbuffer(reccnt/2); - auto tbl2 = create_test_mbuffer(reccnt/2); +ILevel *create_test_memlevel(size_t reccnt) { + auto tbl1 = create_test_mbuffer(reccnt/2); + auto tbl2 = create_test_mbuffer(reccnt/2); - auto base_level = new InternalLevel>(1, 2); - base_level->append_mem_table(tbl1, g_rng); - base_level->append_mem_table(tbl2, g_rng); + auto base_level = new ILevel(1, 2); + base_level->append_buffer(tbl1); + base_level->append_buffer(tbl2); delete tbl1; delete tbl2; diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp index 75cbeec..fc8b511 100644 --- a/tests/mutable_buffer_tests.cpp +++ b/tests/mutable_buffer_tests.cpp @@ -11,7 +11,6 @@ */ #include #include -#include #include #include @@ -26,32 +25,29 @@ using namespace de; START_TEST(t_create) { - auto rng = gsl_rng_alloc(gsl_rng_mt19937); - auto buffer = new MutableBuffer(100, true, 50, rng); + auto buffer = new MutableBuffer(100, true, 50); ck_assert_ptr_nonnull(buffer); ck_assert_int_eq(buffer->get_capacity(), 100); ck_assert_int_eq(buffer->get_record_count(), 0); ck_assert_int_eq(buffer->is_full(), false); - ck_assert_ptr_nonnull(buffer->sorted_output()); + ck_assert_ptr_nonnull(buffer->get_data()); ck_assert_int_eq(buffer->get_tombstone_count(), 0); ck_assert_int_eq(buffer->get_tombstone_capacity(), 50); delete buffer; - gsl_rng_free(rng); } END_TEST START_TEST(t_insert) { - auto rng = gsl_rng_alloc(gsl_rng_mt19937); - auto buffer = new MutableBuffer(100, true, 50, rng); + auto buffer = new MutableBuffer(100, true, 50); uint64_t key = 0; uint32_t val = 5; - WRec rec = {0, 5, 1}; + WrappedWRec rec = {0, 5, 1}; for (size_t i=0; i<99; i++) { ck_assert_int_eq(buffer->append(rec), 1); @@ -74,7 +70,6 @@ START_TEST(t_insert) ck_assert_int_eq(buffer->append(rec), 0); delete buffer; - gsl_rng_free(rng); } END_TEST @@ -82,12 +77,11 @@ END_TEST START_TEST(t_insert_tombstones) { - auto rng = gsl_rng_alloc(gsl_rng_mt19937); - auto buffer = new MutableBuffer(100, true, 50, rng); + auto buffer = new MutableBuffer(100, true, 50); size_t ts_cnt = 0; - Rec rec = {0, 5}; + WrappedRec rec = {0, 5}; for (size_t i=0; i<99; i++) { bool ts = false; @@ -124,18 +118,16 @@ START_TEST(t_insert_tombstones) ck_assert_int_eq(buffer->append(rec), 0); delete buffer; - gsl_rng_free(rng); } END_TEST START_TEST(t_truncate) { - auto rng = gsl_rng_alloc(gsl_rng_mt19937); - auto buffer = new MutableBuffer(100, true, 100, rng); + auto buffer = new MutableBuffer(100, true, 100); size_t ts_cnt = 0; - Rec rec = {0, 5}; + WrappedRec rec = {0, 5}; for (size_t i=0; i<100; i++) { bool ts = false; @@ -168,7 +160,6 @@ START_TEST(t_truncate) ck_assert_int_eq(buffer->append(rec), 1); delete buffer; - gsl_rng_free(rng); } END_TEST @@ -178,8 +169,7 @@ START_TEST(t_sorted_output) { size_t cnt = 100; - auto rng = gsl_rng_alloc(gsl_rng_mt19937); - auto buffer = new MutableBuffer(cnt, true, cnt/2, rng); + auto buffer = new MutableBuffer(cnt, true, cnt/2); std::vector keys(cnt); @@ -194,19 +184,19 @@ START_TEST(t_sorted_output) uint32_t val = 12345; for (size_t i=0; iappend(Rec {keys[i], val}); + buffer->append(WrappedRec {keys[i], val}); } - Rec r1 = {keys[cnt-2], val}; + WrappedRec r1 = {keys[cnt-2], val}; r1.set_tombstone(); buffer->append(r1); - Rec r2 = {keys[cnt-1], val}; + WrappedRec r2 = {keys[cnt-1], val}; r2.set_tombstone(); buffer->append(r2); - auto *sorted_records = buffer->sorted_output(); + auto *sorted_records = buffer->get_data(); std::sort(keys.begin(), keys.end()); for (size_t i=0; i> *values, size_t start, size_t stop, MutableBuffer *buffer) +void insert_records(std::vector> *values, size_t start, size_t stop, MutableBuffer *buffer) { for (size_t i=start; iappend({(*values)[i].first, (*values)[i].second}); @@ -231,12 +220,11 @@ void insert_records(std::vector> *values, size_t s START_TEST(t_multithreaded_insert) { size_t cnt = 10000; - auto rng = gsl_rng_alloc(gsl_rng_mt19937); - auto buffer = new MutableBuffer(cnt, true, cnt/2, rng); + auto buffer = new MutableBuffer(cnt, true, cnt/2); - std::vector records(cnt); + std::vector records(cnt); for (size_t i=0; i #include -#include "util/Record.h" #include "util/types.h" #include "util/base.h" #include "framework/MutableBuffer.h" -//#include "framework/InternalLevel.h" typedef de::WeightedRecord WRec; typedef de::Record Rec; - -static gsl_rng *g_rng = gsl_rng_alloc(gsl_rng_mt19937); +typedef de::WrappedRecord WrappedWRec; +typedef de::WrappedRecord WrappedRec; static bool initialize_test_file(std::string fname, size_t page_cnt) { @@ -71,7 +69,7 @@ static bool roughly_equal(int n1, int n2, size_t mag, double epsilon) { template static de::MutableBuffer *create_test_mbuffer(size_t cnt) { - auto buffer = new de::MutableBuffer(cnt, true, cnt, g_rng); + auto buffer = new de::MutableBuffer(cnt, true, cnt); R rec; for (size_t i = 0; i < cnt; i++) { @@ -91,7 +89,7 @@ static de::MutableBuffer *create_test_mbuffer(size_t cnt) template static de::MutableBuffer *create_test_mbuffer_tombstones(size_t cnt, size_t ts_cnt) { - auto buffer = new de::MutableBuffer(cnt, true, ts_cnt, g_rng); + auto buffer = new de::MutableBuffer(cnt, true, ts_cnt); std::vector> tombstones; @@ -122,7 +120,7 @@ static de::MutableBuffer *create_test_mbuffer_tombstones(size_t cnt, size_t t template static de::MutableBuffer *create_weighted_mbuffer(size_t cnt) { - auto buffer = new de::MutableBuffer(cnt, true, cnt, g_rng); + auto buffer = new de::MutableBuffer(cnt, true, cnt); // Put in half of the count with weight one. for (uint32_t i=0; i< cnt / 2; i++) { @@ -145,9 +143,9 @@ static de::MutableBuffer *create_weighted_mbuffer(size_t cnt) template static de::MutableBuffer *create_double_seq_mbuffer(size_t cnt, bool ts=false) { - auto buffer = new de::MutableBuffer(cnt, true, cnt, g_rng); + auto buffer = new de::MutableBuffer(cnt, true, cnt); - for (size_t i = 0; i < cnt / 2; i++) { + for (size_t i = 0; i < cnt / 2; i++) { R rec; rec.key = i; rec.value = i; diff --git a/tests/wirs_tests.cpp b/tests/wirs_tests.cpp index c7edec9..3a80a9e 100644 --- a/tests/wirs_tests.cpp +++ b/tests/wirs_tests.cpp @@ -11,19 +11,17 @@ */ #include "shard/WIRS.h" -#include "framework/InternalLevel.h" -#include "util/bf_config.h" #include "testing.h" #include using namespace de; -typedef WIRS Shard; +typedef WIRS Shard; START_TEST(t_mbuffer_init) { - auto mem_table = new MutableBuffer(1024, true, 1024, g_rng); + auto mem_table = new MutableBuffer(1024, true, 1024); for (uint64_t i = 512; i > 0; i--) { uint32_t v = i; mem_table->append({i,v, 1}); @@ -39,11 +37,9 @@ START_TEST(t_mbuffer_init) mem_table->append({i, v, 1}); } - BloomFilter* bf = new BloomFilter(BF_FPR, mem_table->get_tombstone_count(), BF_HASH_FUNCS, g_rng); - Shard* shard = new Shard(mem_table, bf); + Shard* shard = new Shard(mem_table); ck_assert_uint_eq(shard->get_record_count(), 512); - delete bf; delete mem_table; delete shard; } @@ -51,20 +47,16 @@ START_TEST(t_mbuffer_init) START_TEST(t_wirs_init) { size_t n = 512; - auto mbuffer1 = create_test_mbuffer(n); - auto mbuffer2 = create_test_mbuffer(n); - auto mbuffer3 = create_test_mbuffer(n); - - BloomFilter* bf1 = new BloomFilter(100, BF_HASH_FUNCS, g_rng); - BloomFilter* bf2 = new BloomFilter(100, BF_HASH_FUNCS, g_rng); - BloomFilter* bf3 = new BloomFilter(100, BF_HASH_FUNCS, g_rng); - auto shard1 = new Shard(mbuffer1, bf1); - auto shard2 = new Shard(mbuffer2, bf2); - auto shard3 = new Shard(mbuffer3, bf3); - - BloomFilter* bf4 = new BloomFilter(100, BF_HASH_FUNCS, g_rng); + auto mbuffer1 = create_test_mbuffer(n); + auto mbuffer2 = create_test_mbuffer(n); + auto mbuffer3 = create_test_mbuffer(n); + + auto shard1 = new Shard(mbuffer1); + auto shard2 = new Shard(mbuffer2); + auto shard3 = new Shard(mbuffer3); + Shard* shards[3] = {shard1, shard2, shard3}; - auto shard4 = new Shard(shards, 3, bf4); + auto shard4 = new Shard(shards, 3); ck_assert_int_eq(shard4->get_record_count(), n * 3); ck_assert_int_eq(shard4->get_tombstone_count(), 0); @@ -96,24 +88,20 @@ START_TEST(t_wirs_init) delete mbuffer2; delete mbuffer3; - delete bf1; delete shard1; - delete bf2; delete shard2; - delete bf3; delete shard3; - delete bf4; delete shard4; } +/* START_TEST(t_get_lower_bound_index) { size_t n = 10000; auto mbuffer = create_double_seq_mbuffer(n); ck_assert_ptr_nonnull(mbuffer); - BloomFilter* bf = new BloomFilter(100, BF_HASH_FUNCS, g_rng); - Shard* shard = new Shard(mbuffer, bf); + Shard* shard = new Shard(mbuffer); ck_assert_int_eq(shard->get_record_count(), n); ck_assert_int_eq(shard->get_tombstone_count(), 0); @@ -127,22 +115,19 @@ START_TEST(t_get_lower_bound_index) } delete mbuffer; - delete bf; delete shard; } +*/ START_TEST(t_full_cancelation) { size_t n = 100; - auto buffer = create_double_seq_mbuffer(n, false); - auto buffer_ts = create_double_seq_mbuffer(n, true); - BloomFilter* bf1 = new BloomFilter(100, BF_HASH_FUNCS, g_rng); - BloomFilter* bf2 = new BloomFilter(100, BF_HASH_FUNCS, g_rng); - BloomFilter* bf3 = new BloomFilter(100, BF_HASH_FUNCS, g_rng); + auto buffer = create_double_seq_mbuffer(n, false); + auto buffer_ts = create_double_seq_mbuffer(n, true); - Shard* shard = new Shard(buffer, bf1); - Shard* shard_ts = new Shard(buffer_ts, bf2); + Shard* shard = new Shard(buffer); + Shard* shard_ts = new Shard(buffer_ts); ck_assert_int_eq(shard->get_record_count(), n); ck_assert_int_eq(shard->get_tombstone_count(), 0); @@ -151,16 +136,13 @@ START_TEST(t_full_cancelation) Shard* shards[] = {shard, shard_ts}; - Shard* merged = new Shard(shards, 2, bf3); + Shard* merged = new Shard(shards, 2); ck_assert_int_eq(merged->get_tombstone_count(), 0); ck_assert_int_eq(merged->get_record_count(), 0); delete buffer; delete buffer_ts; - delete bf1; - delete bf2; - delete bf3; delete shard; delete shard_ts; delete merged; @@ -168,13 +150,13 @@ START_TEST(t_full_cancelation) END_TEST +/* START_TEST(t_weighted_sampling) { size_t n=1000; auto buffer = create_weighted_mbuffer(n); - BloomFilter* bf = new BloomFilter(100, BF_HASH_FUNCS, g_rng); - Shard* shard = new Shard(buffer, bf); + Shard* shard = new Shard(buffer); uint64_t lower_key = 0; uint64_t upper_key = 5; @@ -202,17 +184,18 @@ START_TEST(t_weighted_sampling) ck_assert(roughly_equal(cnt[2] / 1000, (double) k/2.0, k, .05)); delete shard; - delete bf; delete buffer; } END_TEST +*/ +/* START_TEST(t_tombstone_check) { size_t cnt = 1024; size_t ts_cnt = 256; - auto buffer = new MutableBuffer(cnt + ts_cnt, true, ts_cnt, g_rng); + auto buffer = new MutableBuffer(cnt + ts_cnt, true, ts_cnt); std::vector> tombstones; @@ -234,8 +217,7 @@ START_TEST(t_tombstone_check) buffer->append({tombstones[i].first, tombstones[i].second, 1, 1}); } - BloomFilter* bf1 = new BloomFilter(100, BF_HASH_FUNCS, g_rng); - auto shard = new Shard(buffer, bf1); + auto shard = new Shard(buffer); for (size_t i=0; icheck_tombstone({tombstones[i].first, tombstones[i].second})); @@ -244,9 +226,9 @@ START_TEST(t_tombstone_check) delete shard; delete buffer; - delete bf1; } END_TEST +*/ Suite *unit_testing() { @@ -260,7 +242,7 @@ Suite *unit_testing() TCase *bounds = tcase_create("de:WIRS::get_{lower,upper}_bound Testing"); - tcase_add_test(bounds, t_get_lower_bound_index); + //tcase_add_test(bounds, t_get_lower_bound_index); tcase_set_timeout(bounds, 100); suite_add_tcase(unit, bounds); @@ -270,14 +252,18 @@ Suite *unit_testing() suite_add_tcase(unit, tombstone); + /* TCase *sampling = tcase_create("de:WIRS::sampling Testing"); tcase_add_test(sampling, t_weighted_sampling); suite_add_tcase(unit, sampling); + */ + /* TCase *check_ts = tcase_create("de::WIRS::check_tombstone Testing"); tcase_add_test(check_ts, t_tombstone_check); suite_add_tcase(unit, check_ts); + */ return unit; } -- cgit v1.2.3