summaryrefslogtreecommitdiffstats
path: root/tests/mutable_buffer_tests.cpp
diff options
context:
space:
mode:
authorDouglas B. Rumbaugh <dbr4@psu.edu>2024-02-09 14:06:59 -0500
committerGitHub <noreply@github.com>2024-02-09 14:06:59 -0500
commitbc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30 (patch)
tree66333c55feb0ea8875a50e6dc07c8535d241bf1c /tests/mutable_buffer_tests.cpp
parent076e104b8672924c3d80cd1da2fdb5ebee1766ac (diff)
parent46885246313358a3b606eca139b20280e96db10e (diff)
downloaddynamic-extension-bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30.tar.gz
Merge pull request #1 from dbrumbaugh/new-buffer
Initial Concurrency Implementation
Diffstat (limited to 'tests/mutable_buffer_tests.cpp')
-rw-r--r--tests/mutable_buffer_tests.cpp352
1 files changed, 237 insertions, 115 deletions
diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp
index 201fddb..31c16dc 100644
--- a/tests/mutable_buffer_tests.cpp
+++ b/tests/mutable_buffer_tests.cpp
@@ -1,39 +1,47 @@
/*
* tests/mutable_buffer_tests.cpp
*
- * Unit tests for MutableBuffer
+ * Unit tests for MutableBuffer and BufferView
*
* Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
* Dong Xie <dongx@psu.edu>
*
- * All rights reserved. Published under the Modified BSD License.
+ * Distributed under the Modified BSD License.
*
*/
-#include <string>
+
#include <thread>
#include <vector>
-#include <algorithm>
-#include "testing.h"
-#include "framework/MutableBuffer.h"
+#include "include/testing.h"
+#include "framework/structure/MutableBuffer.h"
#include <check.h>
-#define DE_MT_TEST 0
-
using namespace de;
START_TEST(t_create)
{
- auto buffer = new MutableBuffer<Rec>(100, 50);
+ size_t lwm = 50, hwm = 100;
+ size_t cap = 2 * hwm;
+
+ auto buffer = new MutableBuffer<Rec>(lwm, hwm);
ck_assert_ptr_nonnull(buffer);
- ck_assert_int_eq(buffer->get_capacity(), 100);
- ck_assert_int_eq(buffer->get_record_count(), 0);
+ ck_assert_int_eq(buffer->get_capacity(), cap);
+ ck_assert_int_eq(buffer->get_low_watermark(), lwm);
+ ck_assert_int_eq(buffer->get_high_watermark(), hwm);
+
ck_assert_int_eq(buffer->is_full(), false);
- ck_assert_ptr_nonnull(buffer->get_data());
+ ck_assert_int_eq(buffer->is_at_low_watermark(), false);
+ ck_assert_int_eq(buffer->get_record_count(), 0);
ck_assert_int_eq(buffer->get_tombstone_count(), 0);
- ck_assert_int_eq(buffer->get_tombstone_capacity(), 50);
+
+ {
+ auto view = buffer->get_buffer_view();
+ ck_assert_int_eq(view.get_tombstone_count(), 0);
+ ck_assert_int_eq(view.get_record_count(), 0);
+ }
delete buffer;
}
@@ -42,76 +50,149 @@ END_TEST
START_TEST(t_insert)
{
- auto buffer = new MutableBuffer<WRec>(100, 50);
+ auto buffer = new MutableBuffer<Rec>(50, 100);
+
+ Rec rec = {0, 5, 1};
- uint64_t key = 0;
- uint32_t val = 5;
+ /* insert records up to the low watermark */
+ size_t cnt = 0;
+ for (size_t i=0; i<50; i++) {
+ ck_assert_int_eq(buffer->is_at_low_watermark(), false);
+ ck_assert_int_eq(buffer->append(rec), 1);
+ ck_assert_int_eq(buffer->check_tombstone(rec), 0);
- WRec rec = {0, 5, 1};
+ rec.key++;
+ rec.value++;
+ cnt++;
- for (size_t i=0; i<99; i++) {
+ ck_assert_int_eq(buffer->get_record_count(), cnt);
+ ck_assert_int_eq(buffer->get_buffer_view().get_record_count(), cnt);
+ ck_assert_int_eq(buffer->get_tail(), cnt);
+ }
+
+ ck_assert_int_eq(buffer->is_at_low_watermark(), true);
+
+ /* insert records up to the high watermark */
+ for (size_t i=0; i<50; i++) {
+ ck_assert_int_eq(buffer->is_full(), 0);
ck_assert_int_eq(buffer->append(rec), 1);
ck_assert_int_eq(buffer->check_tombstone(rec), 0);
rec.key++;
rec.value++;
+ cnt++;
+
+ ck_assert_int_eq(buffer->get_record_count(), cnt);
+ ck_assert_int_eq(buffer->get_buffer_view().get_record_count(), cnt);
- ck_assert_int_eq(buffer->get_record_count(), i+1);
ck_assert_int_eq(buffer->get_tombstone_count(), 0);
- ck_assert_int_eq(buffer->is_full(), 0);
+ ck_assert_int_eq(buffer->is_at_low_watermark(), true);
+ ck_assert_int_eq(buffer->get_tail(), cnt);
}
- ck_assert_int_eq(buffer->append(rec), 1);
-
+ /* further inserts should fail */
rec.key++;
rec.value++;
-
ck_assert_int_eq(buffer->is_full(), 1);
ck_assert_int_eq(buffer->append(rec), 0);
delete buffer;
-
}
END_TEST
-START_TEST(t_insert_tombstones)
+START_TEST(t_advance_head)
{
- auto buffer = new MutableBuffer<Rec>(100, 50);
+ auto buffer = new MutableBuffer<Rec>(50, 100);
- size_t ts_cnt = 0;
+ /* insert 75 records and get tail when LWM is exceeded */
+ size_t new_head = 0;
+ Rec rec = {1, 1};
+ size_t cnt = 0;
+ for (size_t i=0; i<75; i++) {
+ ck_assert_int_eq(buffer->append(rec), 1);
- Rec rec = {0, 5};
+ rec.key++;
+ rec.value++;
+ cnt++;
- for (size_t i=0; i<99; i++) {
- bool ts = false;
- if (i % 2 == 0) {
- ts_cnt++;
- ts=true;
+ if (buffer->is_at_low_watermark() && new_head == 0) {
+ new_head = buffer->get_tail();
}
+ }
- ck_assert_int_eq(buffer->append(rec, ts), 1);
- ck_assert_int_eq(buffer->check_tombstone(rec), ts);
+ ck_assert_int_eq(buffer->get_available_capacity(), 200 - cnt);
- rec.key++;
- rec.value++;
+ Wrapped<Rec> *view_records = new Wrapped<Rec>[buffer->get_record_count()];
+ {
+ /* get a view of the pre-advanced state */
+ auto view = buffer->get_buffer_view();
+ ck_assert_int_eq(view.get_record_count(), cnt);
+ view.copy_to_buffer((psudb::byte *) view_records);
- ck_assert_int_eq(buffer->get_record_count(), i+1);
- ck_assert_int_eq(buffer->get_tombstone_count(), ts_cnt);
- ck_assert_int_eq(buffer->is_full(), 0);
+ /* advance the head */
+ ck_assert_int_eq(buffer->advance_head(new_head), 1);
+ ck_assert_int_eq(buffer->get_record_count(), 25);
+ ck_assert_int_eq(buffer->get_buffer_view().get_record_count(), 25);
+ ck_assert_int_eq(view.get_record_count(), cnt);
+ ck_assert_int_eq(buffer->get_available_capacity(), 200 - cnt);
+
+ /* refuse to advance head again while there remain references to the old one */
+ ck_assert_int_eq(buffer->advance_head(buffer->get_tail() -1), 0);
}
- // inserting one more tombstone should not be possible
- ck_assert_int_eq(buffer->append(rec, true), 0);
+ /* once the buffer view falls out of scope, the capacity of the buffer should increase */
+ ck_assert_int_eq(buffer->get_available_capacity(), 175);
+ /* now the head should be able to be advanced */
+ ck_assert_int_eq(buffer->advance_head(buffer->get_tail()), 1);
- ck_assert_int_eq(buffer->append(rec), 1);
+ /* and the buffer should be empty */
+ ck_assert_int_eq(buffer->get_record_count(), 0);
- rec.key++;
- rec.value++;
+ delete buffer;
+ delete[] view_records;
+}
+END_TEST
+
+void insert_records(std::vector<Rec> *values, size_t start, size_t stop, MutableBuffer<Rec> *buffer)
+{
+ for (size_t i=start; i<stop; i++) {
+ buffer->append((*values)[i]);
+ }
+
+}
+
+START_TEST(t_multithreaded_insert)
+{
+ size_t cnt = 10000;
+ auto buffer = new MutableBuffer<Rec>(cnt/2, cnt);
+
+ std::vector<Rec> records(cnt);
+ for (size_t i=0; i<cnt; i++) {
+ records[i] = Rec {(uint64_t) rand(), (uint32_t) rand()};
+ }
+
+ /* perform a multithreaded insertion */
+ size_t thread_cnt = 8;
+ size_t per_thread = cnt / thread_cnt;
+ std::vector<std::thread> workers(thread_cnt);
+ size_t start = 0;
+ size_t stop = start + per_thread;
+ for (size_t i=0; i<thread_cnt; i++) {
+ workers[i] = std::thread(insert_records, &records, start, stop, buffer);
+ start = stop;
+ stop = std::min(start + per_thread, cnt);
+ }
+
+ for (size_t i=0; i<thread_cnt; i++) {
+ if (workers[i].joinable()) {
+ workers[i].join();
+ }
+ }
ck_assert_int_eq(buffer->is_full(), 1);
- ck_assert_int_eq(buffer->append(rec), 0);
+ ck_assert_int_eq(buffer->get_record_count(), cnt);
delete buffer;
}
@@ -120,7 +201,7 @@ END_TEST
START_TEST(t_truncate)
{
- auto buffer = new MutableBuffer<Rec>(100, 100);
+ auto buffer = new MutableBuffer<Rec>(50, 100);
size_t ts_cnt = 0;
Rec rec = {0, 5};
@@ -157,42 +238,76 @@ START_TEST(t_truncate)
}
END_TEST
-
-START_TEST(t_get_data)
+START_TEST(t_bview_get)
{
- size_t cnt = 100;
+ auto buffer = new MutableBuffer<Rec>(50, 100);
- auto buffer = new MutableBuffer<Rec>(cnt, cnt/2);
+ /* insert 75 records and get tail when LWM is exceeded */
+ size_t new_head = 0;
+ Rec rec = {1, 1};
+ size_t cnt = 0;
+ for (size_t i=0; i<75; i++) {
+ ck_assert_int_eq(buffer->append(rec), 1);
+ rec.key++;
+ rec.value++;
+ cnt++;
- std::vector<uint64_t> keys(cnt);
- for (size_t i=0; i<cnt-2; i++) {
- keys[i] = rand();
+ if (buffer->is_at_low_watermark() && new_head == 0) {
+ new_head = buffer->get_tail();
+ }
}
- // duplicate final two records for tombstone testing
- // purposes
- keys[cnt-2] = keys[cnt-3];
- keys[cnt-1] = keys[cnt-2];
+ ck_assert_int_eq(buffer->get_available_capacity(), 200 - cnt);
+
+ {
+ /* get a view of the pre-advanced state */
+ auto view = buffer->get_buffer_view();
+ auto reccnt = view.get_record_count();
- uint32_t val = 12345;
- for (size_t i=0; i<cnt-2; i++) {
- buffer->append(Rec {keys[i], val});
+ /* scan the records in the view */
+ for (size_t i=0; i<reccnt; i++) {
+ ck_assert_int_eq(view.get(i)->rec.key, i+1);
+ }
+
+ /* advance the head */
+ buffer->advance_head(new_head);
+
+ /* scan the records in the view again -- should be unchanged */
+ for (size_t i=0; i<reccnt; i++) {
+ ck_assert_int_eq(view.get(i)->rec.key, i+1);
+ }
}
- Rec r1 = {keys[cnt-2], val};
- buffer->append(r1, true);
+ {
+ /* get a new view (should have fewer records) */
+ auto view = buffer->get_buffer_view();
+ auto reccnt = view.get_record_count();
- Rec r2 = {keys[cnt-1], val};
- buffer->append(r2, true);
+ /* verify the scan again */
+ for (size_t i=0; i<reccnt; i++) {
+ ck_assert_int_eq(view.get(i)->rec.key, i + 51);
+ }
+ }
+
+ /* insert more records (to trigger a wrap-around) */
+ for (size_t i=0; i<75; i++) {
+ ck_assert_int_eq(buffer->append(rec), 1);
+ rec.key++;
+ rec.value++;
+ cnt++;
+ }
- auto *sorted_records = buffer->get_data();
- std::sort(keys.begin(), keys.end());
- std::sort(sorted_records, sorted_records + buffer->get_record_count(), std::less<Wrapped<Rec>>());
+ {
+ /* get a new view (should have fewer records) */
+ auto view = buffer->get_buffer_view();
+ auto reccnt = view.get_record_count();
- for (size_t i=0; i<cnt; i++) {
- ck_assert_int_eq(sorted_records[i].rec.key, keys[i]);
+ /* verify the scan again */
+ for (size_t i=0; i<reccnt; i++) {
+ ck_assert_int_eq(view.get(i)->rec.key, i + 51);
+ }
}
delete buffer;
@@ -200,56 +315,65 @@ START_TEST(t_get_data)
END_TEST
-void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t start, size_t stop, MutableBuffer<Rec> *buffer)
+START_TEST(t_bview_delete)
{
- for (size_t i=start; i<stop; i++) {
- buffer->append({(*values)[i].first, (*values)[i].second});
- }
-}
+ auto buffer = new MutableBuffer<Rec>(50, 100);
-#if DE_MT_TEST
-START_TEST(t_multithreaded_insert)
-{
- size_t cnt = 10000;
- auto buffer = new MutableBuffer<Rec>(cnt, true, cnt/2);
-
- std::vector<Rec> records(cnt);
- for (size_t i=0; i<cnt; i++) {
- records[i] = Rec {(uint64_t) rand(), (uint32_t) rand()};
- }
+ /* insert 75 records and get tail when LWM is exceeded */
+ size_t new_head = 0;
+ Rec rec = {1, 1};
+ size_t cnt = 0;
+ for (size_t i=0; i<75; i++) {
+ ck_assert_int_eq(buffer->append(rec), 1);
- // perform a t_multithreaded insertion
- size_t thread_cnt = 8;
- size_t per_thread = cnt / thread_cnt;
- std::vector<std::thread> workers(thread_cnt);
- size_t start = 0;
- size_t stop = start + per_thread;
- for (size_t i=0; i<thread_cnt; i++) {
- workers[i] = std::thread(insert_records, &records, start, stop, buffer);
- start = stop;
- stop = std::min(start + per_thread, cnt);
- }
+ rec.key++;
+ rec.value++;
+ cnt++;
- for (size_t i=0; i<thread_cnt; i++) {
- if (workers[i].joinable()) {
- workers[i].join();
+ if (buffer->is_at_low_watermark() && new_head == 0) {
+ new_head = buffer->get_tail();
}
}
- ck_assert_int_eq(buffer->is_full(), 1);
- ck_assert_int_eq(buffer->get_record_count(), cnt);
+ buffer->advance_head(new_head);
- std::sort(records.begin(), records.end());
- auto *sorted_records = buffer->sorted_output();
- for (size_t i=0; i<cnt; i++) {
- ck_assert_int_eq(sorted_records[i].key, records[i].key);
+ for (size_t i=0; i<75; i++) {
+ ck_assert_int_eq(buffer->append(rec), 1);
+
+ rec.key++;
+ rec.value++;
+ cnt++;
+ }
+
+ Rec dr1 = {67, 67};
+ Rec dr2 = {89, 89};
+ Rec dr3 = {103, 103};
+
+ Rec fdr1 = {5, 5};
+ Rec fdr2 = {300, 300};
+ {
+ /* get a new view (should have fewer records) */
+ auto view = buffer->get_buffer_view();
+ ck_assert_int_eq(view.delete_record(dr1), 1);
+ ck_assert_int_eq(view.delete_record(dr2), 1);
+ ck_assert_int_eq(view.delete_record(dr3), 1);
+ ck_assert_int_eq(view.delete_record(fdr1), 0);
+ ck_assert_int_eq(view.delete_record(fdr2), 0);
+
+ for (size_t i=0; i<view.get_record_count(); i++) {
+ if (view.get(i)->rec == dr1 || view.get(i)->rec == dr2
+ || view.get(i)->rec == dr3) {
+ ck_assert_int_eq(view.get(i)->is_deleted(), 1);
+ } else {
+ ck_assert_int_eq(view.get(i)->is_deleted(), 0);
+ }
+ }
}
delete buffer;
}
END_TEST
-#endif
Suite *unit_testing()
@@ -263,13 +387,16 @@ Suite *unit_testing()
TCase *append = tcase_create("de::MutableBuffer::append Testing");
tcase_add_test(append, t_insert);
- tcase_add_test(append, t_insert_tombstones);
- #if DE_MT_TEST
- tcase_add_test(append, t_multithreaded_insert);
- #endif
+ tcase_add_test(append, t_advance_head);
+ tcase_add_test(append, t_multithreaded_insert);
suite_add_tcase(unit, append);
+ TCase *view = tcase_create("de::BufferView Testing");
+ tcase_add_test(view, t_bview_get);
+ tcase_add_test(view, t_bview_delete);
+
+ suite_add_tcase(unit, view);
TCase *truncate = tcase_create("de::MutableBuffer::truncate Testing");
tcase_add_test(truncate, t_truncate);
@@ -277,11 +404,6 @@ Suite *unit_testing()
suite_add_tcase(unit, truncate);
- TCase *sorted_out = tcase_create("de::MutableBuffer::get_data");
- tcase_add_test(sorted_out, t_get_data);
-
- suite_add_tcase(unit, sorted_out);
-
return unit;
}