diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-02-07 17:23:23 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-02-07 17:24:50 -0500 |
| commit | bd74e27b28bd95267ce50d2e4b6f12b51d9b6aae (patch) | |
| tree | 8e40038feaa9c83c4da967ab78564c51fc67a653 | |
| parent | 2c5d549b3618b9ea72e6eece4cb4f3da5a6811a8 (diff) | |
| download | dynamic-extension-bd74e27b28bd95267ce50d2e4b6f12b51d9b6aae.tar.gz | |
Cleaned up shard files (except VPTree)
Cleaned up shard implementations, fixed a few bugs, and set up some
tests. There's still some work to be done in creating tests for the
weighted sampling operations for the alias and aug btree shards.
28 files changed, 859 insertions, 1674 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index e7426b8..5f77396 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(namespace "de") project("Practical Dynamic Extension" VERSION 0.1.0) -set(debug false) +set(debug true) set(tests True) set(bench true) set(old_bench False) @@ -33,9 +33,10 @@ if (tests) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/bin/tests") file(MAKE_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/tests/data") - #add_executable(augbtree_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/augbtree_tests.cpp) - #target_link_libraries(augbtree_tests PUBLIC gsl check subunit pthread) - #target_include_directories(augbtree_tests PRIVATE include external/psudb-common/cpp/include external/ctpl) + add_executable(augbtree_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/augbtree_tests.cpp) + target_link_libraries(augbtree_tests PUBLIC gsl check subunit pthread atomic) + target_link_options(augbtree_tests PUBLIC -mcx16) + target_include_directories(augbtree_tests PRIVATE include external/psudb-common/cpp/include external/ctpl) add_executable(internal_level_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/internal_level_tests.cpp) target_link_libraries(internal_level_tests PUBLIC gsl check subunit pthread atomic) @@ -103,18 +104,16 @@ if (tests) target_link_options(triespline_tests PUBLIC -mcx16) target_include_directories(triespline_tests PRIVATE include external/psudb-common/cpp/include external/PLEX/include) - #add_executable(alias_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/alias_tests.cpp) - #target_link_libraries(alias_tests PUBLIC gsl check subunit pthread) - #target_include_directories(alias_tests PRIVATE include external/psudb-common/cpp/include) + add_executable(alias_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/alias_tests.cpp) + target_link_libraries(alias_tests PUBLIC gsl check subunit pthread atomic) + target_link_options(alias_tests PUBLIC -mcx16) + target_include_directories(alias_tests PRIVATE include external/psudb-common/cpp/include) - #add_executable(triespline_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/triespline_tests.cpp) - #target_link_libraries(triespline_tests PUBLIC gsl check subunit pthread) - #target_include_directories(triespline_tests PRIVATE include external/PLEX/include external/psudb-common/cpp/include) - - #add_executable(pgm_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/pgm_tests.cpp) - #target_link_libraries(pgm_tests PUBLIC gsl check subunit pthread gomp) - #target_include_directories(pgm_tests PRIVATE include external/PGM-index/include external/psudb-common/cpp/include) - #target_compile_options(pgm_tests PUBLIC -fopenmp) + add_executable(pgm_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/pgm_tests.cpp) + target_link_libraries(pgm_tests PUBLIC gsl check subunit pthread gomp atomic) + target_include_directories(pgm_tests PRIVATE include external/PGM-index/include external/psudb-common/cpp/include) + target_link_options(pgm_tests PUBLIC -mcx16) + target_compile_options(pgm_tests PUBLIC -fopenmp) endif() if (bench) diff --git a/include/query/wss.h b/include/query/wss.h index ea36cb2..8797035 100644 --- a/include/query/wss.h +++ b/include/query/wss.h @@ -60,7 +60,7 @@ public: return res; } - static void* get_buffer_query_state(BufferState<R> *buffer, void *parms) { + static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) { BufferState<R> *state = new BufferState<R>(); auto parameters = (Parms<R>*) parms; if constexpr (Rejection) { diff --git a/include/shard/Alias.h b/include/shard/Alias.h index a234575..f0d1d59 100644 --- a/include/shard/Alias.h +++ b/include/shard/Alias.h @@ -14,20 +14,19 @@ #pragma once #include <vector> -#include <cassert> #include "framework/ShardRequirements.h" -#include "psu-ds/PriorityQueue.h" -#include "util/Cursor.h" #include "psu-ds/Alias.h" #include "psu-ds/BloomFilter.h" #include "util/bf_config.h" +#include "util/SortedMerge.h" using psudb::CACHELINE_SIZE; using psudb::BloomFilter; using psudb::PriorityQueue; using psudb::queue_record; +using psudb::byte; namespace de { @@ -41,126 +40,73 @@ private: typedef decltype(R::weight) W; public: - Alias(BufferView<R>* buffer) - : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_alias(nullptr), m_bf(nullptr) { - - m_alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE); - assert(m_alloc_size % CACHELINE_SIZE == 0); - m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size); - - m_bf = new BloomFilter<R>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS); - - size_t offset = 0; - m_reccnt = 0; - auto base = buffer->get_data(); - auto stop = base + buffer->get_record_count(); - - std::sort(base, stop, std::less<Wrapped<R>>()); - - std::vector<W> weights; - - while (base < stop) { - if (!(base->is_tombstone()) && (base + 1) < stop) { - if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - wss_cancelations++; - continue; - } - } else if (base->is_deleted()) { - base += 1; - continue; - } + Alias(BufferView<R> buffer) + : m_data(nullptr) + , m_alias(nullptr) + , m_total_weight(0) + , m_reccnt(0) + , m_tombstone_cnt(0) + , m_alloc_size(0) + , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) { + + + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + buffer.get_record_count() * + sizeof(Wrapped<R>), + (byte**) &m_data); + + auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; - // FIXME: this shouldn't be necessary, but the tagged record - // bypass doesn't seem to be working on this code-path, so this - // ensures that tagged records from the buffer are able to be - // dropped, eventually. It should only need to be &= 1 - base->header &= 3; - m_data[m_reccnt++] = *base; - m_total_weight+= base->rec.weight; - weights.push_back(base->rec.weight); - - if (m_bf && base->is_tombstone()) { - m_tombstone_cnt++; - m_bf->insert(base->rec); + if (m_reccnt > 0) { + std::vector<W> weights; + for (size_t i=0; i<m_reccnt; i++) { + weights.emplace_back(m_data[i].rec.weight); + m_total_weight += m_data[i].rec.weight; } - - base++; - } - if (m_reccnt > 0) { build_alias_structure(weights); } } Alias(std::vector<Alias*> &shards) - : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_alias(nullptr), m_bf(nullptr) { - std::vector<Cursor<Wrapped<R>>> cursors; - cursors.reserve(shards.size()); - - PriorityQueue<Wrapped<R>> pq(shards.size()); + : m_data(nullptr) + , m_alias(nullptr) + , m_total_weight(0) + , m_reccnt(0) + , m_tombstone_cnt(0) + , m_alloc_size(0) + , m_bf(nullptr) { size_t attemp_reccnt = 0; size_t tombstone_count = 0; - - for (size_t i = 0; i < shards.size(); ++i) { - if (shards[i]) { - auto base = shards[i]->get_data(); - cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); - attemp_reccnt += shards[i]->get_record_count(); - tombstone_count += shards[i]->get_tombstone_count(); - pq.push(cursors[i].ptr, i); - } else { - cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); - } - } + auto cursors = build_cursor_vec<R, Alias>(shards, &attemp_reccnt, &tombstone_count); m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS); + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + attemp_reccnt * sizeof(Wrapped<R>), + (byte **) &m_data); - m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE); - assert(m_alloc_size % CACHELINE_SIZE == 0); - m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size); - - std::vector<W> weights; - - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - if (!cursor.ptr->is_deleted()) { - m_data[m_reccnt++] = *cursor.ptr; - m_total_weight += cursor.ptr->rec.weight; - weights.push_back(cursor.ptr->rec.weight); - if (m_bf && cursor.ptr->is_tombstone()) { - ++m_tombstone_cnt; - if (m_bf) m_bf->insert(cursor.ptr->rec); - } - } - pq.pop(); - - if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version); - } - } + auto res = sorted_array_merge<R>(cursors, m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; if (m_reccnt > 0) { + std::vector<W> weights; + for (size_t i=0; i<m_reccnt; i++) { + weights.emplace_back(m_data[i].rec.weight); + m_total_weight += m_data[i].rec.weight; + } + build_alias_structure(weights); } } ~Alias() { - if (m_data) free(m_data); - if (m_alias) delete m_alias; - if (m_bf) delete m_bf; - + free(m_data); + delete m_alias; + delete m_bf; } Wrapped<R> *point_lookup(const R &rec, bool filter=false) { @@ -173,7 +119,7 @@ public: return nullptr; } - while (idx < m_reccnt && m_data[idx].rec < rec) ++idx; + while (idx < (m_reccnt-1) && m_data[idx].rec < rec) ++idx; if (m_data[idx].rec == rec) { return m_data + idx; @@ -205,7 +151,7 @@ public: } size_t get_aux_memory_usage() { - return 0; + return (m_bf) ? m_bf->memory_usage() : 0; } W get_total_weight() { @@ -254,7 +200,6 @@ private: W m_total_weight; size_t m_reccnt; size_t m_tombstone_cnt; - size_t m_group_size; size_t m_alloc_size; BloomFilter<R> *m_bf; }; diff --git a/include/shard/AugBTree.h b/include/shard/AugBTree.h index be664ac..58bd098 100644 --- a/include/shard/AugBTree.h +++ b/include/shard/AugBTree.h @@ -16,28 +16,21 @@ #include <vector> #include <cassert> -#include <queue> -#include <memory> -#include <concepts> #include "framework/ShardRequirements.h" -#include "psu-ds/PriorityQueue.h" -#include "util/Cursor.h" #include "psu-ds/Alias.h" #include "psu-ds/BloomFilter.h" #include "util/bf_config.h" +#include "util/SortedMerge.h" using psudb::CACHELINE_SIZE; using psudb::BloomFilter; -using psudb::PriorityQueue; -using psudb::queue_record; using psudb::Alias; +using psudb::byte; namespace de { -thread_local size_t wirs_cancelations = 0; - template <WeightedRecordInterface R> struct AugBTreeNode { struct AugBTreeNode<R> *left, *right; @@ -54,108 +47,52 @@ private: typedef decltype(R::weight) W; public: - AugBTree(MutableBuffer<R>* buffer) - : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) { - m_alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE); - assert(m_alloc_size % CACHELINE_SIZE == 0); - m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size); - - m_bf = new BloomFilter<R>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS); - - size_t offset = 0; - m_reccnt = 0; - auto base = buffer->get_data(); - auto stop = base + buffer->get_record_count(); - - std::sort(base, stop, std::less<Wrapped<R>>()); - - while (base < stop) { - if (!(base->is_tombstone()) && (base + 1) < stop) { - if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - wirs_cancelations++; - continue; - } - } else if (base->is_deleted()) { - base += 1; - continue; - } - - // FIXME: this shouldn't be necessary, but the tagged record - // bypass doesn't seem to be working on this code-path, so this - // ensures that tagged records from the buffer are able to be - // dropped, eventually. It should only need to be &= 1 - base->header &= 3; - m_data[m_reccnt++] = *base; - m_total_weight+= base->rec.weight; - - if (m_bf && base->is_tombstone()) { - m_tombstone_cnt++; - m_bf->insert(base->rec); - } - - base++; - } + AugBTree(BufferView<R> buffer) + : m_data(nullptr) + , m_root(nullptr) + , m_reccnt(0) + , m_tombstone_cnt(0) + , m_group_size(0) + , m_alloc_size(0) + , m_node_cnt(0) + , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) + { + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + buffer.get_record_count() * + sizeof(Wrapped<R>), + (byte**) &m_data); + + auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; if (m_reccnt > 0) { build_wirs_structure(); } } - AugBTree(AugBTree** shards, size_t len) - : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) { - std::vector<Cursor<Wrapped<R>>> cursors; - cursors.reserve(len); - - PriorityQueue<Wrapped<R>> pq(len); - + AugBTree(std::vector<AugBTree*> shards) + : m_data(nullptr) + , m_root(nullptr) + , m_reccnt(0) + , m_tombstone_cnt(0) + , m_group_size(0) + , m_alloc_size(0) + , m_node_cnt(0) + , m_bf(nullptr) + { size_t attemp_reccnt = 0; size_t tombstone_count = 0; - - for (size_t i = 0; i < len; ++i) { - if (shards[i]) { - auto base = shards[i]->get_data(); - cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); - attemp_reccnt += shards[i]->get_record_count(); - tombstone_count += shards[i]->get_tombstone_count(); - pq.push(cursors[i].ptr, i); - } else { - cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); - } - } + auto cursors = build_cursor_vec<R, AugBTree>(shards, &attemp_reccnt, &tombstone_count); m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS); + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + attemp_reccnt * sizeof(Wrapped<R>), + (byte **) &m_data); - m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE); - assert(m_alloc_size % CACHELINE_SIZE == 0); - m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size); - - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - if (!cursor.ptr->is_deleted()) { - m_data[m_reccnt++] = *cursor.ptr; - m_total_weight += cursor.ptr->rec.weight; - if (m_bf && cursor.ptr->is_tombstone()) { - ++m_tombstone_cnt; - if (m_bf) m_bf->insert(cursor.ptr->rec); - } - } - pq.pop(); - - if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version); - } - } + auto res = sorted_array_merge<R>(cursors, m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; if (m_reccnt > 0) { build_wirs_structure(); @@ -163,13 +100,12 @@ public: } ~AugBTree() { - if (m_data) free(m_data); + free(m_data); for (size_t i=0; i<m_alias.size(); i++) { - if (m_alias[i]) delete m_alias[i]; + delete m_alias[i]; } - if (m_bf) delete m_bf; - + delete m_bf; free_tree(m_root); } @@ -183,7 +119,7 @@ public: return nullptr; } - while (idx < m_reccnt && m_data[idx].rec < rec) ++idx; + while (idx < (m_reccnt-1) && m_data[idx].rec < rec) ++idx; if (m_data[idx].rec == rec) { return m_data + idx; @@ -209,13 +145,12 @@ public: return m_data + idx; } - size_t get_memory_usage() { return m_alloc_size + m_node_cnt * sizeof(AugBTreeNode<Wrapped<R>>); } size_t get_aux_memory_usage() { - return 0; + return (m_bf) ? m_bf->memory_usage() : 0; } size_t get_lower_bound(const K& key) const { @@ -364,7 +299,6 @@ private: Wrapped<R>* m_data; std::vector<Alias *> m_alias; AugBTreeNode<R>* m_root; - W m_total_weight; size_t m_reccnt; size_t m_tombstone_cnt; size_t m_group_size; diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index 7de9cb1..33ba82f 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -17,9 +17,8 @@ #include "framework/ShardRequirements.h" #include "util/bf_config.h" -#include "psu-ds/PriorityQueue.h" -#include "util/Cursor.h" -#include "psu-util/timer.h" +#include "psu-ds/BloomFilter.h" +#include "util/SortedMerge.h" using psudb::CACHELINE_SIZE; using psudb::BloomFilter; @@ -61,60 +60,18 @@ public: , m_alloc_size(0) , m_data(nullptr) { - TIMER_INIT(); - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>), (byte**) &m_data); - TIMER_START(); - auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>)); - buffer.copy_to_buffer((byte *) temp_buffer); - - auto base = temp_buffer; - auto stop = base + buffer.get_record_count(); - std::sort(base, stop, std::less<Wrapped<R>>()); - TIMER_STOP(); - - auto sort_time = TIMER_RESULT(); - - TIMER_START(); - while (base < stop) { - if (!base->is_tombstone() && (base + 1 < stop) - && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - continue; - } else if (base->is_deleted()) { - base += 1; - continue; - } - - // FIXME: this shouldn't be necessary, but the tagged record - // bypass doesn't seem to be working on this code-path, so this - // ensures that tagged records from the buffer are able to be - // dropped, eventually. It should only need to be &= 1 - base->header &= 3; - m_data[m_reccnt++] = *base; - if (m_bf && base->is_tombstone()) { - ++m_tombstone_cnt; - m_bf->insert(base->rec); - } - - base++; - } - - TIMER_STOP(); - auto copy_time = TIMER_RESULT(); + auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; - TIMER_START(); if (m_reccnt > 0) { build_internal_levels(); } - TIMER_STOP(); - auto level_time = TIMER_RESULT(); - - free(temp_buffer); } ISAMTree(std::vector<ISAMTree*> &shards) @@ -128,58 +85,18 @@ public: , m_alloc_size(0) , m_data(nullptr) { - std::vector<Cursor<Wrapped<R>>> cursors; - cursors.reserve(shards.size()); - - PriorityQueue<Wrapped<R>> pq(shards.size()); - size_t attemp_reccnt = 0; size_t tombstone_count = 0; - - for (size_t i = 0; i < shards.size(); ++i) { - if (shards[i]) { - auto base = shards[i]->get_data(); - cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); - attemp_reccnt += shards[i]->get_record_count(); - tombstone_count += shards[i]->get_tombstone_count(); - pq.push(cursors[i].ptr, i); - } else { - cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); - } - } + auto cursors = build_cursor_vec<R, ISAMTree>(shards, &attemp_reccnt, &tombstone_count); m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS); m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **) &m_data); - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - if (!cursor.ptr->is_deleted()) { - m_data[m_reccnt++] = *cursor.ptr; - if (cursor.ptr->is_tombstone()) { - //fprintf(stderr, "ISAM: Tombstone from shard %ld next record from shard %ld\n", - //now.version, next.version); - ++m_tombstone_cnt; - m_bf->insert(cursor.ptr->rec); - } - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); - } - } + auto res = sorted_array_merge<R>(cursors, m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; if (m_reccnt > 0) { build_internal_levels(); @@ -225,11 +142,11 @@ public: size_t get_memory_usage() { - return m_alloc_size; + return m_alloc_size + m_internal_node_cnt * NODE_SZ; } size_t get_aux_memory_usage() { - return m_bf->memory_usage(); + return (m_bf) ? m_bf->memory_usage() : 0; } /* SortedShardInterface methods */ diff --git a/include/shard/PGM.h b/include/shard/PGM.h index 13db26a..8031870 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -14,24 +14,19 @@ #include <vector> -#include <cassert> -#include <queue> -#include <memory> -#include <concepts> #include "framework/ShardRequirements.h" #include "pgm/pgm_index.hpp" -#include "psu-ds/PriorityQueue.h" -#include "util/Cursor.h" #include "psu-ds/BloomFilter.h" +#include "util/SortedMerge.h" #include "util/bf_config.h" using psudb::CACHELINE_SIZE; using psudb::BloomFilter; using psudb::PriorityQueue; using psudb::queue_record; -using psudb::Alias; +using psudb::byte; namespace de { @@ -41,111 +36,65 @@ private: typedef decltype(R::key) K; typedef decltype(R::value) V; - public: - PGM(MutableBuffer<R>* buffer) - : m_reccnt(0), m_tombstone_cnt(0) { - - m_alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE); - assert(m_alloc_size % CACHELINE_SIZE == 0); - m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size); - std::vector<K> keys; - - size_t offset = 0; - m_reccnt = 0; - auto base = buffer->get_data(); - auto stop = base + buffer->get_record_count(); + PGM(BufferView<R> buffer) + : m_data(nullptr) + , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) + , m_reccnt(0) + , m_tombstone_cnt(0) + , m_alloc_size(0) { + + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + buffer.get_record_count() * + sizeof(Wrapped<R>), + (byte**) &m_data); + auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; - std::sort(base, stop, std::less<Wrapped<R>>()); - - K min_key = base->rec.key; - K max_key = (stop - 1)->rec.key; - - while (base < stop) { - if (!(base->is_tombstone()) && (base + 1) < stop) { - if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - continue; - } - } else if (base->is_deleted()) { - base += 1; - continue; + if (m_reccnt > 0) { + std::vector<K> keys; + for (size_t i=0; i<m_reccnt; i++) { + keys.emplace_back(m_data[i].rec.key); } - // FIXME: this shouldn't be necessary, but the tagged record - // bypass doesn't seem to be working on this code-path, so this - // ensures that tagged records from the buffer are able to be - // dropped, eventually. It should only need to be &= 1 - base->header &= 3; - m_data[m_reccnt++] = *base; - keys.emplace_back(base->rec.key); - base++; - } - - if (m_reccnt > 0) { m_pgm = pgm::PGMIndex<K, epsilon>(keys); } } - PGM(PGM** shards, size_t len) - : m_reccnt(0), m_tombstone_cnt(0) { - std::vector<Cursor<Wrapped<R>>> cursors; - cursors.reserve(len); - - PriorityQueue<Wrapped<R>> pq(len); - + PGM(std::vector<PGM*> shards) + : m_data(nullptr) + , m_bf(nullptr) + , m_reccnt(0) + , m_tombstone_cnt(0) + , m_alloc_size(0) { + size_t attemp_reccnt = 0; size_t tombstone_count = 0; - - for (size_t i = 0; i < len; ++i) { - if (shards[i]) { - auto base = shards[i]->get_data(); - cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); - attemp_reccnt += shards[i]->get_record_count(); - tombstone_count += shards[i]->get_tombstone_count(); - pq.push(cursors[i].ptr, i); - - } else { - cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); - } - } + auto cursors = build_cursor_vec<R, PGM>(shards, &attemp_reccnt, &tombstone_count); - m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE); - assert(m_alloc_size % CACHELINE_SIZE == 0); - m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size); - - std::vector<K> keys; - - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - if (!cursor.ptr->is_deleted()) { - m_data[m_reccnt++] = *cursor.ptr; - keys.emplace_back(cursor.ptr->rec.key); - } - pq.pop(); - - if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version); - } - } + m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS); + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + attemp_reccnt * sizeof(Wrapped<R>), + (byte **) &m_data); + + auto res = sorted_array_merge<R>(cursors, m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; if (m_reccnt > 0) { + std::vector<K> keys; + for (size_t i=0; i<m_reccnt; i++) { + keys.emplace_back(m_data[i].rec.key); + } + m_pgm = pgm::PGMIndex<K, epsilon>(keys); } } ~PGM() { - if (m_data) free(m_data); + free(m_data); + delete m_bf; } Wrapped<R> *point_lookup(const R &rec, bool filter=false) { @@ -186,7 +135,7 @@ public: } size_t get_aux_memory_usage() { - return 0; + return (m_bf) ? m_bf->memory_usage() : 0; } size_t get_lower_bound(const K& key) const { @@ -228,6 +177,7 @@ public: private: Wrapped<R>* m_data; + BloomFilter<R> *m_bf; size_t m_reccnt; size_t m_tombstone_cnt; size_t m_alloc_size; diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 9473177..f9fb3cb 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -15,11 +15,9 @@ #include "framework/ShardRequirements.h" #include "ts/builder.h" -#include "psu-ds/PriorityQueue.h" -#include "util/Cursor.h" #include "psu-ds/BloomFilter.h" #include "util/bf_config.h" -#include "psu-util/timer.h" +#include "util/SortedMerge.h" using psudb::CACHELINE_SIZE; using psudb::BloomFilter; @@ -45,78 +43,26 @@ public: , m_min_key(0) , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) { - TIMER_INIT(); - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>), (byte**) &m_data); - TIMER_START(); - auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>)); - buffer.copy_to_buffer((byte *) temp_buffer); - - auto base = temp_buffer; - auto stop = base + buffer.get_record_count(); - std::sort(base, stop, std::less<Wrapped<R>>()); - - K min_key = base->rec.key; - K max_key = (stop-1)->rec.key; - TIMER_STOP(); - - auto sort_time = TIMER_RESULT(); - - TIMER_START(); - auto bldr = ts::Builder<K>(min_key, max_key, E); - while (base < stop) { - if (!base->is_tombstone() && (base + 1 < stop) - && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - continue; - } else if (base->is_deleted()) { - base += 1; - continue; - } + auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; - // FIXME: this shouldn't be necessary, but the tagged record - // bypass doesn't seem to be working on this code-path, so this - // ensures that tagged records from the buffer are able to be - // dropped, eventually. It should only need to be &= 1 - base->header &= 3; - m_data[m_reccnt++] = *base; - bldr.AddKey(base->rec.key); - if (m_bf && base->is_tombstone()) { - ++m_tombstone_cnt; - m_bf->insert(base->rec); - } + if (m_reccnt > 0) { + m_min_key = m_data[0].rec.key; + m_max_key = m_data[m_reccnt-1].rec.key; - /* - * determine the "true" min/max keys based on the scan. This is - * to avoid situations where the min/max in the input array - * are deleted and don't survive into the structure itself. - */ - if (m_reccnt == 0) { - m_max_key = m_min_key = base->rec.key; - } else if (base->rec.key > m_max_key) { - m_max_key = base->rec.key; - } else if (base->rec.key < m_min_key) { - m_min_key = base->rec.key; + auto bldr = ts::Builder<K>(m_min_key, m_max_key, E); + for (size_t i=0; i<m_reccnt; i++) { + bldr.AddKey(m_data[i].rec.key); } - base++; - } - - TIMER_STOP(); - auto copy_time = TIMER_RESULT(); - - TIMER_START(); - if (m_reccnt > 0) { m_ts = bldr.Finalize(); } - TIMER_STOP(); - auto level_time = TIMER_RESULT(); - - free(temp_buffer); } TrieSpline(std::vector<TrieSpline*> &shards) @@ -128,77 +74,28 @@ public: , m_min_key(0) , m_bf(nullptr) { - - std::vector<Cursor<Wrapped<R>>> cursors; - cursors.reserve(shards.size()); - - PriorityQueue<Wrapped<R>> pq(shards.size()); - size_t attemp_reccnt = 0; size_t tombstone_count = 0; - - /* - * Initialize m_max_key and m_min_key using the values from the - * first shard. These will later be updated when building - * the initial priority queue to their true values. - */ - m_max_key = shards[0]->m_max_key; - m_min_key = shards[0]->m_min_key; + auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt, &tombstone_count); - for (size_t i = 0; i < shards.size(); ++i) { - if (shards[i]) { - auto base = shards[i]->get_data(); - cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); - attemp_reccnt += shards[i]->get_record_count(); - tombstone_count += shards[i]->get_tombstone_count(); - pq.push(cursors[i].ptr, i); - - if (shards[i]->m_max_key > m_max_key) { - m_max_key = shards[i]->m_max_key; - } - - if (shards[i]->m_min_key < m_min_key) { - m_min_key = shards[i]->m_min_key; - } - } else { - cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); - } - } - m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS); m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **) &m_data); - auto bldr = ts::Builder<K>(m_min_key, m_max_key, E); - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - if (!cursor.ptr->is_deleted()) { - m_data[m_reccnt++] = *cursor.ptr; - bldr.AddKey(cursor.ptr->rec.key); - if (cursor.ptr->is_tombstone()) { - ++m_tombstone_cnt; - m_bf->insert(cursor.ptr->rec); - } - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); - } - } + auto res = sorted_array_merge<R>(cursors, m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; if (m_reccnt > 0) { + m_min_key = m_data[0].rec.key; + m_max_key = m_data[m_reccnt-1].rec.key; + + auto bldr = ts::Builder<K>(m_min_key, m_max_key, E); + for (size_t i=0; i<m_reccnt; i++) { + bldr.AddKey(m_data[i].rec.key); + } + m_ts = bldr.Finalize(); } } @@ -250,7 +147,7 @@ public: } size_t get_aux_memory_usage() { - return 0; + return (m_bf) ? m_bf->memory_usage() : 0; } size_t get_lower_bound(const K& key) const { diff --git a/include/util/SortedMerge.h b/include/util/SortedMerge.h new file mode 100644 index 0000000..ed47acb --- /dev/null +++ b/include/util/SortedMerge.h @@ -0,0 +1,185 @@ +/* + * include/util/SortedMerge.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * Distributed under the Modified BSD License. + * + * A sorted array merge routine for use in Shard construction, as many + * shards will use a sorted array to represent their data. Also encapsulates + * the necessary tombstone-cancellation logic. + * + * FIXME: include generic per-record processing functionality for Shards that + * need it, to avoid needing to reprocess the array in the shard after + * creation. + */ +#pragma once + +#include "util/Cursor.h" +#include "framework/interface/Shard.h" +#include "psu-ds/PriorityQueue.h" + +namespace de { + +using psudb::PriorityQueue; +using psudb::BloomFilter; +using psudb::queue_record; +using psudb::byte; +using psudb::CACHELINE_SIZE; + +struct merge_info { + size_t record_count; + size_t tombstone_count; +}; + + +template <RecordInterface R, ShardInterface<R> S> +static std::vector<Cursor<Wrapped<R>>> build_cursor_vec(std::vector<S*> &shards, size_t *reccnt, size_t *tscnt) { + std::vector<Cursor<Wrapped<R>>> cursors; + cursors.reserve(shards.size()); + + *reccnt = 0; + *tscnt = 0; + + for (size_t i = 0; i < shards.size(); ++i) { + if (shards[i]) { + auto base = shards[i]->get_data(); + cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); + *reccnt += shards[i]->get_record_count(); + *tscnt += shards[i]->get_tombstone_count(); + } else { + cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); + } + } + + return cursors; +} + +/* + * + */ +template <RecordInterface R> +static merge_info sorted_array_from_bufferview(BufferView<R> bv, + Wrapped<R> *buffer, + psudb::BloomFilter<R> *bf=nullptr) { + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, + bv.get_record_count(), + sizeof(Wrapped<R>)); + bv.copy_to_buffer((byte *) temp_buffer); + + auto base = temp_buffer; + auto stop = base + bv.get_record_count(); + std::sort(base, stop, std::less<Wrapped<R>>()); + + merge_info info = {0, 0}; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) + && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; + } + + // fixme: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. it should only need to be &= 1 + base->header &= 3; + buffer[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (bf){ + bf->insert(base->rec); + } + } + + base++; + } + + free(temp_buffer); + return info; +} + +/* + * Perform a sorted merge of the records within cursors into the provided + * buffer. Includes tombstone and tagged delete cancellation logic, and + * will insert tombstones into a bloom filter, if one is provided. + * + * The behavior of this function is undefined if the provided buffer does + * not have space to contain all of the records within the input cursors. + */ +template <RecordInterface R> +static merge_info sorted_array_merge(std::vector<Cursor<Wrapped<R>>> &cursors, + Wrapped<R> *buffer, + psudb::BloomFilter<R> *bf=nullptr) { + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue<Wrapped<R>> pq(cursors.size()); + for (size_t i=0; i<cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } + + merge_info info = {0, 0}; + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + buffer[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. + */ + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (bf) { + bf->insert(cursor.ptr->rec); + } + } + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + } + } + + return info; +} + + + +} diff --git a/tests/alias_tests.cpp b/tests/alias_tests.cpp index e3c736b..98d0c63 100644 --- a/tests/alias_tests.cpp +++ b/tests/alias_tests.cpp @@ -12,358 +12,27 @@ #include "shard/Alias.h" #include "query/wss.h" -#include "testing.h" +#include "framework/structure/MutableBuffer.h" +#include "include/testing.h" + #include <check.h> using namespace de; -typedef Alias<WRec> Shard; - -START_TEST(t_mbuffer_init) -{ - auto buffer = new MutableBuffer<WRec>(1024, 1024); - for (uint64_t i = 512; i > 0; i--) { - uint32_t v = i; - buffer->append({i,v, 1}); - } - - for (uint64_t i = 1; i <= 256; ++i) { - uint32_t v = i; - buffer->append({i, v, 1}, true); - } - - for (uint64_t i = 257; i <= 512; ++i) { - uint32_t v = i + 1; - buffer->append({i, v, 1}); - } - - Shard* shard = new Shard(buffer); - ck_assert_uint_eq(shard->get_record_count(), 512); - - delete buffer; - delete shard; -} - - -START_TEST(t_alias_init) -{ - size_t n = 512; - auto mbuffer1 = create_test_mbuffer<WRec>(n); - auto mbuffer2 = create_test_mbuffer<WRec>(n); - auto mbuffer3 = create_test_mbuffer<WRec>(n); - - auto shard1 = new Shard(mbuffer1); - auto shard2 = new Shard(mbuffer2); - auto shard3 = new Shard(mbuffer3); - - Shard* shards[3] = {shard1, shard2, shard3}; - auto shard4 = new Shard(shards, 3); - - ck_assert_int_eq(shard4->get_record_count(), n * 3); - ck_assert_int_eq(shard4->get_tombstone_count(), 0); - - size_t total_cnt = 0; - size_t shard1_idx = 0; - size_t shard2_idx = 0; - size_t shard3_idx = 0; - - for (size_t i = 0; i < shard4->get_record_count(); ++i) { - auto rec1 = shard1->get_record_at(shard1_idx); - auto rec2 = shard2->get_record_at(shard2_idx); - auto rec3 = shard3->get_record_at(shard3_idx); - - auto cur_rec = shard4->get_record_at(i); - - if (shard1_idx < n && cur_rec->rec == rec1->rec) { - ++shard1_idx; - } else if (shard2_idx < n && cur_rec->rec == rec2->rec) { - ++shard2_idx; - } else if (shard3_idx < n && cur_rec->rec == rec3->rec) { - ++shard3_idx; - } else { - assert(false); - } - } - - delete mbuffer1; - delete mbuffer2; - delete mbuffer3; - - delete shard1; - delete shard2; - delete shard3; - delete shard4; -} - - -START_TEST(t_point_lookup) -{ - size_t n = 10000; - - auto buffer = create_double_seq_mbuffer<WRec>(n, false); - auto alias = Shard(buffer); - - for (size_t i=0; i<n; i++) { - WRec r; - auto rec = (buffer->get_data() + i); - r.key = rec->rec.key; - r.value = rec->rec.value; - - auto result = alias.point_lookup(r); - ck_assert_ptr_nonnull(result); - ck_assert_int_eq(result->rec.key, r.key); - ck_assert_int_eq(result->rec.value, r.value); - } - - delete buffer; -} -END_TEST - - -START_TEST(t_point_lookup_miss) -{ - size_t n = 10000; - - auto buffer = create_double_seq_mbuffer<WRec>(n, false); - auto alias = Shard(buffer); - - for (size_t i=n + 100; i<2*n; i++) { - WRec r; - r.key = i; - r.value = i; - - auto result = alias.point_lookup(r); - ck_assert_ptr_null(result); - } - - delete buffer; -} - -START_TEST(t_full_cancelation) -{ - size_t n = 100; - auto buffer = create_double_seq_mbuffer<WRec>(n, false); - auto buffer_ts = create_double_seq_mbuffer<WRec>(n, true); - - Shard* shard = new Shard(buffer); - Shard* shard_ts = new Shard(buffer_ts); - - ck_assert_int_eq(shard->get_record_count(), n); - ck_assert_int_eq(shard->get_tombstone_count(), 0); - ck_assert_int_eq(shard_ts->get_record_count(), n); - ck_assert_int_eq(shard_ts->get_tombstone_count(), n); - - Shard* shards[] = {shard, shard_ts}; - - Shard* merged = new Shard(shards, 2); - - ck_assert_int_eq(merged->get_tombstone_count(), 0); - ck_assert_int_eq(merged->get_record_count(), 0); - - delete buffer; - delete buffer_ts; - delete shard; - delete shard_ts; - delete merged; -} -END_TEST - - -START_TEST(t_alias_query) -{ - size_t n=1000; - auto buffer = create_weighted_mbuffer<WRec>(n); - - Shard* shard = new Shard(buffer); - - size_t k = 1000; - - size_t cnt[3] = {0}; - wss::Parms<WRec> parms = {k}; - parms.rng = gsl_rng_alloc(gsl_rng_mt19937); - - size_t total_samples = 0; - - for (size_t i=0; i<1000; i++) { - auto state = wss::Query<Shard, WRec>::get_query_state(shard, &parms); - ((wss::State<WRec> *) state)->sample_size = k; - auto result = wss::Query<Shard, WRec>::query(shard, state, &parms); - - total_samples += result.size(); +typedef WRec R; +typedef Alias<R> Shard; - for (size_t j=0; j<result.size(); j++) { - cnt[result[j].rec.key - 1]++; - } - - wss::Query<Shard, WRec>::delete_query_state(state); - } - - ck_assert(roughly_equal(cnt[0], (double) total_samples/4.0, total_samples, .05)); - ck_assert(roughly_equal(cnt[1], (double) total_samples/4.0, total_samples, .05)); - ck_assert(roughly_equal(cnt[2], (double) total_samples/2.0, total_samples, .05)); - - gsl_rng_free(parms.rng); - delete shard; - delete buffer; -} -END_TEST - - -START_TEST(t_alias_query_merge) -{ - size_t n=1000; - auto buffer = create_weighted_mbuffer<WRec>(n); - - Shard* shard = new Shard(buffer); - - uint64_t lower_key = 0; - uint64_t upper_key = 5; - - size_t k = 1000; - - size_t cnt[3] = {0}; - wss::Parms<WRec> parms = {k}; - parms.rng = gsl_rng_alloc(gsl_rng_mt19937); - - std::vector<std::vector<Wrapped<WRec>>> results(2); - - for (size_t i=0; i<1000; i++) { - auto state1 = wss::Query<Shard, WRec>::get_query_state(shard, &parms); - ((wss::State<WRec> *) state1)->sample_size = k; - results[0] = wss::Query<Shard, WRec>::query(shard, state1, &parms); - - auto state2 = wss::Query<Shard, WRec>::get_query_state(shard, &parms); - ((wss::State<WRec> *) state2)->sample_size = k; - results[1] = wss::Query<Shard, WRec>::query(shard, state2, &parms); - - wss::Query<Shard, WRec>::delete_query_state(state1); - wss::Query<Shard, WRec>::delete_query_state(state2); - } - - auto merged = wss::Query<Shard, WRec>::merge(results, nullptr); - - ck_assert_int_eq(merged.size(), 2*k); - for (size_t i=0; i<merged.size(); i++) { - ck_assert_int_ge(merged[i].key, lower_key); - ck_assert_int_le(merged[i].key, upper_key); - } - - gsl_rng_free(parms.rng); - delete shard; - delete buffer; -} -END_TEST - - -START_TEST(t_alias_buffer_query_scan) -{ - size_t n=1000; - auto buffer = create_weighted_mbuffer<WRec>(n); - - uint64_t lower_key = 0; - uint64_t upper_key = 5; - - size_t k = 1000; - - size_t cnt[3] = {0}; - wss::Parms<WRec> parms = {k}; - parms.rng = gsl_rng_alloc(gsl_rng_mt19937); - - size_t total_samples = 0; - - for (size_t i=0; i<1000; i++) { - auto state = wss::Query<Shard, WRec, false>::get_buffer_query_state(buffer, &parms); - ((wss::BufferState<WRec> *) state)->sample_size = k; - auto result = wss::Query<Shard, WRec, false>::buffer_query(buffer, state, &parms); - total_samples += result.size(); - - for (size_t j=0; j<result.size(); j++) { - cnt[result[j].rec.key - 1]++; - } - - wss::Query<Shard, WRec, false>::delete_buffer_query_state(state); - } - - ck_assert(roughly_equal(cnt[0], (double) total_samples/4.0, total_samples, .05)); - ck_assert(roughly_equal(cnt[1], (double) total_samples/4.0, total_samples, .05)); - ck_assert(roughly_equal(cnt[2], (double) total_samples/2.0, total_samples, .05)); - - gsl_rng_free(parms.rng); - delete buffer; -} -END_TEST - - -START_TEST(t_alias_buffer_query_rejection) -{ - size_t n=1000; - auto buffer = create_weighted_mbuffer<WRec>(n); - - uint64_t lower_key = 0; - uint64_t upper_key = 5; - - size_t k = 1000; - - size_t cnt[3] = {0}; - wss::Parms<WRec> parms = {k}; - parms.rng = gsl_rng_alloc(gsl_rng_mt19937); - - size_t total_samples = 0; - - for (size_t i=0; i<1000; i++) { - auto state = wss::Query<Shard, WRec>::get_buffer_query_state(buffer, &parms); - ((wss::BufferState<WRec> *) state)->sample_size = k; - auto result = wss::Query<Shard, WRec>::buffer_query(buffer, state, &parms); - - total_samples += result.size(); - - for (size_t j=0; j<result.size(); j++) { - cnt[result[j].rec.key - 1]++; - } - - wss::Query<Shard, WRec>::delete_buffer_query_state(state); - } - - ck_assert(roughly_equal(cnt[0], (double) total_samples/4.0, total_samples, .1)); - ck_assert(roughly_equal(cnt[1], (double) total_samples/4.0, total_samples, .1)); - ck_assert(roughly_equal(cnt[2], (double) total_samples/2.0, total_samples, .1)); - - gsl_rng_free(parms.rng); - delete buffer; -} -END_TEST +#include "include/shard_standard.h" +#include "include/rangequery.h" Suite *unit_testing() { - Suite *unit = suite_create("Alias Shard Unit Testing"); - - TCase *create = tcase_create("de::Alias constructor Testing"); - tcase_add_test(create, t_mbuffer_init); - tcase_add_test(create, t_alias_init); - tcase_set_timeout(create, 100); - suite_add_tcase(unit, create); - + Suite *unit = suite_create("ISAMTree Shard Unit Testing"); - TCase *tombstone = tcase_create("de:Alias::tombstone cancellation Testing"); - tcase_add_test(tombstone, t_full_cancelation); - suite_add_tcase(unit, tombstone); - - - TCase *lookup = tcase_create("de:Alias:point_lookup Testing"); - tcase_add_test(lookup, t_point_lookup); - tcase_add_test(lookup, t_point_lookup_miss); - suite_add_tcase(unit, lookup); - - - - TCase *sampling = tcase_create("de:Alias::AliasQuery Testing"); - tcase_add_test(sampling, t_alias_query); - tcase_add_test(sampling, t_alias_query_merge); - tcase_add_test(sampling, t_alias_buffer_query_rejection); - tcase_add_test(sampling, t_alias_buffer_query_scan); - suite_add_tcase(unit, sampling); + inject_rangequery_tests(unit); + inject_shard_tests(unit); return unit; } @@ -389,3 +58,4 @@ int main() return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; } + diff --git a/tests/augbtree_tests.cpp b/tests/augbtree_tests.cpp index 1b24344..c7a0885 100644 --- a/tests/augbtree_tests.cpp +++ b/tests/augbtree_tests.cpp @@ -1,7 +1,7 @@ /* - * tests/augbtree_tests.cpp + * tests/isam_tests.cpp * - * Unit tests for AugBTree (Augmented B+Tree) shard + * Unit tests for ISAM Tree shard * * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> * Dong Xie <dongx@psu.edu> @@ -11,363 +11,23 @@ */ #include "shard/AugBTree.h" -#include "query/wirs.h" -#include "testing.h" - +#include "include/testing.h" #include <check.h> using namespace de; -typedef AugBTree<WRec> Shard; - -START_TEST(t_mbuffer_init) -{ - auto buffer = new MutableBuffer<WRec>(1024, 1024); - for (uint64_t i = 512; i > 0; i--) { - uint32_t v = i; - buffer->append({i,v, 1}); - } - - for (uint64_t i = 1; i <= 256; ++i) { - uint32_t v = i; - buffer->append({i, v, 1}, true); - } - - for (uint64_t i = 257; i <= 512; ++i) { - uint32_t v = i + 1; - buffer->append({i, v, 1}); - } - - Shard* shard = new Shard(buffer); - ck_assert_uint_eq(shard->get_record_count(), 512); - - delete buffer; - delete shard; -} - - -START_TEST(t_wirs_init) -{ - size_t n = 512; - auto mbuffer1 = create_test_mbuffer<WRec>(n); - auto mbuffer2 = create_test_mbuffer<WRec>(n); - auto mbuffer3 = create_test_mbuffer<WRec>(n); - - auto shard1 = new Shard(mbuffer1); - auto shard2 = new Shard(mbuffer2); - auto shard3 = new Shard(mbuffer3); - - Shard* shards[3] = {shard1, shard2, shard3}; - auto shard4 = new Shard(shards, 3); - - ck_assert_int_eq(shard4->get_record_count(), n * 3); - ck_assert_int_eq(shard4->get_tombstone_count(), 0); - - size_t total_cnt = 0; - size_t shard1_idx = 0; - size_t shard2_idx = 0; - size_t shard3_idx = 0; - - for (size_t i = 0; i < shard4->get_record_count(); ++i) { - auto rec1 = shard1->get_record_at(shard1_idx); - auto rec2 = shard2->get_record_at(shard2_idx); - auto rec3 = shard3->get_record_at(shard3_idx); - - auto cur_rec = shard4->get_record_at(i); - - if (shard1_idx < n && cur_rec->rec == rec1->rec) { - ++shard1_idx; - } else if (shard2_idx < n && cur_rec->rec == rec2->rec) { - ++shard2_idx; - } else if (shard3_idx < n && cur_rec->rec == rec3->rec) { - ++shard3_idx; - } else { - assert(false); - } - } - - delete mbuffer1; - delete mbuffer2; - delete mbuffer3; - - delete shard1; - delete shard2; - delete shard3; - delete shard4; -} - - -START_TEST(t_point_lookup) -{ - size_t n = 10000; - - auto buffer = create_double_seq_mbuffer<WRec>(n, false); - auto wirs = Shard(buffer); - - for (size_t i=0; i<n; i++) { - WRec r; - auto rec = (buffer->get_data() + i); - r.key = rec->rec.key; - r.value = rec->rec.value; - - auto result = wirs.point_lookup(r); - ck_assert_ptr_nonnull(result); - ck_assert_int_eq(result->rec.key, r.key); - ck_assert_int_eq(result->rec.value, r.value); - } - - delete buffer; -} -END_TEST - - -START_TEST(t_point_lookup_miss) -{ - size_t n = 10000; - - auto buffer = create_double_seq_mbuffer<WRec>(n, false); - auto wirs = Shard(buffer); - - for (size_t i=n + 100; i<2*n; i++) { - WRec r; - r.key = i; - r.value = i; - - auto result = wirs.point_lookup(r); - ck_assert_ptr_null(result); - } - - delete buffer; -} - - -START_TEST(t_full_cancelation) -{ - size_t n = 100; - auto buffer = create_double_seq_mbuffer<WRec>(n, false); - auto buffer_ts = create_double_seq_mbuffer<WRec>(n, true); - - Shard* shard = new Shard(buffer); - Shard* shard_ts = new Shard(buffer_ts); - - ck_assert_int_eq(shard->get_record_count(), n); - ck_assert_int_eq(shard->get_tombstone_count(), 0); - ck_assert_int_eq(shard_ts->get_record_count(), n); - ck_assert_int_eq(shard_ts->get_tombstone_count(), n); - - Shard* shards[] = {shard, shard_ts}; - - Shard* merged = new Shard(shards, 2); - - ck_assert_int_eq(merged->get_tombstone_count(), 0); - ck_assert_int_eq(merged->get_record_count(), 0); - - delete buffer; - delete buffer_ts; - delete shard; - delete shard_ts; - delete merged; -} -END_TEST - - -START_TEST(t_wirs_query) -{ - size_t n=1000; - auto buffer = create_weighted_mbuffer<WRec>(n); - - Shard* shard = new Shard(buffer); - - uint64_t lower_key = 0; - uint64_t upper_key = 5; - - size_t k = 1000; - - size_t cnt[3] = {0}; - wirs::Parms<WRec> parms = {lower_key, upper_key, k}; - parms.rng = gsl_rng_alloc(gsl_rng_mt19937); - - size_t total_samples = 0; - - for (size_t i=0; i<1000; i++) { - auto state = wirs::Query<Shard, WRec>::get_query_state(shard, &parms); - ((wirs::State<WRec> *) state)->sample_size = k; - auto result = wirs::Query<Shard, WRec>::query(shard, state, &parms); - - total_samples += result.size(); - - for (size_t j=0; j<result.size(); j++) { - cnt[result[j].rec.key - 1]++; - } - - wirs::Query<Shard, WRec>::delete_query_state(state); - } - - ck_assert(roughly_equal(cnt[0], (double) total_samples/4.0, total_samples, .05)); - ck_assert(roughly_equal(cnt[1], (double) total_samples/4.0, total_samples, .05)); - ck_assert(roughly_equal(cnt[2], (double) total_samples/2.0, total_samples, .05)); - - gsl_rng_free(parms.rng); - delete shard; - delete buffer; -} -END_TEST - - -START_TEST(t_wirs_query_merge) -{ - size_t n=1000; - auto buffer = create_weighted_mbuffer<WRec>(n); - - Shard* shard = new Shard(buffer); - - uint64_t lower_key = 0; - uint64_t upper_key = 5; - - size_t k = 1000; - - size_t cnt[3] = {0}; - wirs::Parms<WRec> parms = {lower_key, upper_key, k}; - parms.rng = gsl_rng_alloc(gsl_rng_mt19937); - - std::vector<std::vector<Wrapped<WRec>>> results(2); - - for (size_t i=0; i<1000; i++) { - auto state1 = wirs::Query<Shard, WRec>::get_query_state(shard, &parms); - ((wirs::State<WRec> *) state1)->sample_size = k; - results[0] = wirs::Query<Shard, WRec>::query(shard, state1, &parms); - - auto state2 = wirs::Query<Shard, WRec>::get_query_state(shard, &parms); - ((wirs::State<WRec> *) state2)->sample_size = k; - results[1] = wirs::Query<Shard, WRec>::query(shard, state2, &parms); - - wirs::Query<Shard, WRec>::delete_query_state(state1); - wirs::Query<Shard, WRec>::delete_query_state(state2); - } - - auto merged = wirs::Query<Shard, WRec>::merge(results, nullptr); - - ck_assert_int_eq(merged.size(), 2*k); - for (size_t i=0; i<merged.size(); i++) { - ck_assert_int_ge(merged[i].key, lower_key); - ck_assert_int_le(merged[i].key, upper_key); - } - - gsl_rng_free(parms.rng); - delete shard; - delete buffer; -} -END_TEST - - -START_TEST(t_wirs_buffer_query_scan) -{ - size_t n=1000; - auto buffer = create_weighted_mbuffer<WRec>(n); - - uint64_t lower_key = 0; - uint64_t upper_key = 5; - - size_t k = 1000; - - size_t cnt[3] = {0}; - wirs::Parms<WRec> parms = {lower_key, upper_key, k}; - parms.rng = gsl_rng_alloc(gsl_rng_mt19937); - - size_t total_samples = 0; - - for (size_t i=0; i<1000; i++) { - auto state = wirs::Query<Shard, WRec, false>::get_buffer_query_state(buffer, &parms); - ((wirs::BufferState<WRec> *) state)->sample_size = k; - auto result = wirs::Query<Shard, WRec, false>::buffer_query(buffer, state, &parms); - - total_samples += result.size(); - - for (size_t j=0; j<result.size(); j++) { - cnt[result[j].rec.key - 1]++; - } - - wirs::Query<Shard, WRec, false>::delete_buffer_query_state(state); - } - - ck_assert(roughly_equal(cnt[0], (double) total_samples/4.0, total_samples, .05)); - ck_assert(roughly_equal(cnt[1], (double) total_samples/4.0, total_samples, .05)); - ck_assert(roughly_equal(cnt[2], (double) total_samples/2.0, total_samples, .05)); - - gsl_rng_free(parms.rng); - delete buffer; -} -END_TEST - - -START_TEST(t_wirs_buffer_query_rejection) -{ - size_t n=1000; - auto buffer = create_weighted_mbuffer<WRec>(n); - - uint64_t lower_key = 0; - uint64_t upper_key = 5; - - size_t k = 1000; - - size_t cnt[3] = {0}; - wirs::Parms<WRec> parms = {lower_key, upper_key, k}; - parms.rng = gsl_rng_alloc(gsl_rng_mt19937); - - size_t total_samples = 0; - - for (size_t i=0; i<1000; i++) { - auto state = wirs::Query<Shard, WRec>::get_buffer_query_state(buffer, &parms); - ((wirs::BufferState<WRec> *) state)->sample_size = k; - auto result = wirs::Query<Shard, WRec>::buffer_query(buffer, state, &parms); - - total_samples += result.size(); - - for (size_t j=0; j<result.size(); j++) { - cnt[result[j].rec.key - 1]++; - } - - wirs::Query<Shard, WRec>::delete_buffer_query_state(state); - } - - ck_assert(roughly_equal(cnt[0], (double) total_samples/4.0, total_samples, .05)); - ck_assert(roughly_equal(cnt[1], (double) total_samples/4.0, total_samples, .05)); - ck_assert(roughly_equal(cnt[2], (double) total_samples/2.0, total_samples, .05)); - - gsl_rng_free(parms.rng); - delete buffer; -} -END_TEST +typedef WRec R; +typedef AugBTree<R> Shard; +#include "include/shard_standard.h" +#include "include/rangequery.h" Suite *unit_testing() { - Suite *unit = suite_create("AugBTree Shard Unit Testing"); - - TCase *create = tcase_create("de::AugBTree constructor Testing"); - tcase_add_test(create, t_mbuffer_init); - tcase_add_test(create, t_wirs_init); - tcase_set_timeout(create, 100); - suite_add_tcase(unit, create); - - - TCase *tombstone = tcase_create("de:AugBTree::tombstone cancellation Testing"); - tcase_add_test(tombstone, t_full_cancelation); - suite_add_tcase(unit, tombstone); - - - TCase *lookup = tcase_create("de:AugBTree:point_lookup Testing"); - tcase_add_test(lookup, t_point_lookup); - tcase_add_test(lookup, t_point_lookup_miss); - suite_add_tcase(unit, lookup); - + Suite *unit = suite_create("Alias-augmented B+Tree Shard Unit Testing"); - TCase *sampling = tcase_create("de:AugBTree::AugBTreeQuery Testing"); - tcase_add_test(sampling, t_wirs_query); - tcase_add_test(sampling, t_wirs_query_merge); - tcase_add_test(sampling, t_wirs_buffer_query_rejection); - tcase_add_test(sampling, t_wirs_buffer_query_scan); - suite_add_tcase(unit, sampling); + inject_rangequery_tests(unit); + inject_shard_tests(unit); return unit; } diff --git a/tests/de_level_concurrent.cpp b/tests/de_level_concurrent.cpp index 40605c4..2039efb 100644 --- a/tests/de_level_concurrent.cpp +++ b/tests/de_level_concurrent.cpp @@ -17,12 +17,12 @@ #include "framework/DynamicExtension.h" #include "shard/ISAMTree.h" #include "query/rangequery.h" -#include "shard/TrieSpline.h" #include <check.h> using namespace de; -typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<Rec, ISAMTree<Rec>>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE; +typedef Rec R; +typedef DynamicExtension<R, ISAMTree<R>, rq::Query<R, ISAMTree<R>>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE; #include "include/concurrent_extension.h" diff --git a/tests/de_level_tag.cpp b/tests/de_level_tag.cpp index 2ff2d26..75131c4 100644 --- a/tests/de_level_tag.cpp +++ b/tests/de_level_tag.cpp @@ -21,7 +21,8 @@ #include <check.h> using namespace de; -typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<Rec, ISAMTree<Rec>>, LayoutPolicy::LEVELING, DeletePolicy::TAGGING, SerialScheduler> DE; +typedef Rec R; +typedef DynamicExtension<R, ISAMTree<R>, rq::Query<R, ISAMTree<R>>, LayoutPolicy::LEVELING, DeletePolicy::TAGGING, SerialScheduler> DE; #include "include/dynamic_extension.h" diff --git a/tests/de_level_tomb.cpp b/tests/de_level_tomb.cpp index 9b30ac0..6da211d 100644 --- a/tests/de_level_tomb.cpp +++ b/tests/de_level_tomb.cpp @@ -22,7 +22,8 @@ #include <check.h> using namespace de; -typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<Rec, ISAMTree<Rec>>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, SerialScheduler> DE; +typedef Rec R; +typedef DynamicExtension<R, ISAMTree<R>, rq::Query<R, ISAMTree<R>>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, SerialScheduler> DE; #include "include/dynamic_extension.h" diff --git a/tests/de_tier_concurrent.cpp b/tests/de_tier_concurrent.cpp index 418332b..722b9bd 100644 --- a/tests/de_tier_concurrent.cpp +++ b/tests/de_tier_concurrent.cpp @@ -21,7 +21,8 @@ #include <check.h> using namespace de; -typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<Rec, ISAMTree<Rec>>, LayoutPolicy::TEIRING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE; +typedef Rec R; +typedef DynamicExtension<R, ISAMTree<R>, rq::Query<R, ISAMTree<R>>, LayoutPolicy::TEIRING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE; #include "include/concurrent_extension.h" diff --git a/tests/de_tier_tag.cpp b/tests/de_tier_tag.cpp index 83c37af..79bb7bf 100644 --- a/tests/de_tier_tag.cpp +++ b/tests/de_tier_tag.cpp @@ -22,7 +22,8 @@ #include <check.h> using namespace de; -typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<Rec, ISAMTree<Rec>>, LayoutPolicy::TEIRING, DeletePolicy::TAGGING, SerialScheduler> DE; +typedef Rec R; +typedef DynamicExtension<R, ISAMTree<R>, rq::Query<R, ISAMTree<R>>, LayoutPolicy::TEIRING, DeletePolicy::TAGGING, SerialScheduler> DE; #include "include/dynamic_extension.h" diff --git a/tests/de_tier_tomb.cpp b/tests/de_tier_tomb.cpp index 58a7a0f..b1387bb 100644 --- a/tests/de_tier_tomb.cpp +++ b/tests/de_tier_tomb.cpp @@ -22,7 +22,8 @@ #include <check.h> using namespace de; -typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<Rec, ISAMTree<Rec>>, LayoutPolicy::TEIRING, DeletePolicy::TOMBSTONE, SerialScheduler> DE; +typedef Rec R; +typedef DynamicExtension<Rec, ISAMTree<R>, rq::Query<R, ISAMTree<R>>, LayoutPolicy::TEIRING, DeletePolicy::TOMBSTONE, SerialScheduler> DE; #include "include/dynamic_extension.h" diff --git a/tests/include/concurrent_extension.h b/tests/include/concurrent_extension.h index 24cb2ce..0993fac 100644 --- a/tests/include/concurrent_extension.h +++ b/tests/include/concurrent_extension.h @@ -8,8 +8,8 @@ * Distributed under the Modified BSD License. * * WARNING: This file must be included in the main unit test set - * after the definition of an appropriate Shard, Query, and Rec - * type. In particular, Rec needs to implement the key-value + * after the definition of an appropriate Shard, Query, and R + * type. In particular, R needs to implement the key-value * pair interface. For other types of record, you'll need to * use a different set of unit tests. */ @@ -30,7 +30,7 @@ #include <check.h> //using namespace de; -//typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<ISAMTree<Rec>, Rec>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE; +//typedef DynamicExtension<R, ISAMTree<R>, rq::Query<ISAMTree<R>, R>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE; */ @@ -54,7 +54,7 @@ START_TEST(t_insert) uint64_t key = 0; uint32_t val = 0; for (size_t i=0; i<100; i++) { - Rec r = {key, val}; + R r = {key, val}; ck_assert_int_eq(test_de->insert(r), 1); key++; val++; @@ -75,7 +75,7 @@ START_TEST(t_debug_insert) uint64_t key = 0; uint32_t val = 0; for (size_t i=0; i<1000; i++) { - Rec r = {key, val}; + R r = {key, val}; ck_assert_int_eq(test_de->insert(r), 1); ck_assert_int_eq(test_de->get_record_count(), i+1); key++; @@ -94,7 +94,7 @@ START_TEST(t_insert_with_mem_merges) uint64_t key = 0; uint32_t val = 0; - Rec r = {key, val}; + R r = {key, val}; for (size_t i=0; i<1000; i++) { ck_assert_int_eq(test_de->insert(r), 1); r.key++; @@ -148,7 +148,7 @@ START_TEST(t_range_query) size_t i=0; while ( i < keys.size()) { - Rec r = {keys[i], (uint32_t) i}; + R r = {keys[i], (uint32_t) i}; if (test_de->insert(r)) { i++; } else { @@ -166,7 +166,7 @@ START_TEST(t_range_query) uint64_t lower_key = keys[idx]; uint64_t upper_key = keys[idx + 250]; - rq::Parms<Rec> p; + rq::Parms<R> p; p.lower_bound = lower_key; p.upper_bound = upper_key; @@ -210,7 +210,7 @@ START_TEST(t_tombstone_merging_01) size_t deletes = 0; size_t cnt=0; for (auto rec : records) { - Rec r = {rec.first, rec.second}; + R r = {rec.first, rec.second}; while (!test_de->insert(r)) { _mm_pause(); } @@ -220,7 +220,7 @@ START_TEST(t_tombstone_merging_01) std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); for (size_t i=0; i<del_vec.size(); i++) { - Rec dr = {del_vec[i].first, del_vec[i].second}; + R dr = {del_vec[i].first, del_vec[i].second}; while (!test_de->erase(dr)) { _mm_pause(); } @@ -249,9 +249,9 @@ DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) { auto test_de = new DE(1000, 10000, 2); - std::set<Rec> records; - std::set<Rec> to_delete; - std::set<Rec> deleted; + std::set<R> records; + std::set<R> to_delete; + std::set<R> deleted; while (records.size() < reccnt) { uint64_t key = rand(); @@ -267,7 +267,7 @@ DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) { ck_assert_int_eq(test_de->insert(rec), 1); if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) { - std::vector<Rec> del_vec; + std::vector<R> del_vec; std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); for (size_t i=0; i<del_vec.size(); i++) { @@ -295,9 +295,9 @@ START_TEST(t_static_structure) size_t reccnt = 100000; auto test_de = new DE(100, 1000, 2); - std::set<Rec> records; - std::set<Rec> to_delete; - std::set<Rec> deleted; + std::set<R> records; + std::set<R> to_delete; + std::set<R> deleted; while (records.size() < reccnt) { uint64_t key = rand(); @@ -319,7 +319,7 @@ START_TEST(t_static_structure) t_reccnt++; if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) { - std::vector<Rec> del_vec; + std::vector<R> del_vec; std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); for (size_t i=0; i<del_vec.size(); i++) { @@ -339,11 +339,11 @@ START_TEST(t_static_structure) } - //fprintf(stderr, "Tombstones: %ld\tRecords: %ld\n", test_de->get_tombstone_count(), test_de->get_record_count()); + //fprintf(stderr, "Tombstones: %ld\tRords: %ld\n", test_de->get_tombstone_count(), test_de->get_record_count()); //fprintf(stderr, "Inserts: %ld\tDeletes:%ld\tNet:%ld\n", reccnt, deletes, reccnt - deletes); auto flat = test_de->create_static_structure(true); - //fprintf(stderr, "Flat: Tombstones: %ld\tRecords %ld\n", flat->get_tombstone_count(), flat->get_record_count()); + //fprintf(stderr, "Flat: Tombstones: %ld\tRords %ld\n", flat->get_tombstone_count(), flat->get_record_count()); //ck_assert_int_eq(flat->get_record_count(), reccnt - deletes); uint64_t prev_key = 0; diff --git a/tests/include/dynamic_extension.h b/tests/include/dynamic_extension.h index 5a08f5a..f0f13dd 100644 --- a/tests/include/dynamic_extension.h +++ b/tests/include/dynamic_extension.h @@ -8,8 +8,8 @@ * Distributed under the Modified BSD License. * * WARNING: This file must be included in the main unit test set - * after the definition of an appropriate Shard, Query, and Rec - * type. In particular, Rec needs to implement the key-value + * after the definition of an appropriate Shard, Query, and R + * type. In particular, R needs to implement the key-value * pair interface. For other types of record, you'll need to * use a different set of unit tests. */ @@ -29,7 +29,7 @@ //#include "query/rangequery.h" //#include <check.h> //using namespace de; -//typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<ISAMTree<Rec>, Rec>, LayoutPolicy::TEIRING, DeletePolicy::TAGGING, SerialScheduler> DE; +//typedef DynamicExtension<R, ISAMTree<R>, rq::Query<ISAMTree<R>, R>, LayoutPolicy::TEIRING, DeletePolicy::TAGGING, SerialScheduler> DE; START_TEST(t_create) @@ -52,7 +52,7 @@ START_TEST(t_insert) uint64_t key = 0; uint32_t val = 0; for (size_t i=0; i<100; i++) { - Rec r = {key, val}; + R r = {key, val}; ck_assert_int_eq(test_de->insert(r), 1); key++; val++; @@ -73,7 +73,7 @@ START_TEST(t_debug_insert) uint64_t key = 0; uint32_t val = 0; for (size_t i=0; i<1000; i++) { - Rec r = {key, val}; + R r = {key, val}; ck_assert_int_eq(test_de->insert(r), 1); ck_assert_int_eq(test_de->get_record_count(), i+1); key++; @@ -92,7 +92,7 @@ START_TEST(t_insert_with_mem_merges) uint64_t key = 0; uint32_t val = 0; for (size_t i=0; i<300; i++) { - Rec r = {key, val}; + R r = {key, val}; ck_assert_int_eq(test_de->insert(r), 1); key++; val++; @@ -123,7 +123,7 @@ START_TEST(t_range_query) std::shuffle(keys.begin(), keys.end(), gen); for (size_t i=0; i<keys.size(); i++) { - Rec r = {keys[i], (uint32_t) i}; + R r = {keys[i], (uint32_t) i}; ck_assert_int_eq(test_de->insert(r), 1); } @@ -136,7 +136,7 @@ START_TEST(t_range_query) uint64_t lower_key = keys[idx]; uint64_t upper_key = keys[idx + 250]; - rq::Parms<Rec> p; + rq::Parms<R> p; p.lower_bound = lower_key; p.upper_bound = upper_key; @@ -177,7 +177,7 @@ START_TEST(t_tombstone_merging_01) size_t deletes = 0; size_t cnt=0; for (auto rec : records) { - Rec r = {rec.first, rec.second}; + R r = {rec.first, rec.second}; ck_assert_int_eq(test_de->insert(r), 1); if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) { @@ -185,7 +185,7 @@ START_TEST(t_tombstone_merging_01) std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); for (size_t i=0; i<del_vec.size(); i++) { - Rec dr = {del_vec[i].first, del_vec[i].second}; + R dr = {del_vec[i].first, del_vec[i].second}; test_de->erase(dr); deletes++; to_delete.erase(del_vec[i]); @@ -212,9 +212,9 @@ DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) { auto test_de = new DE(1000, 10000, 2); - std::set<Rec> records; - std::set<Rec> to_delete; - std::set<Rec> deleted; + std::set<R> records; + std::set<R> to_delete; + std::set<R> deleted; while (records.size() < reccnt) { uint64_t key = rand(); @@ -230,7 +230,7 @@ DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) { ck_assert_int_eq(test_de->insert(rec), 1); if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) { - std::vector<Rec> del_vec; + std::vector<R> del_vec; std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); for (size_t i=0; i<del_vec.size(); i++) { @@ -258,9 +258,9 @@ START_TEST(t_static_structure) size_t reccnt = 100000; auto test_de = new DE(100, 1000, 2); - std::set<Rec> records; - std::set<Rec> to_delete; - std::set<Rec> deleted; + std::set<R> records; + std::set<R> to_delete; + std::set<R> deleted; while (records.size() < reccnt) { uint64_t key = rand(); @@ -280,7 +280,7 @@ START_TEST(t_static_structure) t_reccnt++; if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) { - std::vector<Rec> del_vec; + std::vector<R> del_vec; std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); for (size_t i=0; i<del_vec.size(); i++) { diff --git a/tests/include/rangecount.h b/tests/include/rangecount.h index 471af27..fdd66d9 100644 --- a/tests/include/rangecount.h +++ b/tests/include/rangecount.h @@ -9,8 +9,8 @@ * Distributed under the Modified BSD License. * * WARNING: This file must be included in the main unit test set - * after the definition of an appropriate Shard and Rec - * type. In particular, Rec needs to implement the key-value + * after the definition of an appropriate Shard and R + * type. In particular, R needs to implement the key-value * pair interface and Shard needs to support lower_bound. * For other types of record and shard, you'll need to * use a different set of unit tests. @@ -29,21 +29,24 @@ //#include "testing.h" //#include <check.h> //using namespace de; -//typedef ISAMTree<Rec> Shard; +//typedef ISAMTree<R> Shard; + + +#include "query/rangecount.h" START_TEST(t_range_count) { - auto buffer = create_sequential_mbuffer<Rec>(100, 1000); + auto buffer = create_sequential_mbuffer<R>(100, 1000); auto shard = Shard(buffer->get_buffer_view()); - rc::Parms<Rec> parms; + rc::Parms<R> parms; parms.lower_bound = 300; parms.upper_bound = 500; - auto state = rc::Query<Rec, Shard>::get_query_state(&shard, &parms); - auto result = rc::Query<Rec, Shard>::query(&shard, state, &parms); - rc::Query<Rec, Shard>::delete_query_state(state); + auto state = rc::Query<R, Shard>::get_query_state(&shard, &parms); + auto result = rc::Query<R, Shard>::query(&shard, state, &parms); + rc::Query<R, Shard>::delete_query_state(state); ck_assert_int_eq(result.size(), 1); ck_assert_int_eq(result[0].rec.key, parms.upper_bound - parms.lower_bound + 1); @@ -55,17 +58,17 @@ END_TEST START_TEST(t_buffer_range_count) { - auto buffer = create_sequential_mbuffer<Rec>(100, 1000); + auto buffer = create_sequential_mbuffer<R>(100, 1000); - rc::Parms<Rec> parms; + rc::Parms<R> parms; parms.lower_bound = 300; parms.upper_bound = 500; { auto view = buffer->get_buffer_view(); - auto state = rc::Query<Rec, Shard>::get_buffer_query_state(&view, &parms); - auto result = rc::Query<Rec, Shard>::buffer_query(state, &parms); - rc::Query<Rec, Shard>::delete_buffer_query_state(state); + auto state = rc::Query<R, Shard>::get_buffer_query_state(&view, &parms); + auto result = rc::Query<R, Shard>::buffer_query(state, &parms); + rc::Query<R, Shard>::delete_buffer_query_state(state); ck_assert_int_eq(result.size(), 1); ck_assert_int_eq(result[0].rec.key, parms.upper_bound - parms.lower_bound + 1); @@ -78,32 +81,32 @@ END_TEST START_TEST(t_range_count_merge) { - auto buffer1 = create_sequential_mbuffer<Rec>(100, 200); - auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000); + auto buffer1 = create_sequential_mbuffer<R>(100, 200); + auto buffer2 = create_sequential_mbuffer<R>(400, 1000); auto shard1 = Shard(buffer1->get_buffer_view()); auto shard2 = Shard(buffer2->get_buffer_view()); - rc::Parms<Rec> parms; + rc::Parms<R> parms; parms.lower_bound = 150; parms.upper_bound = 500; size_t result_size = parms.upper_bound - parms.lower_bound + 1 - 200; - auto state1 = rc::Query<Rec, Shard>::get_query_state(&shard1, &parms); - auto state2 = rc::Query<Rec, Shard>::get_query_state(&shard2, &parms); + auto state1 = rc::Query<R, Shard>::get_query_state(&shard1, &parms); + auto state2 = rc::Query<R, Shard>::get_query_state(&shard2, &parms); - std::vector<std::vector<de::Wrapped<Rec>>> results(2); - results[0] = rc::Query<Rec, Shard>::query(&shard1, state1, &parms); - results[1] = rc::Query<Rec, Shard>::query(&shard2, state2, &parms); + std::vector<std::vector<de::Wrapped<R>>> results(2); + results[0] = rc::Query<R, Shard>::query(&shard1, state1, &parms); + results[1] = rc::Query<R, Shard>::query(&shard2, state2, &parms); - rc::Query<Rec, Shard>::delete_query_state(state1); - rc::Query<Rec, Shard>::delete_query_state(state2); + rc::Query<R, Shard>::delete_query_state(state1); + rc::Query<R, Shard>::delete_query_state(state2); ck_assert_int_eq(results[0].size(), 1); ck_assert_int_eq(results[1].size(), 1); - auto result = rc::Query<Rec, Shard>::merge(results, nullptr); + auto result = rc::Query<R, Shard>::merge(results, nullptr); ck_assert_int_eq(result[0].key, result_size); @@ -115,8 +118,8 @@ END_TEST START_TEST(t_lower_bound) { - auto buffer1 = create_sequential_mbuffer<Rec>(100, 200); - auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000); + auto buffer1 = create_sequential_mbuffer<R>(100, 200); + auto buffer2 = create_sequential_mbuffer<R>(400, 1000); auto shard1 = new Shard(buffer1->get_buffer_view()); auto shard2 = new Shard(buffer2->get_buffer_view()); @@ -126,7 +129,7 @@ START_TEST(t_lower_bound) auto merged = Shard(shards); for (size_t i=100; i<1000; i++) { - Rec r; + R r; r.key = i; r.value = i; diff --git a/tests/include/rangequery.h b/tests/include/rangequery.h index dbb71db..a8a73f7 100644 --- a/tests/include/rangequery.h +++ b/tests/include/rangequery.h @@ -9,8 +9,8 @@ * Distributed under the Modified BSD License. * * WARNING: This file must be included in the main unit test set - * after the definition of an appropriate Shard and Rec - * type. In particular, Rec needs to implement the key-value + * after the definition of an appropriate Shard and R + * type. In particular, R needs to implement the key-value * pair interface and Shard needs to support lower_bound. * For other types of record and shard, you'll need to * use a different set of unit tests. @@ -29,21 +29,23 @@ //#include "testing.h" //#include <check.h> //using namespace de; -//typedef ISAMTree<Rec> Shard; +//typedef ISAMTree<R> Shard; + +#include "query/rangequery.h" START_TEST(t_range_query) { - auto buffer = create_sequential_mbuffer<Rec>(100, 1000); + auto buffer = create_sequential_mbuffer<R>(100, 1000); auto shard = Shard(buffer->get_buffer_view()); - rq::Parms<Rec> parms; + rq::Parms<R> parms; parms.lower_bound = 300; parms.upper_bound = 500; - auto state = rq::Query<Rec, Shard>::get_query_state(&shard, &parms); - auto result = rq::Query<Rec, Shard>::query(&shard, state, &parms); - rq::Query<Rec, Shard>::delete_query_state(state); + auto state = rq::Query<R, Shard>::get_query_state(&shard, &parms); + auto result = rq::Query<R, Shard>::query(&shard, state, &parms); + rq::Query<R, Shard>::delete_query_state(state); ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1); for (size_t i=0; i<result.size(); i++) { @@ -58,17 +60,17 @@ END_TEST START_TEST(t_buffer_range_query) { - auto buffer = create_sequential_mbuffer<Rec>(100, 1000); + auto buffer = create_sequential_mbuffer<R>(100, 1000); - rq::Parms<Rec> parms; + rq::Parms<R> parms; parms.lower_bound = 300; parms.upper_bound = 500; { auto view = buffer->get_buffer_view(); - auto state = rq::Query<Rec, Shard>::get_buffer_query_state(&view, &parms); - auto result = rq::Query<Rec, Shard>::buffer_query(state, &parms); - rq::Query<Rec, Shard>::delete_buffer_query_state(state); + auto state = rq::Query<R, Shard>::get_buffer_query_state(&view, &parms); + auto result = rq::Query<R, Shard>::buffer_query(state, &parms); + rq::Query<R, Shard>::delete_buffer_query_state(state); ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1); for (size_t i=0; i<result.size(); i++) { @@ -84,40 +86,40 @@ END_TEST START_TEST(t_range_query_merge) { - auto buffer1 = create_sequential_mbuffer<Rec>(100, 200); - auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000); + auto buffer1 = create_sequential_mbuffer<R>(100, 200); + auto buffer2 = create_sequential_mbuffer<R>(400, 1000); auto shard1 = Shard(buffer1->get_buffer_view()); auto shard2 = Shard(buffer2->get_buffer_view()); - rq::Parms<Rec> parms; + rq::Parms<R> parms; parms.lower_bound = 150; parms.upper_bound = 500; size_t result_size = parms.upper_bound - parms.lower_bound + 1 - 200; - auto state1 = rq::Query<Rec, Shard>::get_query_state(&shard1, &parms); - auto state2 = rq::Query<Rec, Shard>::get_query_state(&shard2, &parms); + auto state1 = rq::Query<R, Shard>::get_query_state(&shard1, &parms); + auto state2 = rq::Query<R, Shard>::get_query_state(&shard2, &parms); - std::vector<std::vector<de::Wrapped<Rec>>> results(2); - results[0] = rq::Query<Rec, Shard>::query(&shard1, state1, &parms); - results[1] = rq::Query<Rec, Shard>::query(&shard2, state2, &parms); + std::vector<std::vector<de::Wrapped<R>>> results(2); + results[0] = rq::Query<R, Shard>::query(&shard1, state1, &parms); + results[1] = rq::Query<R, Shard>::query(&shard2, state2, &parms); - rq::Query<Rec, Shard>::delete_query_state(state1); - rq::Query<Rec, Shard>::delete_query_state(state2); + rq::Query<R, Shard>::delete_query_state(state1); + rq::Query<R, Shard>::delete_query_state(state2); ck_assert_int_eq(results[0].size() + results[1].size(), result_size); - std::vector<std::vector<Wrapped<Rec>>> proc_results; + std::vector<std::vector<Wrapped<R>>> proc_results; for (size_t j=0; j<results.size(); j++) { - proc_results.emplace_back(std::vector<Wrapped<Rec>>()); + proc_results.emplace_back(std::vector<Wrapped<R>>()); for (size_t i=0; i<results[j].size(); i++) { proc_results[j].emplace_back(results[j][i]); } } - auto result = rq::Query<Rec, Shard>::merge(proc_results, nullptr); + auto result = rq::Query<R, Shard>::merge(proc_results, nullptr); std::sort(result.begin(), result.end()); ck_assert_int_eq(result.size(), result_size); @@ -137,8 +139,8 @@ END_TEST START_TEST(t_lower_bound) { - auto buffer1 = create_sequential_mbuffer<Rec>(100, 200); - auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000); + auto buffer1 = create_sequential_mbuffer<R>(100, 200); + auto buffer2 = create_sequential_mbuffer<R>(400, 1000); auto shard1 = new Shard(buffer1->get_buffer_view()); auto shard2 = new Shard(buffer2->get_buffer_view()); @@ -148,7 +150,7 @@ START_TEST(t_lower_bound) auto merged = Shard(shards); for (size_t i=100; i<1000; i++) { - Rec r; + R r; r.key = i; r.value = i; diff --git a/tests/include/shard_standard.h b/tests/include/shard_standard.h index ddd7614..f50c1cb 100644 --- a/tests/include/shard_standard.h +++ b/tests/include/shard_standard.h @@ -8,8 +8,8 @@ * Distributed under the Modified BSD License. * * WARNING: This file must be included in the main unit test set - * after the definition of an appropriate Shard and Rec - * type. In particular, Rec needs to implement the key-value + * after the definition of an appropriate Shard and R + * type. In particular, R needs to implement the key-value * pair interface. For other types of record, you'll need to * use a different set of unit tests. */ @@ -26,11 +26,11 @@ //#include "testing.h" //#include <check.h> //using namespace de; -//typedef ISAMTree<Rec> Shard; +//typedef ISAMTree<R> Shard; START_TEST(t_mbuffer_init) { - auto buffer = new MutableBuffer<Rec>(512, 1024); + auto buffer = new MutableBuffer<R>(512, 1024); for (uint64_t i = 512; i > 0; i--) { uint32_t v = i; buffer->append({i,v, 1}); @@ -57,9 +57,9 @@ START_TEST(t_mbuffer_init) START_TEST(t_shard_init) { size_t n = 512; - auto mbuffer1 = create_test_mbuffer<Rec>(n); - auto mbuffer2 = create_test_mbuffer<Rec>(n); - auto mbuffer3 = create_test_mbuffer<Rec>(n); + auto mbuffer1 = create_test_mbuffer<R>(n); + auto mbuffer2 = create_test_mbuffer<R>(n); + auto mbuffer3 = create_test_mbuffer<R>(n); auto shard1 = new Shard(mbuffer1->get_buffer_view()); auto shard2 = new Shard(mbuffer2->get_buffer_view()); @@ -108,8 +108,8 @@ START_TEST(t_shard_init) START_TEST(t_full_cancelation) { size_t n = 100; - auto buffer = create_double_seq_mbuffer<Rec>(n, false); - auto buffer_ts = create_double_seq_mbuffer<Rec>(n, true); + auto buffer = create_double_seq_mbuffer<R>(n, false); + auto buffer_ts = create_double_seq_mbuffer<R>(n, true); Shard* shard = new Shard(buffer->get_buffer_view()); Shard* shard_ts = new Shard(buffer_ts->get_buffer_view()); @@ -139,14 +139,14 @@ START_TEST(t_point_lookup) { size_t n = 10000; - auto buffer = create_double_seq_mbuffer<Rec>(n, false); + auto buffer = create_double_seq_mbuffer<R>(n, false); auto isam = Shard(buffer->get_buffer_view()); { auto view = buffer->get_buffer_view(); for (size_t i=0; i<n; i++) { - Rec r; + R r; auto rec = view.get(i); r.key = rec->rec.key; r.value = rec->rec.value; @@ -167,11 +167,11 @@ START_TEST(t_point_lookup_miss) { size_t n = 10000; - auto buffer = create_double_seq_mbuffer<Rec>(n, false); + auto buffer = create_double_seq_mbuffer<R>(n, false); auto isam = Shard(buffer->get_buffer_view()); for (size_t i=n + 100; i<2*n; i++) { - Rec r; + R r; r.key = i; r.value = i; diff --git a/tests/include/wirs.h b/tests/include/wirs.h new file mode 100644 index 0000000..90cd22d --- /dev/null +++ b/tests/include/wirs.h @@ -0,0 +1,181 @@ +/* + * tests/include/rangequery.h + * + * Standardized unit tests for range queries against supporting + * shard types + * + * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + * WARNING: This file must be included in the main unit test set + * after the definition of an appropriate Shard and R + * type. In particular, R needs to implement the key-value + * pair interface and Shard needs to support lower_bound. + * For other types of record and shard, you'll need to + * use a different set of unit tests. + */ +#pragma once + +/* + * Uncomment these lines temporarily to remove errors in this file + * temporarily for development purposes. They should be removed prior + * to building, to ensure no duplicate definitions. These includes/defines + * should be included in the source file that includes this one, above the + * include statement. + */ +//#include "shard/ISAMTree.h" +//#include "query/rangequery.h" +//#include "testing.h" +//#include <check.h> +//using namespace de; +//typedef ISAMTree<R> Shard; + + +START_TEST(t_range_query) +{ + auto buffer = create_sequential_mbuffer<R>(100, 1000); + auto shard = Shard(buffer->get_buffer_view()); + + rq::Parms<R> parms; + parms.lower_bound = 300; + parms.upper_bound = 500; + + auto state = rq::Query<R, Shard>::get_query_state(&shard, &parms); + auto result = rq::Query<R, Shard>::query(&shard, state, &parms); + rq::Query<R, Shard>::delete_query_state(state); + + ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1); + for (size_t i=0; i<result.size(); i++) { + ck_assert_int_le(result[i].rec.key, parms.upper_bound); + ck_assert_int_ge(result[i].rec.key, parms.lower_bound); + } + + delete buffer; +} +END_TEST + + +START_TEST(t_buffer_range_query) +{ + auto buffer = create_sequential_mbuffer<R>(100, 1000); + + rq::Parms<R> parms; + parms.lower_bound = 300; + parms.upper_bound = 500; + + { + auto view = buffer->get_buffer_view(); + auto state = rq::Query<R, Shard>::get_buffer_query_state(&view, &parms); + auto result = rq::Query<R, Shard>::buffer_query(state, &parms); + rq::Query<R, Shard>::delete_buffer_query_state(state); + + ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1); + for (size_t i=0; i<result.size(); i++) { + ck_assert_int_le(result[i].rec.key, parms.upper_bound); + ck_assert_int_ge(result[i].rec.key, parms.lower_bound); + } + } + + delete buffer; +} +END_TEST + + +START_TEST(t_range_query_merge) +{ + auto buffer1 = create_sequential_mbuffer<R>(100, 200); + auto buffer2 = create_sequential_mbuffer<R>(400, 1000); + + auto shard1 = Shard(buffer1->get_buffer_view()); + auto shard2 = Shard(buffer2->get_buffer_view()); + + rq::Parms<R> parms; + parms.lower_bound = 150; + parms.upper_bound = 500; + + size_t result_size = parms.upper_bound - parms.lower_bound + 1 - 200; + + auto state1 = rq::Query<R, Shard>::get_query_state(&shard1, &parms); + auto state2 = rq::Query<R, Shard>::get_query_state(&shard2, &parms); + + std::vector<std::vector<de::Wrapped<R>>> results(2); + results[0] = rq::Query<R, Shard>::query(&shard1, state1, &parms); + results[1] = rq::Query<R, Shard>::query(&shard2, state2, &parms); + + rq::Query<R, Shard>::delete_query_state(state1); + rq::Query<R, Shard>::delete_query_state(state2); + + ck_assert_int_eq(results[0].size() + results[1].size(), result_size); + + std::vector<std::vector<Wrapped<R>>> proc_results; + + for (size_t j=0; j<results.size(); j++) { + proc_results.emplace_back(std::vector<Wrapped<R>>()); + for (size_t i=0; i<results[j].size(); i++) { + proc_results[j].emplace_back(results[j][i]); + } + } + + auto result = rq::Query<R, Shard>::merge(proc_results, nullptr); + std::sort(result.begin(), result.end()); + + ck_assert_int_eq(result.size(), result_size); + auto key = parms.lower_bound; + for (size_t i=0; i<result.size(); i++) { + ck_assert_int_eq(key++, result[i].key); + if (key == 200) { + key = 400; + } + } + + delete buffer1; + delete buffer2; +} +END_TEST + + +START_TEST(t_lower_bound) +{ + auto buffer1 = create_sequential_mbuffer<R>(100, 200); + auto buffer2 = create_sequential_mbuffer<R>(400, 1000); + + auto shard1 = new Shard(buffer1->get_buffer_view()); + auto shard2 = new Shard(buffer2->get_buffer_view()); + + std::vector<Shard*> shards = {shard1, shard2}; + + auto merged = Shard(shards); + + for (size_t i=100; i<1000; i++) { + R r; + r.key = i; + r.value = i; + + auto idx = merged.get_lower_bound(i); + + assert(idx < merged.get_record_count()); + + auto res = merged.get_record_at(idx); + + if (i >=200 && i <400) { + ck_assert_int_lt(res->rec.key, i); + } else { + ck_assert_int_eq(res->rec.key, i); + } + } + + delete buffer1; + delete buffer2; + delete shard1; + delete shard2; +} +END_TEST + +static void inject_rangequery_tests(Suite *suite) { + TCase *range_query = tcase_create("Range Query Testing"); + tcase_add_test(range_query, t_range_query); + tcase_add_test(range_query, t_buffer_range_query); + tcase_add_test(range_query, t_range_query_merge); + suite_add_tcase(suite, range_query); +} diff --git a/tests/include/wss.h b/tests/include/wss.h new file mode 100644 index 0000000..f0ac74c --- /dev/null +++ b/tests/include/wss.h @@ -0,0 +1,144 @@ +/* + * tests/include/rangequery.h + * + * Standardized unit tests for range queries against supporting + * shard types + * + * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + * WARNING: This file must be included in the main unit test set + * after the definition of an appropriate Shard and R + * type. In particular, R needs to implement the key-value + * pair interface and Shard needs to support lower_bound. + * For other types of record and shard, you'll need to + * use a different set of unit tests. + */ +#pragma once + +/* + * Uncomment these lines temporarily to remove errors in this file + * temporarily for development purposes. They should be removed prior + * to building, to ensure no duplicate definitions. These includes/defines + * should be included in the source file that includes this one, above the + * include statement. + */ +#include "shard/Alias.h" +#include "testing.h" +#include <check.h> +using namespace de; +typedef Alias<R> Shard; + +#include "query/wss.h" + +START_TEST(t_wss_query) +{ + auto buffer = create_weighted_mbuffer<R>(1000); + auto shard = Shard(buffer->get_buffer_view()); + + auto rng = gsl_rng_alloc(gsl_rng_mt19937); + + wss::Parms<R> parms; + parms.rng = rng; + parms.sample_size = 20; + + auto state = wss::Query<R, Shard>::get_query_state(&shard, &parms); + auto result = wss::Query<R, Shard>::query(&shard, state, &parms); + wss::Query<R, Shard>::delete_query_state(state); + + delete buffer; + gsl_rng_free(rng); +} +END_TEST + + +START_TEST(t_buffer_wss_query) +{ + auto buffer = create_weighted_mbuffer<R>(1000); + + + auto rng = gsl_rng_alloc(gsl_rng_mt19937); + + wss::Parms<R> parms; + parms.rng = rng; + + { + auto view = buffer->get_buffer_view(); + auto state = wss::Query<R, Shard>::get_buffer_query_state(&view, &parms); + auto result = wss::Query<R, Shard>::buffer_query(state, &parms); + wss::Query<R, Shard>::delete_buffer_query_state(state); + + ck_assert_int_eq(result.size(), parms.sample_size); + for (size_t i=0; i<result.size(); i++) { + + } + } + + delete buffer; +} +END_TEST + + +/* +START_TEST(t_range_query_merge) +{ + auto buffer1 = create_sequential_mbuffer<R>(100, 200); + auto buffer2 = create_sequential_mbuffer<R>(400, 1000); + + auto shard1 = Shard(buffer1->get_buffer_view()); + auto shard2 = Shard(buffer2->get_buffer_view()); + + wss::Parms<R> parms; + parms.lower_bound = 150; + parms.upper_bound = 500; + + size_t result_size = parms.upper_bound - parms.lower_bound + 1 - 200; + + auto state1 = wss::Query<R, Shard>::get_query_state(&shard1, &parms); + auto state2 = wss::Query<R, Shard>::get_query_state(&shard2, &parms); + + std::vector<std::vector<de::Wrapped<R>>> results(2); + results[0] = wss::Query<R, Shard>::query(&shard1, state1, &parms); + results[1] = wss::Query<R, Shard>::query(&shard2, state2, &parms); + + wss::Query<R, Shard>::delete_query_state(state1); + wss::Query<R, Shard>::delete_query_state(state2); + + ck_assert_int_eq(results[0].size() + results[1].size(), result_size); + + std::vector<std::vector<Wrapped<R>>> proc_results; + + for (size_t j=0; j<results.size(); j++) { + proc_results.emplace_back(std::vector<Wrapped<R>>()); + for (size_t i=0; i<results[j].size(); i++) { + proc_results[j].emplace_back(results[j][i]); + } + } + + auto result = wss::Query<R, Shard>::merge(proc_results, nullptr); + std::sort(result.begin(), result.end()); + + ck_assert_int_eq(result.size(), result_size); + auto key = parms.lower_bound; + for (size_t i=0; i<result.size(); i++) { + ck_assert_int_eq(key++, result[i].key); + if (key == 200) { + key = 400; + } + } + + delete buffer1; + delete buffer2; +} +END_TEST +*/ + + +static void inject_wss_tests(Suite *suite) { + TCase *wss_query = tcase_create("WSS Query Testing"); + tcase_add_test(wss_query, t_wss_query); + tcase_add_test(wss_query, t_buffer_wss_query); + //tcase_add_test(wss_query, t_wss_query_merge); + suite_add_tcase(suite, wss_query); +} diff --git a/tests/memisam_tests.cpp b/tests/memisam_tests.cpp index b398524..9117ce3 100644 --- a/tests/memisam_tests.cpp +++ b/tests/memisam_tests.cpp @@ -12,21 +12,19 @@ #include "shard/ISAMTree.h" #include "include/testing.h" -#include "query/rangequery.h" #include <check.h> - - using namespace de; -typedef ISAMTree<Rec> Shard; +typedef Rec R; +typedef ISAMTree<R> Shard; #include "include/shard_standard.h" #include "include/rangequery.h" Suite *unit_testing() { - Suite *unit = suite_create("ISAMTree Shard Unit Testing"); + Suite *unit = suite_create("Alias-augmented B+Tree Shard Unit Testing"); inject_rangequery_tests(unit); inject_shard_tests(unit); diff --git a/tests/pgm_tests.cpp b/tests/pgm_tests.cpp index c7750ac..ee350de 100644 --- a/tests/pgm_tests.cpp +++ b/tests/pgm_tests.cpp @@ -1,7 +1,7 @@ /* - * tests/irs_tests.cpp + * tests/isam_tests.cpp * - * Unit tests for PGM (Augmented B+Tree) shard + * Unit tests for ISAM Tree shard * * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> * Dong Xie <dongx@psu.edu> @@ -11,330 +11,23 @@ */ #include "shard/PGM.h" -#include "query/rangequery.h" -#include "testing.h" - +#include "include/testing.h" #include <check.h> using namespace de; -typedef PGM<Rec> Shard; - -START_TEST(t_mbuffer_init) -{ - auto buffer = new MutableBuffer<Rec>(1024, 1024); - for (uint64_t i = 512; i > 0; i--) { - uint32_t v = i; - buffer->append({i,v, 1}); - } - - for (uint64_t i = 1; i <= 256; ++i) { - uint32_t v = i; - buffer->append({i, v, 1}, true); - } - - for (uint64_t i = 257; i <= 512; ++i) { - uint32_t v = i + 1; - buffer->append({i, v, 1}); - } - - Shard* shard = new Shard(buffer); - ck_assert_uint_eq(shard->get_record_count(), 512); - - delete buffer; - delete shard; -} - - -START_TEST(t_irs_init) -{ - size_t n = 512; - auto mbuffer1 = create_test_mbuffer<Rec>(n); - auto mbuffer2 = create_test_mbuffer<Rec>(n); - auto mbuffer3 = create_test_mbuffer<Rec>(n); - - auto shard1 = new Shard(mbuffer1); - auto shard2 = new Shard(mbuffer2); - auto shard3 = new Shard(mbuffer3); - - Shard* shards[3] = {shard1, shard2, shard3}; - auto shard4 = new Shard(shards, 3); - - ck_assert_int_eq(shard4->get_record_count(), n * 3); - ck_assert_int_eq(shard4->get_tombstone_count(), 0); - - size_t total_cnt = 0; - size_t shard1_idx = 0; - size_t shard2_idx = 0; - size_t shard3_idx = 0; - - for (size_t i = 0; i < shard4->get_record_count(); ++i) { - auto rec1 = shard1->get_record_at(shard1_idx); - auto rec2 = shard2->get_record_at(shard2_idx); - auto rec3 = shard3->get_record_at(shard3_idx); - - auto cur_rec = shard4->get_record_at(i); - - if (shard1_idx < n && cur_rec->rec == rec1->rec) { - ++shard1_idx; - } else if (shard2_idx < n && cur_rec->rec == rec2->rec) { - ++shard2_idx; - } else if (shard3_idx < n && cur_rec->rec == rec3->rec) { - ++shard3_idx; - } else { - assert(false); - } - } - - delete mbuffer1; - delete mbuffer2; - delete mbuffer3; - - delete shard1; - delete shard2; - delete shard3; - delete shard4; -} - -START_TEST(t_point_lookup) -{ - size_t n = 10000; - - auto buffer = create_double_seq_mbuffer<Rec>(n, false); - auto shard = Shard(buffer); - - for (size_t i=0; i<n; i++) { - Rec r; - auto rec = (buffer->get_data() + i); - r.key = rec->rec.key; - r.value = rec->rec.value; - - auto result = shard.point_lookup(r); - ck_assert_ptr_nonnull(result); - ck_assert_int_eq(result->rec.key, r.key); - ck_assert_int_eq(result->rec.value, r.value); - } - - delete buffer; -} -END_TEST - - -START_TEST(t_point_lookup_miss) -{ - size_t n = 10000; - - auto buffer = create_double_seq_mbuffer<Rec>(n, false); - auto isam = Shard(buffer); - - for (size_t i=n + 100; i<2*n; i++) { - Rec r; - r.key = i; - r.value = i; - - auto result = isam.point_lookup(r); - ck_assert_ptr_null(result); - } - - delete buffer; -} - - -START_TEST(t_range_query) -{ - auto buffer = create_sequential_mbuffer<Rec>(100, 1000); - auto shard = Shard(buffer); - - rq::Parms<Rec> parms; - parms.lower_bound = 300; - parms.upper_bound = 500; - - auto state = rq::Query<Shard, Rec>::get_query_state(&shard, &parms); - auto result = rq::Query<Shard, Rec>::query(&shard, state, &parms); - rq::Query<Shard, Rec>::delete_query_state(state); - - ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1); - for (size_t i=0; i<result.size(); i++) { - ck_assert_int_le(result[i].rec.key, parms.upper_bound); - ck_assert_int_ge(result[i].rec.key, parms.lower_bound); - } - - delete buffer; -} -END_TEST - - -START_TEST(t_buffer_range_query) -{ - auto buffer = create_sequential_mbuffer<Rec>(100, 1000); - - rq::Parms<Rec> parms; - parms.lower_bound = 300; - parms.upper_bound = 500; - - auto state = rq::Query<Shard, Rec>::get_buffer_query_state(buffer, &parms); - auto result = rq::Query<Shard, Rec>::buffer_query(buffer, state, &parms); - rq::Query<Shard, Rec>::delete_buffer_query_state(state); - - ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1); - for (size_t i=0; i<result.size(); i++) { - ck_assert_int_le(result[i].rec.key, parms.upper_bound); - ck_assert_int_ge(result[i].rec.key, parms.lower_bound); - } - - delete buffer; -} -END_TEST - - -START_TEST(t_range_query_merge) -{ - auto buffer1 = create_sequential_mbuffer<Rec>(100, 200); - auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000); - - auto shard1 = Shard(buffer1); - auto shard2 = Shard(buffer2); - - rq::Parms<Rec> parms; - parms.lower_bound = 150; - parms.upper_bound = 500; - - size_t result_size = parms.upper_bound - parms.lower_bound + 1 - 200; - - auto state1 = rq::Query<Shard, Rec>::get_query_state(&shard1, &parms); - auto state2 = rq::Query<Shard, Rec>::get_query_state(&shard2, &parms); - - std::vector<std::vector<de::Wrapped<Rec>>> results(2); - results[0] = rq::Query<Shard, Rec>::query(&shard1, state1, &parms); - results[1] = rq::Query<Shard, Rec>::query(&shard2, state2, &parms); - - rq::Query<Shard, Rec>::delete_query_state(state1); - rq::Query<Shard, Rec>::delete_query_state(state2); - - ck_assert_int_eq(results[0].size() + results[1].size(), result_size); - - std::vector<std::vector<Wrapped<Rec>>> proc_results; - - for (size_t j=0; j<results.size(); j++) { - proc_results.emplace_back(std::vector<Wrapped<Rec>>()); - for (size_t i=0; i<results[j].size(); i++) { - proc_results[j].emplace_back(results[j][i]); - } - } - - auto result = rq::Query<Shard, Rec>::merge(proc_results, nullptr); - std::sort(result.begin(), result.end()); - - ck_assert_int_eq(result.size(), result_size); - auto key = parms.lower_bound; - for (size_t i=0; i<result.size(); i++) { - ck_assert_int_eq(key++, result[i].key); - if (key == 200) { - key = 400; - } - } - - delete buffer1; - delete buffer2; -} -END_TEST - -START_TEST(t_lower_bound) -{ - auto buffer1 = create_sequential_mbuffer<Rec>(100, 200); - auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000); - - de::PGM<Rec> *shards[2]; - - auto shard1 = Shard(buffer1); - auto shard2 = Shard(buffer2); - - shards[0] = &shard1; - shards[1] = &shard2; - - auto merged = Shard(shards, 2); - - for (size_t i=100; i<1000; i++) { - Rec r; - r.key = i; - r.value = i; - - auto idx = merged.get_lower_bound(i); - - assert(idx < merged.get_record_count()); - - auto res = merged.get_record_at(idx); - - if (i >=200 && i <400) { - ck_assert_int_lt(res->rec.key, i); - } else { - ck_assert_int_eq(res->rec.key, i); - } - } - - delete buffer1; - delete buffer2; -} -END_TEST - - -START_TEST(t_full_cancelation) -{ - size_t n = 100; - auto buffer = create_double_seq_mbuffer<Rec>(n, false); - auto buffer_ts = create_double_seq_mbuffer<Rec>(n, true); - - Shard* shard = new Shard(buffer); - Shard* shard_ts = new Shard(buffer_ts); - - ck_assert_int_eq(shard->get_record_count(), n); - ck_assert_int_eq(shard->get_tombstone_count(), 0); - ck_assert_int_eq(shard_ts->get_record_count(), n); - ck_assert_int_eq(shard_ts->get_tombstone_count(), n); - - Shard* shards[] = {shard, shard_ts}; - - Shard* merged = new Shard(shards, 2); - - ck_assert_int_eq(merged->get_tombstone_count(), 0); - ck_assert_int_eq(merged->get_record_count(), 0); - - delete buffer; - delete buffer_ts; - delete shard; - delete shard_ts; - delete merged; -} -END_TEST +typedef Rec R; +typedef PGM<R> Shard; +#include "include/shard_standard.h" +#include "include/rangequery.h" Suite *unit_testing() { Suite *unit = suite_create("PGM Shard Unit Testing"); - TCase *create = tcase_create("de::PGM constructor Testing"); - tcase_add_test(create, t_mbuffer_init); - tcase_add_test(create, t_irs_init); - tcase_set_timeout(create, 100); - suite_add_tcase(unit, create); - - - TCase *tombstone = tcase_create("de:PGM::tombstone cancellation Testing"); - tcase_add_test(tombstone, t_full_cancelation); - suite_add_tcase(unit, tombstone); - - - TCase *lookup = tcase_create("de:PGM:point_lookup Testing"); - tcase_add_test(lookup, t_point_lookup); - tcase_add_test(lookup, t_point_lookup_miss); - tcase_add_test(lookup, t_lower_bound); - suite_add_tcase(unit, lookup); - - TCase *range_query = tcase_create("de:PGM::range_query Testing"); - tcase_add_test(range_query, t_range_query); - tcase_add_test(range_query, t_buffer_range_query); - tcase_add_test(range_query, t_range_query_merge); - suite_add_tcase(unit, range_query); + inject_rangequery_tests(unit); + inject_shard_tests(unit); return unit; } diff --git a/tests/rangecount_tests.cpp b/tests/rangecount_tests.cpp index fe3a587..3be8234 100644 --- a/tests/rangecount_tests.cpp +++ b/tests/rangecount_tests.cpp @@ -19,6 +19,7 @@ using namespace de; +typedef Rec R; typedef ISAMTree<Rec> Shard; #include "include/rangecount.h" diff --git a/tests/rangequery_tests.cpp b/tests/rangequery_tests.cpp index 49c73d3..bf5fb5e 100644 --- a/tests/rangequery_tests.cpp +++ b/tests/rangequery_tests.cpp @@ -19,7 +19,8 @@ using namespace de; -typedef ISAMTree<Rec> Shard; +typedef Rec R; +typedef ISAMTree<R> Shard; #include "include/rangequery.h" diff --git a/tests/triespline_tests.cpp b/tests/triespline_tests.cpp index 9df278f..e884360 100644 --- a/tests/triespline_tests.cpp +++ b/tests/triespline_tests.cpp @@ -12,19 +12,19 @@ #include "shard/TrieSpline.h" #include "include/testing.h" -#include "query/rangequery.h" #include <check.h> using namespace de; -typedef TrieSpline<Rec> Shard; +typedef Rec R; +typedef TrieSpline<R> Shard; #include "include/shard_standard.h" #include "include/rangequery.h" Suite *unit_testing() { - Suite *unit = suite_create("ISAMTree Shard Unit Testing"); + Suite *unit = suite_create("Triespline Shard Unit Testing"); inject_rangequery_tests(unit); inject_shard_tests(unit); |