diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-30 15:31:34 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-30 15:31:34 -0500 |
| commit | 51a85013236f4b2bd596caf179d90e67c848963c (patch) | |
| tree | b4f7643d5c18986b038bd444b88b0bdf316cd01f | |
| parent | f24fdf2fd310a5f868e15cd9682ca37d740c77af (diff) | |
| download | dynamic-extension-51a85013236f4b2bd596caf179d90e67c848963c.tar.gz | |
TrieSpline + tests
| -rw-r--r-- | CMakeLists.txt | 13 | ||||
| -rw-r--r-- | include/shard/TrieSpline.h | 151 | ||||
| -rw-r--r-- | tests/include/rangequery.h | 12 | ||||
| -rw-r--r-- | tests/triespline_tests.cpp | 242 |
4 files changed, 117 insertions, 301 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index f3d6c3a..1dbacca 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -98,6 +98,11 @@ if (tests) target_link_options(memisam_tests PUBLIC -mcx16) target_include_directories(memisam_tests PRIVATE include external/psudb-common/cpp/include) + add_executable(triespline_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/triespline_tests.cpp) + target_link_libraries(triespline_tests PUBLIC gsl check subunit pthread atomic) + target_link_options(triespline_tests PUBLIC -mcx16) + target_include_directories(triespline_tests PRIVATE include external/psudb-common/cpp/include external/PLEX/include) + #add_executable(alias_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/alias_tests.cpp) #target_link_libraries(alias_tests PUBLIC gsl check subunit pthread) #target_include_directories(alias_tests PRIVATE include external/psudb-common/cpp/include) @@ -135,7 +140,15 @@ if (bench) target_include_directories(watermark_testing PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include bench/include external/psudb-common/cpp/include) target_link_options(watermark_testing PUBLIC -mcx16) + add_executable(static_dynamic_comp ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/static_dynamic_comp.cpp) + target_link_libraries(static_dynamic_comp PUBLIC gsl pthread gomp atomic) + target_include_directories(static_dynamic_comp PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include bench/include external/psudb-common/cpp/include) + target_link_options(static_dynamic_comp PUBLIC -mcx16) + add_executable(insert_tail_latency ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/insert_tail_latency.cpp) + target_link_libraries(insert_tail_latency PUBLIC gsl pthread gomp atomic) + target_include_directories(insert_tail_latency PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include bench/include external/psudb-common/cpp/include) + target_link_options(insert_tail_latency PUBLIC -mcx16) endif() if (old_bench) diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 56ec357..8142a67 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -12,10 +12,6 @@ #include <vector> -#include <cassert> -#include <queue> -#include <memory> -#include <concepts> #include "framework/ShardRequirements.h" #include "ts/builder.h" @@ -23,62 +19,64 @@ #include "util/Cursor.h" #include "psu-ds/BloomFilter.h" #include "util/bf_config.h" +#include "psu-util/timer.h" using psudb::CACHELINE_SIZE; using psudb::BloomFilter; using psudb::PriorityQueue; using psudb::queue_record; -using psudb::Alias; namespace de { -template <RecordInterface R, size_t E=1024> +template <KVPInterface R, size_t E=1024> class TrieSpline { private: typedef decltype(R::key) K; typedef decltype(R::value) V; public: - TrieSpline(MutableBuffer<R>* buffer) - : m_reccnt(0), m_tombstone_cnt(0) { - - m_alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE); - assert(m_alloc_size % CACHELINE_SIZE == 0); - m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size); - - m_bf = new BloomFilter<R>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS); - - size_t offset = 0; - m_reccnt = 0; - auto base = buffer->get_data(); - auto stop = base + buffer->get_record_count(); - + TrieSpline(BufferView<R> buffer) + : m_data(nullptr) + , m_reccnt(0) + , m_tombstone_cnt(0) + , m_alloc_size(0) + , m_max_key(0) + , m_min_key(0) + , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) + { + TIMER_INIT(); + + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + buffer.get_record_count() * + sizeof(Wrapped<R>), + (byte**) &m_data); + + TIMER_START(); + auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>)); + buffer.copy_to_buffer((byte *) temp_buffer); + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); std::sort(base, stop, std::less<Wrapped<R>>()); K min_key = base->rec.key; - K max_key = (stop - 1)->rec.key; + K max_key = (stop-1)->rec.key; + TIMER_STOP(); - auto bldr = ts::Builder<K>(min_key, max_key, E); + auto sort_time = TIMER_RESULT(); + TIMER_START(); + auto bldr = ts::Builder<K>(min_key, max_key, E); while (base < stop) { - if (!(base->is_tombstone()) && (base + 1) < stop) { - if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - continue; - } + if (!base->is_tombstone() && (base + 1 < stop) + && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; } else if (base->is_deleted()) { base += 1; continue; } - if (m_reccnt == 0) { - m_max_key = m_min_key = base->rec.key; - } else if (base->rec.key > m_max_key) { - m_max_key = base->rec.key; - } else if (base->rec.key < m_min_key) { - m_min_key = base->rec.key; - } - // FIXME: this shouldn't be necessary, but the tagged record // bypass doesn't seem to be working on this code-path, so this // ensures that tagged records from the buffer are able to be @@ -86,37 +84,67 @@ public: base->header &= 3; m_data[m_reccnt++] = *base; bldr.AddKey(base->rec.key); - if (m_bf && base->is_tombstone()) { - m_tombstone_cnt++; + ++m_tombstone_cnt; m_bf->insert(base->rec); } - + + /* + * determine the "true" min/max keys based on the scan. This is + * to avoid situations where the min/max in the input array + * are deleted and don't survive into the structure itself. + */ + if (m_reccnt == 0) { + m_max_key = m_min_key = base->rec.key; + } else if (base->rec.key > m_max_key) { + m_max_key = base->rec.key; + } else if (base->rec.key < m_min_key) { + m_min_key = base->rec.key; + } + base++; } + TIMER_STOP(); + auto copy_time = TIMER_RESULT(); + + TIMER_START(); if (m_reccnt > 0) { m_ts = bldr.Finalize(); } + TIMER_STOP(); + auto level_time = TIMER_RESULT(); + + free(temp_buffer); } - TrieSpline(TrieSpline** shards, size_t len) - : m_reccnt(0), m_tombstone_cnt(0) { + TrieSpline(std::vector<TrieSpline*> &shards) + : m_data(nullptr) + , m_reccnt(0) + , m_tombstone_cnt(0) + , m_alloc_size(0) + , m_max_key(0) + , m_min_key(0) + , m_bf(nullptr) + { + std::vector<Cursor<Wrapped<R>>> cursors; - cursors.reserve(len); + cursors.reserve(shards.size()); - PriorityQueue<Wrapped<R>> pq(len); + PriorityQueue<Wrapped<R>> pq(shards.size()); size_t attemp_reccnt = 0; size_t tombstone_count = 0; - // initialize m_max_key and m_min_key using the values from the - // first shard. These will later be updated when building - // the initial priority queue to their true values. + /* + * Initialize m_max_key and m_min_key using the values from the + * first shard. These will later be updated when building + * the initial priority queue to their true values. + */ m_max_key = shards[0]->m_max_key; m_min_key = shards[0]->m_min_key; - for (size_t i = 0; i < len; ++i) { + for (size_t i = 0; i < shards.size(); ++i) { if (shards[i]) { auto base = shards[i]->get_data(); cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); @@ -137,12 +165,11 @@ public: } m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS); - auto bldr = ts::Builder<K>(m_min_key, m_max_key, E); - - m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE); - assert(m_alloc_size % CACHELINE_SIZE == 0); - m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size); + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + attemp_reccnt * sizeof(Wrapped<R>), + (byte **) &m_data); + auto bldr = ts::Builder<K>(m_min_key, m_max_key, E); while (pq.size()) { auto now = pq.peek(); auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; @@ -152,33 +179,32 @@ public: pq.pop(); pq.pop(); auto& cursor1 = cursors[now.version]; auto& cursor2 = cursors[next.version]; - if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version); + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); } else { auto& cursor = cursors[now.version]; if (!cursor.ptr->is_deleted()) { m_data[m_reccnt++] = *cursor.ptr; bldr.AddKey(cursor.ptr->rec.key); - if (m_bf && cursor.ptr->is_tombstone()) { + if (cursor.ptr->is_tombstone()) { ++m_tombstone_cnt; - if (m_bf) m_bf->insert(cursor.ptr->rec); + m_bf->insert(cursor.ptr->rec); } } pq.pop(); - if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version); + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); } } if (m_reccnt > 0) { m_ts = bldr.Finalize(); } - } + } ~TrieSpline() { - if (m_data) free(m_data); - if (m_bf) delete m_bf; - + free(m_data); + delete m_bf; } Wrapped<R> *point_lookup(const R &rec, bool filter=false) { @@ -253,14 +279,17 @@ public: max = mid; } } + } + if (idx == m_reccnt) { + return m_reccnt; } if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) { return idx-1; } - return (m_data[idx].rec.key <= key) ? idx : m_reccnt; + return idx; } private: diff --git a/tests/include/rangequery.h b/tests/include/rangequery.h index e45de57..1ac0891 100644 --- a/tests/include/rangequery.h +++ b/tests/include/rangequery.h @@ -24,12 +24,12 @@ * should be included in the source file that includes this one, above the * include statement. */ -#include "shard/ISAMTree.h" -#include "query/rangequery.h" -#include "testing.h" -#include <check.h> -using namespace de; -typedef ISAMTree<Rec> Shard; +//#include "shard/ISAMTree.h" +//#include "query/rangequery.h" +//#include "testing.h" +//#include <check.h> +//using namespace de; +//typedef ISAMTree<Rec> Shard; START_TEST(t_range_query) diff --git a/tests/triespline_tests.cpp b/tests/triespline_tests.cpp index 14506be..9df278f 100644 --- a/tests/triespline_tests.cpp +++ b/tests/triespline_tests.cpp @@ -1,7 +1,7 @@ /* - * tests/triespline_tests.cpp + * tests/isam_tests.cpp * - * Unit tests for TrieSpline (Augmented B+Tree) shard + * Unit tests for ISAM Tree shard * * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> * Dong Xie <dongx@psu.edu> @@ -10,250 +10,24 @@ * */ -#include <functional> - #include "shard/TrieSpline.h" +#include "include/testing.h" #include "query/rangequery.h" -#include "testing.h" - #include <check.h> using namespace de; typedef TrieSpline<Rec> Shard; -START_TEST(t_mbuffer_init) -{ - auto buffer = new MutableBuffer<Rec>(1024, 1024); - for (uint64_t i = 512; i > 0; i--) { - uint32_t v = i; - buffer->append({i,v, 1}); - } - - for (uint64_t i = 1; i <= 256; ++i) { - uint32_t v = i; - buffer->append({i, v, 1}, true); - } - - for (uint64_t i = 257; i <= 512; ++i) { - uint32_t v = i + 1; - buffer->append({i, v, 1}); - } - - Shard* shard = new Shard(buffer); - ck_assert_uint_eq(shard->get_record_count(), 512); - - delete buffer; - delete shard; -} - - -START_TEST(t_init) -{ - size_t n = 512; - auto mbuffer1 = create_test_mbuffer<Rec>(n); - auto mbuffer2 = create_test_mbuffer<Rec>(n); - auto mbuffer3 = create_test_mbuffer<Rec>(n); - - auto shard1 = new Shard(mbuffer1); - auto shard2 = new Shard(mbuffer2); - auto shard3 = new Shard(mbuffer3); - - Shard* shards[3] = {shard1, shard2, shard3}; - auto shard4 = new Shard(shards, 3); - - ck_assert_int_eq(shard4->get_record_count(), n * 3); - ck_assert_int_eq(shard4->get_tombstone_count(), 0); - - size_t total_cnt = 0; - size_t shard1_idx = 0; - size_t shard2_idx = 0; - size_t shard3_idx = 0; - - for (size_t i = 0; i < shard4->get_record_count(); ++i) { - auto rec1 = shard1->get_record_at(shard1_idx); - auto rec2 = shard2->get_record_at(shard2_idx); - auto rec3 = shard3->get_record_at(shard3_idx); - - auto cur_rec = shard4->get_record_at(i); - - if (shard1_idx < n && cur_rec->rec == rec1->rec) { - ++shard1_idx; - } else if (shard2_idx < n && cur_rec->rec == rec2->rec) { - ++shard2_idx; - } else if (shard3_idx < n && cur_rec->rec == rec3->rec) { - ++shard3_idx; - } else { - assert(false); - } - } - - delete mbuffer1; - delete mbuffer2; - delete mbuffer3; - - delete shard1; - delete shard2; - delete shard3; - delete shard4; -} - -START_TEST(t_point_lookup) -{ - size_t n = 10000; - - auto buffer = create_double_seq_mbuffer<Rec>(n, false); - auto shard = Shard(buffer); - - for (size_t i=0; i<n; i++) { - Rec r; - auto rec = (buffer->get_data() + i); - r.key = rec->rec.key; - r.value = rec->rec.value; - - auto result = shard.point_lookup(r); - ck_assert_ptr_nonnull(result); - ck_assert_int_eq(result->rec.key, r.key); - ck_assert_int_eq(result->rec.value, r.value); - } - - delete buffer; -} -END_TEST - - -START_TEST(t_point_lookup_miss) -{ - size_t n = 10000; - - auto buffer = create_double_seq_mbuffer<Rec>(n, false); - auto isam = Shard(buffer); - - for (size_t i=n + 100; i<2*n; i++) { - Rec r; - r.key = i; - r.value = i; - - auto result = isam.point_lookup(r); - ck_assert_ptr_null(result); - } - - delete buffer; -} - - -START_TEST(t_full_cancelation) -{ - size_t n = 100; - auto buffer = create_double_seq_mbuffer<Rec>(n, false); - auto buffer_ts = create_double_seq_mbuffer<Rec>(n, true); - - Shard* shard = new Shard(buffer); - Shard* shard_ts = new Shard(buffer_ts); - - ck_assert_int_eq(shard->get_record_count(), n); - ck_assert_int_eq(shard->get_tombstone_count(), 0); - ck_assert_int_eq(shard_ts->get_record_count(), n); - ck_assert_int_eq(shard_ts->get_tombstone_count(), n); - - Shard* shards[] = {shard, shard_ts}; - - Shard* merged = new Shard(shards, 2); - - ck_assert_int_eq(merged->get_tombstone_count(), 0); - ck_assert_int_eq(merged->get_record_count(), 0); - - delete buffer; - delete buffer_ts; - delete shard; - delete shard_ts; - delete merged; -} -END_TEST - - -START_TEST(t_range_query) -{ - auto buffer = create_sequential_mbuffer<Rec>(100, 1000); - auto shard = Shard(buffer); - - rq::Parms<Rec> parms; - parms.lower_bound = 300; - parms.upper_bound = 500; - - auto state = rq::Query<Shard, Rec>::get_query_state(&shard, &parms); - auto result = rq::Query<Shard, Rec>::query(&shard, state, &parms); - rq::Query<Shard, Rec>::delete_query_state(state); - - ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1); - for (size_t i=0; i<result.size(); i++) { - ck_assert_int_le(result[i].rec.key, parms.upper_bound); - ck_assert_int_ge(result[i].rec.key, parms.lower_bound); - } - - delete buffer; -} -END_TEST - - -START_TEST(t_buffer_range_query) -{ - auto buffer = create_sequential_mbuffer<Rec>(100, 1000); - - rq::Parms<Rec> parms; - parms.lower_bound = 300; - parms.upper_bound = 500; - - auto state = rq::Query<Shard, Rec>::get_buffer_query_state(buffer, &parms); - auto result = rq::Query<Shard, Rec>::buffer_query(buffer, state, &parms); - rq::Query<Shard, Rec>::delete_buffer_query_state(state); - - ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1); - for (size_t i=0; i<result.size(); i++) { - ck_assert_int_le(result[i].rec.key, parms.upper_bound); - ck_assert_int_ge(result[i].rec.key, parms.lower_bound); - } - - delete buffer; -} -END_TEST - - -START_TEST(t_range_query_merge) -{ - -} -END_TEST - +#include "include/shard_standard.h" +#include "include/rangequery.h" Suite *unit_testing() { - Suite *unit = suite_create("TrieSpline Shard Unit Testing"); - - TCase *create = tcase_create("de::TrieSpline constructor Testing"); - tcase_add_test(create, t_mbuffer_init); - tcase_add_test(create, t_init); - tcase_set_timeout(create, 100); - suite_add_tcase(unit, create); - - - TCase *tombstone = tcase_create("de:TrieSpline::tombstone cancellation Testing"); - tcase_add_test(tombstone, t_full_cancelation); - suite_add_tcase(unit, tombstone); - - - TCase *lookup = tcase_create("de:TrieSpline:point_lookup Testing"); - tcase_add_test(lookup, t_point_lookup); - tcase_add_test(lookup, t_point_lookup_miss); - suite_add_tcase(unit, lookup); - - - TCase *range_query = tcase_create("de:TrieSpline::range_query Testing"); - tcase_add_test(range_query, t_range_query); - tcase_add_test(range_query, t_buffer_range_query); - tcase_add_test(range_query, t_range_query_merge); - suite_add_tcase(unit, range_query); + Suite *unit = suite_create("ISAMTree Shard Unit Testing"); + inject_rangequery_tests(unit); + inject_shard_tests(unit); return unit; } |