diff options
| author | Douglas B. Rumbaugh <dbr4@psu.edu> | 2024-02-09 14:06:59 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-02-09 14:06:59 -0500 |
| commit | bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30 (patch) | |
| tree | 66333c55feb0ea8875a50e6dc07c8535d241bf1c /tests/mutable_buffer_tests.cpp | |
| parent | 076e104b8672924c3d80cd1da2fdb5ebee1766ac (diff) | |
| parent | 46885246313358a3b606eca139b20280e96db10e (diff) | |
| download | dynamic-extension-bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30.tar.gz | |
Merge pull request #1 from dbrumbaugh/new-buffer
Initial Concurrency Implementation
Diffstat (limited to 'tests/mutable_buffer_tests.cpp')
| -rw-r--r-- | tests/mutable_buffer_tests.cpp | 352 |
1 file changed, 237 insertions, 115 deletions
diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp index 201fddb..31c16dc 100644 --- a/tests/mutable_buffer_tests.cpp +++ b/tests/mutable_buffer_tests.cpp @@ -1,39 +1,47 @@ /* * tests/mutable_buffer_tests.cpp * - * Unit tests for MutableBuffer + * Unit tests for MutableBuffer and BufferView * * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> * Dong Xie <dongx@psu.edu> * - * All rights reserved. Published under the Modified BSD License. + * Distributed under the Modified BSD License. * */ -#include <string> + #include <thread> #include <vector> -#include <algorithm> -#include "testing.h" -#include "framework/MutableBuffer.h" +#include "include/testing.h" +#include "framework/structure/MutableBuffer.h" #include <check.h> -#define DE_MT_TEST 0 - using namespace de; START_TEST(t_create) { - auto buffer = new MutableBuffer<Rec>(100, 50); + size_t lwm = 50, hwm = 100; + size_t cap = 2 * hwm; + + auto buffer = new MutableBuffer<Rec>(lwm, hwm); ck_assert_ptr_nonnull(buffer); - ck_assert_int_eq(buffer->get_capacity(), 100); - ck_assert_int_eq(buffer->get_record_count(), 0); + ck_assert_int_eq(buffer->get_capacity(), cap); + ck_assert_int_eq(buffer->get_low_watermark(), lwm); + ck_assert_int_eq(buffer->get_high_watermark(), hwm); + ck_assert_int_eq(buffer->is_full(), false); - ck_assert_ptr_nonnull(buffer->get_data()); + ck_assert_int_eq(buffer->is_at_low_watermark(), false); + ck_assert_int_eq(buffer->get_record_count(), 0); ck_assert_int_eq(buffer->get_tombstone_count(), 0); - ck_assert_int_eq(buffer->get_tombstone_capacity(), 50); + + { + auto view = buffer->get_buffer_view(); + ck_assert_int_eq(view.get_tombstone_count(), 0); + ck_assert_int_eq(view.get_record_count(), 0); + } delete buffer; } @@ -42,76 +50,149 @@ END_TEST START_TEST(t_insert) { - auto buffer = new MutableBuffer<WRec>(100, 50); + auto buffer = new MutableBuffer<Rec>(50, 100); + + Rec rec = {0, 5, 1}; - uint64_t key = 0; - uint32_t val = 5; + /* insert records up to the 
low watermark */ + size_t cnt = 0; + for (size_t i=0; i<50; i++) { + ck_assert_int_eq(buffer->is_at_low_watermark(), false); + ck_assert_int_eq(buffer->append(rec), 1); + ck_assert_int_eq(buffer->check_tombstone(rec), 0); - WRec rec = {0, 5, 1}; + rec.key++; + rec.value++; + cnt++; - for (size_t i=0; i<99; i++) { + ck_assert_int_eq(buffer->get_record_count(), cnt); + ck_assert_int_eq(buffer->get_buffer_view().get_record_count(), cnt); + ck_assert_int_eq(buffer->get_tail(), cnt); + } + + ck_assert_int_eq(buffer->is_at_low_watermark(), true); + + /* insert records up to the high watermark */ + for (size_t i=0; i<50; i++) { + ck_assert_int_eq(buffer->is_full(), 0); ck_assert_int_eq(buffer->append(rec), 1); ck_assert_int_eq(buffer->check_tombstone(rec), 0); rec.key++; rec.value++; + cnt++; + + ck_assert_int_eq(buffer->get_record_count(), cnt); + ck_assert_int_eq(buffer->get_buffer_view().get_record_count(), cnt); - ck_assert_int_eq(buffer->get_record_count(), i+1); ck_assert_int_eq(buffer->get_tombstone_count(), 0); - ck_assert_int_eq(buffer->is_full(), 0); + ck_assert_int_eq(buffer->is_at_low_watermark(), true); + ck_assert_int_eq(buffer->get_tail(), cnt); } - ck_assert_int_eq(buffer->append(rec), 1); - + /* further inserts should fail */ rec.key++; rec.value++; - ck_assert_int_eq(buffer->is_full(), 1); ck_assert_int_eq(buffer->append(rec), 0); delete buffer; - } END_TEST -START_TEST(t_insert_tombstones) +START_TEST(t_advance_head) { - auto buffer = new MutableBuffer<Rec>(100, 50); + auto buffer = new MutableBuffer<Rec>(50, 100); - size_t ts_cnt = 0; + /* insert 75 records and get tail when LWM is exceeded */ + size_t new_head = 0; + Rec rec = {1, 1}; + size_t cnt = 0; + for (size_t i=0; i<75; i++) { + ck_assert_int_eq(buffer->append(rec), 1); - Rec rec = {0, 5}; + rec.key++; + rec.value++; + cnt++; - for (size_t i=0; i<99; i++) { - bool ts = false; - if (i % 2 == 0) { - ts_cnt++; - ts=true; + if (buffer->is_at_low_watermark() && new_head == 0) { + new_head = 
buffer->get_tail(); } + } - ck_assert_int_eq(buffer->append(rec, ts), 1); - ck_assert_int_eq(buffer->check_tombstone(rec), ts); + ck_assert_int_eq(buffer->get_available_capacity(), 200 - cnt); - rec.key++; - rec.value++; + Wrapped<Rec> *view_records = new Wrapped<Rec>[buffer->get_record_count()]; + { + /* get a view of the pre-advanced state */ + auto view = buffer->get_buffer_view(); + ck_assert_int_eq(view.get_record_count(), cnt); + view.copy_to_buffer((psudb::byte *) view_records); - ck_assert_int_eq(buffer->get_record_count(), i+1); - ck_assert_int_eq(buffer->get_tombstone_count(), ts_cnt); - ck_assert_int_eq(buffer->is_full(), 0); + /* advance the head */ + ck_assert_int_eq(buffer->advance_head(new_head), 1); + ck_assert_int_eq(buffer->get_record_count(), 25); + ck_assert_int_eq(buffer->get_buffer_view().get_record_count(), 25); + ck_assert_int_eq(view.get_record_count(), cnt); + ck_assert_int_eq(buffer->get_available_capacity(), 200 - cnt); + + /* refuse to advance head again while there remain references to the old one */ + ck_assert_int_eq(buffer->advance_head(buffer->get_tail() -1), 0); } - // inserting one more tombstone should not be possible - ck_assert_int_eq(buffer->append(rec, true), 0); + /* once the buffer view falls out of scope, the capacity of the buffer should increase */ + ck_assert_int_eq(buffer->get_available_capacity(), 175); + /* now the head should be able to be advanced */ + ck_assert_int_eq(buffer->advance_head(buffer->get_tail()), 1); - ck_assert_int_eq(buffer->append(rec), 1); + /* and the buffer should be empty */ + ck_assert_int_eq(buffer->get_record_count(), 0); - rec.key++; - rec.value++; + delete buffer; + delete[] view_records; +} +END_TEST + +void insert_records(std::vector<Rec> *values, size_t start, size_t stop, MutableBuffer<Rec> *buffer) +{ + for (size_t i=start; i<stop; i++) { + buffer->append((*values)[i]); + } + +} + +START_TEST(t_multithreaded_insert) +{ + size_t cnt = 10000; + auto buffer = new 
MutableBuffer<Rec>(cnt/2, cnt); + + std::vector<Rec> records(cnt); + for (size_t i=0; i<cnt; i++) { + records[i] = Rec {(uint64_t) rand(), (uint32_t) rand()}; + } + + /* perform a multithreaded insertion */ + size_t thread_cnt = 8; + size_t per_thread = cnt / thread_cnt; + std::vector<std::thread> workers(thread_cnt); + size_t start = 0; + size_t stop = start + per_thread; + for (size_t i=0; i<thread_cnt; i++) { + workers[i] = std::thread(insert_records, &records, start, stop, buffer); + start = stop; + stop = std::min(start + per_thread, cnt); + } + + for (size_t i=0; i<thread_cnt; i++) { + if (workers[i].joinable()) { + workers[i].join(); + } + } ck_assert_int_eq(buffer->is_full(), 1); - ck_assert_int_eq(buffer->append(rec), 0); + ck_assert_int_eq(buffer->get_record_count(), cnt); delete buffer; } @@ -120,7 +201,7 @@ END_TEST START_TEST(t_truncate) { - auto buffer = new MutableBuffer<Rec>(100, 100); + auto buffer = new MutableBuffer<Rec>(50, 100); size_t ts_cnt = 0; Rec rec = {0, 5}; @@ -157,42 +238,76 @@ START_TEST(t_truncate) } END_TEST - -START_TEST(t_get_data) +START_TEST(t_bview_get) { - size_t cnt = 100; + auto buffer = new MutableBuffer<Rec>(50, 100); - auto buffer = new MutableBuffer<Rec>(cnt, cnt/2); + /* insert 75 records and get tail when LWM is exceeded */ + size_t new_head = 0; + Rec rec = {1, 1}; + size_t cnt = 0; + for (size_t i=0; i<75; i++) { + ck_assert_int_eq(buffer->append(rec), 1); + rec.key++; + rec.value++; + cnt++; - std::vector<uint64_t> keys(cnt); - for (size_t i=0; i<cnt-2; i++) { - keys[i] = rand(); + if (buffer->is_at_low_watermark() && new_head == 0) { + new_head = buffer->get_tail(); + } } - // duplicate final two records for tombstone testing - // purposes - keys[cnt-2] = keys[cnt-3]; - keys[cnt-1] = keys[cnt-2]; + ck_assert_int_eq(buffer->get_available_capacity(), 200 - cnt); + + { + /* get a view of the pre-advanced state */ + auto view = buffer->get_buffer_view(); + auto reccnt = view.get_record_count(); - uint32_t val = 12345; 
- for (size_t i=0; i<cnt-2; i++) { - buffer->append(Rec {keys[i], val}); + /* scan the records in the view */ + for (size_t i=0; i<reccnt; i++) { + ck_assert_int_eq(view.get(i)->rec.key, i+1); + } + + /* advance the head */ + buffer->advance_head(new_head); + + /* scan the records in the view again -- should be unchanged */ + for (size_t i=0; i<reccnt; i++) { + ck_assert_int_eq(view.get(i)->rec.key, i+1); + } } - Rec r1 = {keys[cnt-2], val}; - buffer->append(r1, true); + { + /* get a new view (should have fewer records) */ + auto view = buffer->get_buffer_view(); + auto reccnt = view.get_record_count(); - Rec r2 = {keys[cnt-1], val}; - buffer->append(r2, true); + /* verify the scan again */ + for (size_t i=0; i<reccnt; i++) { + ck_assert_int_eq(view.get(i)->rec.key, i + 51); + } + } + + /* insert more records (to trigger a wrap-around) */ + for (size_t i=0; i<75; i++) { + ck_assert_int_eq(buffer->append(rec), 1); + rec.key++; + rec.value++; + cnt++; + } - auto *sorted_records = buffer->get_data(); - std::sort(keys.begin(), keys.end()); - std::sort(sorted_records, sorted_records + buffer->get_record_count(), std::less<Wrapped<Rec>>()); + { + /* get a new view (should have fewer records) */ + auto view = buffer->get_buffer_view(); + auto reccnt = view.get_record_count(); - for (size_t i=0; i<cnt; i++) { - ck_assert_int_eq(sorted_records[i].rec.key, keys[i]); + /* verify the scan again */ + for (size_t i=0; i<reccnt; i++) { + ck_assert_int_eq(view.get(i)->rec.key, i + 51); + } } delete buffer; @@ -200,56 +315,65 @@ START_TEST(t_get_data) END_TEST -void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t start, size_t stop, MutableBuffer<Rec> *buffer) +START_TEST(t_bview_delete) { - for (size_t i=start; i<stop; i++) { - buffer->append({(*values)[i].first, (*values)[i].second}); - } -} + auto buffer = new MutableBuffer<Rec>(50, 100); -#if DE_MT_TEST -START_TEST(t_multithreaded_insert) -{ - size_t cnt = 10000; - auto buffer = new 
MutableBuffer<Rec>(cnt, true, cnt/2); - - std::vector<Rec> records(cnt); - for (size_t i=0; i<cnt; i++) { - records[i] = Rec {(uint64_t) rand(), (uint32_t) rand()}; - } + /* insert 75 records and get tail when LWM is exceeded */ + size_t new_head = 0; + Rec rec = {1, 1}; + size_t cnt = 0; + for (size_t i=0; i<75; i++) { + ck_assert_int_eq(buffer->append(rec), 1); - // perform a t_multithreaded insertion - size_t thread_cnt = 8; - size_t per_thread = cnt / thread_cnt; - std::vector<std::thread> workers(thread_cnt); - size_t start = 0; - size_t stop = start + per_thread; - for (size_t i=0; i<thread_cnt; i++) { - workers[i] = std::thread(insert_records, &records, start, stop, buffer); - start = stop; - stop = std::min(start + per_thread, cnt); - } + rec.key++; + rec.value++; + cnt++; - for (size_t i=0; i<thread_cnt; i++) { - if (workers[i].joinable()) { - workers[i].join(); + if (buffer->is_at_low_watermark() && new_head == 0) { + new_head = buffer->get_tail(); } } - ck_assert_int_eq(buffer->is_full(), 1); - ck_assert_int_eq(buffer->get_record_count(), cnt); + buffer->advance_head(new_head); - std::sort(records.begin(), records.end()); - auto *sorted_records = buffer->sorted_output(); - for (size_t i=0; i<cnt; i++) { - ck_assert_int_eq(sorted_records[i].key, records[i].key); + for (size_t i=0; i<75; i++) { + ck_assert_int_eq(buffer->append(rec), 1); + + rec.key++; + rec.value++; + cnt++; + } + + Rec dr1 = {67, 67}; + Rec dr2 = {89, 89}; + Rec dr3 = {103, 103}; + + Rec fdr1 = {5, 5}; + Rec fdr2 = {300, 300}; + { + /* get a new view (should have fewer records) */ + auto view = buffer->get_buffer_view(); + ck_assert_int_eq(view.delete_record(dr1), 1); + ck_assert_int_eq(view.delete_record(dr2), 1); + ck_assert_int_eq(view.delete_record(dr3), 1); + ck_assert_int_eq(view.delete_record(fdr1), 0); + ck_assert_int_eq(view.delete_record(fdr2), 0); + + for (size_t i=0; i<view.get_record_count(); i++) { + if (view.get(i)->rec == dr1 || view.get(i)->rec == dr2 + || 
view.get(i)->rec == dr3) { + ck_assert_int_eq(view.get(i)->is_deleted(), 1); + } else { + ck_assert_int_eq(view.get(i)->is_deleted(), 0); + } + } } delete buffer; } END_TEST -#endif Suite *unit_testing() @@ -263,13 +387,16 @@ Suite *unit_testing() TCase *append = tcase_create("de::MutableBuffer::append Testing"); tcase_add_test(append, t_insert); - tcase_add_test(append, t_insert_tombstones); - #if DE_MT_TEST - tcase_add_test(append, t_multithreaded_insert); - #endif + tcase_add_test(append, t_advance_head); + tcase_add_test(append, t_multithreaded_insert); suite_add_tcase(unit, append); + TCase *view = tcase_create("de::BufferView Testing"); + tcase_add_test(view, t_bview_get); + tcase_add_test(view, t_bview_delete); + + suite_add_tcase(unit, view); TCase *truncate = tcase_create("de::MutableBuffer::truncate Testing"); tcase_add_test(truncate, t_truncate); @@ -277,11 +404,6 @@ Suite *unit_testing() suite_add_tcase(unit, truncate); - TCase *sorted_out = tcase_create("de::MutableBuffer::get_data"); - tcase_add_test(sorted_out, t_get_data); - - suite_add_tcase(unit, sorted_out); - return unit; } |