From eb19677340be6f0befe9da2199e5832af51eea0d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 10 Jan 2024 18:01:30 -0500 Subject: MutableBuffer: multithreaded insert test + bugfixes --- include/framework/structure/MutableBuffer.h | 13 ++++++++---- tests/mutable_buffer_tests.cpp | 32 ++++++++++++++++++----------- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index d57ad6e..a065154 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -49,7 +49,6 @@ public: delete m_tombstone_filter; } - template int append(const R &rec, bool tombstone=false) { int32_t pos = 0; if ((pos = try_advance_tail()) == -1) return 0; @@ -91,7 +90,7 @@ public: } bool is_at_low_watermark() { - return (m_tail % m_cap) > m_lwm; + return get_record_count() >= m_lwm; } size_t get_tombstone_count() { @@ -139,10 +138,14 @@ public: * head and head_refcnt into old_head and old_head_refcnt, then * assign new_head to old_head. */ - void advance_head(size_t new_head) { + bool advance_head(size_t new_head) { assert(new_head > m_head.load()); assert(new_head <= m_tail.load()); - assert(m_old_head_refcnt == 0); + + /* refuse to advance the head while the old head still has outstanding references */ + if (m_old_head_refcnt > 0) { + return false; + } /* * the order here is very important. 
We first store zero to the @@ -159,6 +162,8 @@ public: m_head_refcnt.store(0); m_head.store(new_head); + + return true; } void set_low_watermark(size_t lwm) { diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp index e714d3e..0520097 100644 --- a/tests/mutable_buffer_tests.cpp +++ b/tests/mutable_buffer_tests.cpp @@ -12,15 +12,12 @@ #include #include -#include #include "testing.h" #include "framework/structure/MutableBuffer.h" #include -#define DE_MT_TEST 0 - using namespace de; START_TEST(t_create) @@ -60,9 +57,9 @@ START_TEST(t_insert) /* insert records up to the low watermark */ size_t cnt = 0; for (size_t i=0; i<50; i++) { + ck_assert_int_eq(buffer->is_at_low_watermark(), false); ck_assert_int_eq(buffer->append(rec), 1); ck_assert_int_eq(buffer->check_tombstone(rec), 0); - ck_assert_int_eq(buffer->is_at_low_watermark(), false); rec.key++; rec.value++; @@ -73,6 +70,8 @@ START_TEST(t_insert) ck_assert_int_eq(buffer->get_tail(), cnt); } + ck_assert_int_eq(buffer->is_at_low_watermark(), true); + /* insert records up to the high watermark */ for (size_t i=0; i<50; i++) { ck_assert_int_eq(buffer->is_full(), 0); @@ -118,7 +117,7 @@ START_TEST(t_advance_head) cnt++; if (buffer->is_at_low_watermark() && new_head == 0) { - new_head = buffer->get_tail() - 1; + new_head = buffer->get_tail(); } } @@ -132,30 +131,38 @@ START_TEST(t_advance_head) view.copy_to_buffer((psudb::byte *) view_records); /* advance the head */ - buffer->advance_head(new_head); + ck_assert_int_eq(buffer->advance_head(new_head), 1); ck_assert_int_eq(buffer->get_record_count(), 25); ck_assert_int_eq(buffer->get_buffer_view().get_record_count(), 25); ck_assert_int_eq(view.get_record_count(), cnt); ck_assert_int_eq(buffer->get_available_capacity(), 200 - cnt); + + /* refuse to advance head again while there remain references to the old one */ + ck_assert_int_eq(buffer->advance_head(buffer->get_tail() -1), 0); } /* once the buffer view falls out of scope, the capacity of the buffer 
should increase */ ck_assert_int_eq(buffer->get_available_capacity(), 175); + /* now the head should be able to be advanced */ + ck_assert_int_eq(buffer->advance_head(buffer->get_tail()), 1); + + /* and the buffer should be empty */ + ck_assert_int_eq(buffer->get_record_count(), 0); + delete buffer; delete[] view_records; } END_TEST -void insert_records(std::vector> *values, size_t start, size_t stop, MutableBuffer *buffer) +void insert_records(std::vector *values, size_t start, size_t stop, MutableBuffer *buffer) { for (size_t i=start; iappend({(*values)[i].first, (*values)[i].second}); + buffer->append((*values)[i]); } } -/* START_TEST(t_multithreaded_insert) { size_t cnt = 10000; @@ -166,7 +173,7 @@ START_TEST(t_multithreaded_insert) records[i] = Rec {(uint64_t) rand(), (uint32_t) rand()}; } - // perform a multithreaded insertion + /* perform a multithreaded insertion */ size_t thread_cnt = 8; size_t per_thread = cnt / thread_cnt; std::vector workers(thread_cnt); @@ -186,9 +193,10 @@ START_TEST(t_multithreaded_insert) ck_assert_int_eq(buffer->is_full(), 1); ck_assert_int_eq(buffer->get_record_count(), cnt); + + delete buffer; } END_TEST -*/ START_TEST(t_truncate) @@ -243,7 +251,7 @@ Suite *unit_testing() TCase *append = tcase_create("de::MutableBuffer::append Testing"); tcase_add_test(append, t_insert); tcase_add_test(append, t_advance_head); - //tcase_add_test(append, t_multithreaded_insert); + tcase_add_test(append, t_multithreaded_insert); suite_add_tcase(unit, append); -- cgit v1.2.3