From 53879a0d69f5e578710b7125e9b41e516c2371d4 Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Wed, 10 Jan 2024 17:39:28 -0500
Subject: MutableBuffer+View: Implementation with unit tests

---
 include/framework/structure/BufferView.h    |  32 ++--
 include/framework/structure/MutableBuffer.h | 133 +++++++++++----
 tests/mutable_buffer_tests.cpp              | 254 +++++++++++++---------------
 3 files changed, 229 insertions(+), 190 deletions(-)

diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 8a5f50f..7e8af45 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -9,32 +9,34 @@
 #pragma once
 
 #include <cstdlib>
-#include <vector>
-#include <cassert>
 #include <functional>
-#include <memory>
-#include <algorithm>
-#include <utility>
 
 #include "psu-util/alignment.h"
-#include "util/bf_config.h"
 #include "psu-ds/BloomFilter.h"
-#include "psu-ds/Alias.h"
-#include "psu-util/timer.h"
 #include "framework/interface/Record.h"
-#include "framework/interface/Query.h"
 
 namespace de {
 
+typedef std::function<void(void*, size_t)> ReleaseFunction;
+
 template <RecordInterface R>
 class BufferView {
 public:
     BufferView() = default;
 
-    BufferView(const Wrapped<R> *buffer, size_t head, size_t tail, psudb::BloomFilter<R> *filter)
-        : m_buffer(buffer), m_head(head), m_tail(tail), m_tombstone_filter(filter) {}
+    BufferView(const Wrapped<R> *buffer, size_t head, size_t tail, psudb::BloomFilter<R> *filter,
+               void *parent_buffer, ReleaseFunction release)
+        : m_buffer(buffer)
+        , m_parent_buffer(parent_buffer)
+        , m_release(release)
+        , m_head(head)
+        , m_tail(tail)
+        , m_tombstone_filter(filter) {}
 
-    ~BufferView() = default;
+    ~BufferView() {
+        /* guard: a default-constructed view has no parent to release */
+        if (m_release) m_release(m_parent_buffer, m_head);
+    }
 
     bool check_tombstone(const R& rec) {
         if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
@@ -62,12 +64,14 @@ public:
         return m_buffer + to_idx(i);
     }
 
-    void copy_to_buffer(byte *buffer) {
-        memcpy(buffer, m_buffer, get_record_count() * sizeof(Wrapped<R>));
+    void copy_to_buffer(psudb::byte *buffer) {
+        memcpy(buffer, (std::byte*) (m_buffer + m_head), get_record_count() * sizeof(Wrapped<R>));
     }
 
 private:
     const Wrapped<R>* m_buffer;
+    void *m_parent_buffer;
+    ReleaseFunction m_release;
     size_t m_head;
     size_t m_tail;
     psudb::BloomFilter<R> *m_tombstone_filter;
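
The view-release mechanism above is the heart of this change: get_buffer_view() pins the buffer's head by bumping a reference count, and the view's destructor unpins it through the stored callback, so a reconstruction cannot reclaim records out from under a live view. The standalone sketch below illustrates that contract only; ToyBuffer, ToyView, and their members are invented for the example and are not part of the patch.

    #include <atomic>
    #include <cstddef>
    #include <cstdio>
    #include <functional>

    using ReleaseFunction = std::function<void(void*, size_t)>;

    struct ToyBuffer {
        std::atomic<size_t> head_refcnt{0};
        size_t head{0};

        /* mirrors the shape of MutableBuffer::release_head_reference */
        static void release(void *buf, size_t /*head*/) {
            static_cast<ToyBuffer*>(buf)->head_refcnt.fetch_sub(1);
        }
    };

    struct ToyView {
        void *parent{nullptr};
        size_t head{0};
        ReleaseFunction release_fn;

        ~ToyView() {
            /* a default-constructed view has nothing to release */
            if (release_fn) release_fn(parent, head);
        }
    };

    int main() {
        ToyBuffer buf;
        {
            buf.head_refcnt.fetch_add(1);  /* taking a view pins the head */
            ToyView v{&buf, buf.head, ToyBuffer::release};
            std::printf("pins while view is live: %zu\n", buf.head_refcnt.load()); /* 1 */
        }                                  /* destroying the view unpins it */
        std::printf("pins after view is gone: %zu\n", buf.head_refcnt.load());     /* 0 */
    }

Routing the release through a std::function keeps BufferView decoupled from MutableBuffer's definition, at the price of the void* round trip back inside release_head_reference.
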
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 7bec219..d57ad6e 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -11,17 +11,11 @@
 
 #include <cstdlib>
 #include <atomic>
-#include <algorithm>
 #include <cassert>
-#include <vector>
-#include <numeric>
-#include <cstring>
 
 #include "psu-util/alignment.h"
 #include "util/bf_config.h"
 #include "psu-ds/BloomFilter.h"
-#include "psu-ds/Alias.h"
-#include "psu-util/timer.h"
 #include "framework/interface/Record.h"
 #include "framework/structure/BufferView.h"
 
@@ -34,7 +28,8 @@ class MutableBuffer {
     friend class BufferView<R>;
 
 public:
     MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
-        : m_lwm(low_watermark), m_hwm(high_watermark), m_cap(capacity), m_head(0), m_tail(0) {
+        : m_lwm(low_watermark), m_hwm(high_watermark), m_cap(capacity), m_head(0), m_tail(0),
+          m_old_head(0), m_head_refcnt(0), m_old_head_refcnt(0) {
         /*
          * default capacity is twice the high water mark, to account for
          * the worst-case memory requirements.
          */
@@ -50,10 +45,8 @@ public:
     }
 
     ~MutableBuffer() {
-        assert(m_refcnt.load() == 0);
-
-        if (m_data) free(m_data);
-        if (m_tombstone_filter) delete m_tombstone_filter;
+        free(m_data);
+        delete m_tombstone_filter;
     }
 
@@ -94,7 +87,11 @@ public:
     }
 
     bool is_full() {
-        return (m_tail % m_cap) >= m_hwm;
+        return get_record_count() >= m_hwm;
+    }
+
+    bool is_at_low_watermark() {
+        return (m_tail % m_cap) > m_lwm;
     }
 
     size_t get_tombstone_count() {
@@ -125,66 +122,132 @@ public:
     }
 
     size_t get_memory_usage() {
-        return m_cap * sizeof(R);
+        return m_cap * sizeof(Wrapped<R>);
     }
 
     size_t get_aux_memory_usage() {
         return m_tombstone_filter->get_memory_usage();
     }
 
-    size_t get_tombstone_capacity() {
-        // FIXME: tombstone capacity needs figured out again
-        return m_cap;
+    BufferView<R> get_buffer_view() {
+        m_head_refcnt.fetch_add(1);
+        return BufferView<R>(m_data, m_head, m_tail.load(), m_tombstone_filter, (void*) this, release_head_reference);
     }
 
     /*
-     * Concurrency-related operations
+     * Advance the buffer following a reconstruction. Move the current
+     * head and head_refcnt into old_head and old_head_refcnt, then
+     * assign new_head to head.
      */
-    bool take_reference() {
-        m_refcnt.fetch_add(1);
-        return true;
+    void advance_head(size_t new_head) {
+        assert(new_head > m_head.load());
+        assert(new_head <= m_tail.load());
+        assert(m_old_head_refcnt == 0);
+
+        /*
+         * The order here is very important. We first store zero to the
+         * old refcnt (it should be zero already). Then we move the
+         * current head to old head. At this point, any new buffer views
+         * should increment the old head refcnt, so no new references to
+         * the current head will be taken. Then we add the current head
+         * refcnt to the old one, ensuring that no references get
+         * dropped. Only after this do we switch to the new head.
+         */
+        m_old_head_refcnt.store(0);
+        m_old_head.store(m_head.load());
+        m_old_head_refcnt.fetch_add(m_head_refcnt.load());
+
+        m_head_refcnt.store(0);
+        m_head.store(new_head);
     }
 
-    bool release_reference() {
-        assert(m_refcnt > 0);
-        m_refcnt.fetch_add(-1);
-        return true;
+    void set_low_watermark(size_t lwm) {
+        assert(lwm < m_hwm);
+        m_lwm = lwm;
     }
 
-    size_t get_reference_count() {
-        return m_refcnt.load();
+    size_t get_low_watermark() {
+        return m_lwm;
+    }
+
+    void set_high_watermark(size_t hwm) {
+        assert(hwm > m_lwm);
+        assert(hwm < m_cap);
+        m_hwm = hwm;
+    }
+
+    size_t get_high_watermark() {
+        return m_hwm;
+    }
+
+    size_t get_tail() {
+        return m_tail.load();
+    }
+
+    /*
+     * Note: this returns the available physical storage capacity,
+     * *not* how many more records can be inserted before the
+     * HWM is reached.
+     */
+    size_t get_available_capacity() {
+        return m_cap - (m_tail.load() - m_old_head.load());
     }
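
The ordering constraint documented in advance_head() is easier to see in isolation. The sketch below mirrors the five stores from the patch, with invented globals standing in for the member atomics; it is illustrative, not the framework's API. Because a generation's live pins are handed to the old head before the head moves, at most two head generations ever exist, which is why the function first asserts that the old generation is fully drained.

    #include <atomic>
    #include <cassert>
    #include <cstdio>

    /* illustrative stand-ins for the buffer's member atomics */
    std::atomic<size_t> head{0}, old_head{0};
    std::atomic<size_t> head_refcnt{0}, old_head_refcnt{0};

    void advance_head_sketch(size_t new_head) {
        assert(old_head_refcnt.load() == 0);  /* old generation must be drained */

        old_head_refcnt.store(0);                       /* 1. reset retiring counter */
        old_head.store(head.load());                    /* 2. retire current head    */
        old_head_refcnt.fetch_add(head_refcnt.load());  /* 3. hand over live pins    */

        head_refcnt.store(0);                           /* 4. fresh counter          */
        head.store(new_head);                           /* 5. publish new head last  */
    }

    int main() {
        head_refcnt.store(3);   /* pretend three views pin the current head */
        advance_head_sketch(50);
        std::printf("old-gen pins: %zu, new-gen pins: %zu, head: %zu\n",
                    old_head_refcnt.load(), head_refcnt.load(), head.load());
        /* prints: old-gen pins: 3, new-gen pins: 0, head: 50 */
    }
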
+ */ + size_t get_available_capacity() { + return m_cap - (m_tail.load() - m_old_head.load()); } private: int64_t try_advance_tail() { - int64_t new_tail = m_tail.fetch_add(1) % m_cap; + size_t old_value = m_tail.load(); + + /* if full, fail to advance the tail */ + if (old_value >= m_hwm) { + return -1; + } - if (new_tail < m_hwm) { - return new_tail; + while (!m_tail.compare_exchange_strong(old_value, old_value+1)) { + /* if full, stop trying and fail to advance the tail */ + if (m_tail.load() >= m_hwm) { + return -1; + } } - m_tail.fetch_add(-1); - return -1; + return old_value; } size_t to_idx(size_t i) { return (m_head + i) % m_cap; } - size_t m_cap; + static void release_head_reference(void *buff, size_t head) { + MutableBuffer *buffer = (MutableBuffer *) buff; + + if (head == buffer->m_head.load()) { + buffer->m_head_refcnt.fetch_sub(1); + } else if (head == buffer->m_old_head.load()) { + buffer->m_old_head_refcnt.fetch_sub(1); + /* + * if the old head refcnt drops to 0, free + * the records by setting old_head = head + */ + if (buffer->m_old_head_refcnt.load() == 0) { + buffer->m_old_head.store(buffer->m_head); + } + } + } size_t m_lwm; size_t m_hwm; + size_t m_cap; alignas(64) std::atomic m_tail; + alignas(64) std::atomic m_head; + alignas(64) std::atomic m_head_refcnt; + + alignas(64) std::atomic m_old_head; + alignas(64) std::atomic m_old_head_refcnt; Wrapped* m_data; - psudb::BloomFilter* m_tombstone_filter; - alignas(64) std::atomic m_tombstonecnt; - alignas(64) std::atomic m_refcnt; }; } diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp index 8480f55..e714d3e 100644 --- a/tests/mutable_buffer_tests.cpp +++ b/tests/mutable_buffer_tests.cpp @@ -1,7 +1,7 @@ /* * tests/mutable_buffer_tests.cpp * - * Unit tests for MutableBuffer + * Unit tests for MutableBuffer and BufferView * * Copyright (C) 2023 Douglas Rumbaugh * Dong Xie @@ -9,7 +9,7 @@ * Distributed under the Modified BSD License. 
diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp
index 8480f55..e714d3e 100644
--- a/tests/mutable_buffer_tests.cpp
+++ b/tests/mutable_buffer_tests.cpp
@@ -1,7 +1,7 @@
 /*
  * tests/mutable_buffer_tests.cpp
  *
- * Unit tests for MutableBuffer
+ * Unit tests for MutableBuffer and BufferView
  *
  * Copyright (C) 2023 Douglas Rumbaugh
  *                    Dong Xie
@@ -9,7 +9,7 @@
  * Distributed under the Modified BSD License.
  *
  */
-#include <string>
+
 #include <thread>
 #include <gsl/gsl_rng.h>
 #include <check.h>
@@ -25,15 +25,26 @@
 using namespace de;
 
 START_TEST(t_create)
 {
-    auto buffer = new MutableBuffer<Rec>(100, 50);
+    size_t lwm = 50, hwm = 100;
+    size_t cap = 2 * hwm;
+
+    auto buffer = new MutableBuffer<Rec>(lwm, hwm);
 
     ck_assert_ptr_nonnull(buffer);
-    ck_assert_int_eq(buffer->get_capacity(), 100);
-    ck_assert_int_eq(buffer->get_record_count(), 0);
+    ck_assert_int_eq(buffer->get_capacity(), cap);
+    ck_assert_int_eq(buffer->get_low_watermark(), lwm);
+    ck_assert_int_eq(buffer->get_high_watermark(), hwm);
+
     ck_assert_int_eq(buffer->is_full(), false);
-    ck_assert_ptr_nonnull(buffer->get_data());
+    ck_assert_int_eq(buffer->is_at_low_watermark(), false);
+    ck_assert_int_eq(buffer->get_record_count(), 0);
     ck_assert_int_eq(buffer->get_tombstone_count(), 0);
-    ck_assert_int_eq(buffer->get_tombstone_capacity(), 50);
+
+    {
+        auto view = buffer->get_buffer_view();
+        ck_assert_int_eq(view.get_tombstone_count(), 0);
+        ck_assert_int_eq(view.get_record_count(), 0);
+    }
 
     delete buffer;
 }
 END_TEST
 
@@ -42,74 +53,47 @@
 START_TEST(t_insert)
 {
-    auto buffer = new MutableBuffer<WRec>(100, 50);
+    auto buffer = new MutableBuffer<Rec>(50, 100);
 
-    uint64_t key = 0;
-    uint32_t val = 5;
-
-    WRec rec = {0, 5, 1};
+    Rec rec = {0, 5, 1};
 
-    for (size_t i=0; i<99; i++) {
+    /* insert records up to the low watermark */
+    size_t cnt = 0;
+    for (size_t i=0; i<50; i++) {
         ck_assert_int_eq(buffer->append(rec), 1);
         ck_assert_int_eq(buffer->check_tombstone(rec), 0);
+        ck_assert_int_eq(buffer->is_at_low_watermark(), false);
 
         rec.key++;
         rec.value++;
+        cnt++;
 
-        ck_assert_int_eq(buffer->get_record_count(), i+1);
-        ck_assert_int_eq(buffer->get_tombstone_count(), 0);
-        ck_assert_int_eq(buffer->is_full(), 0);
+        ck_assert_int_eq(buffer->get_record_count(), cnt);
+        ck_assert_int_eq(buffer->get_buffer_view().get_record_count(), cnt);
+        ck_assert_int_eq(buffer->get_tail(), cnt);
     }
 
-    ck_assert_int_eq(buffer->append(rec), 1);
-
-    rec.key++;
-    rec.value++;
-
-    ck_assert_int_eq(buffer->is_full(), 1);
-    ck_assert_int_eq(buffer->append(rec), 0);
-
-    delete buffer;
-}
-END_TEST
-
-
-START_TEST(t_insert_tombstones)
-{
-    auto buffer = new MutableBuffer<Rec>(100, 50);
-
-    size_t ts_cnt = 0;
-
-    Rec rec = {0, 5};
-
-    for (size_t i=0; i<99; i++) {
-        bool ts = false;
-        if (i % 2 == 0) {
-            ts_cnt++;
-            ts=true;
-        }
-
-        ck_assert_int_eq(buffer->append(rec, ts), 1);
-        ck_assert_int_eq(buffer->check_tombstone(rec), ts);
+    /* insert records up to the high watermark */
+    for (size_t i=0; i<50; i++) {
+        ck_assert_int_eq(buffer->is_full(), 0);
+        ck_assert_int_eq(buffer->append(rec), 1);
+        ck_assert_int_eq(buffer->check_tombstone(rec), 0);
 
         rec.key++;
         rec.value++;
+        cnt++;
 
-        ck_assert_int_eq(buffer->get_record_count(), i+1);
-        ck_assert_int_eq(buffer->get_tombstone_count(), ts_cnt);
-        ck_assert_int_eq(buffer->is_full(), 0);
-    }
-
-    // inserting one more tombstone should not be possible
-    ck_assert_int_eq(buffer->append(rec, true), 0);
-
-    ck_assert_int_eq(buffer->append(rec), 1);
-
+        ck_assert_int_eq(buffer->get_record_count(), cnt);
+        ck_assert_int_eq(buffer->get_buffer_view().get_record_count(), cnt);
+        ck_assert_int_eq(buffer->get_tombstone_count(), 0);
+        ck_assert_int_eq(buffer->is_at_low_watermark(), true);
+        ck_assert_int_eq(buffer->get_tail(), cnt);
+    }
+
+    /* further inserts should fail */
     rec.key++;
     rec.value++;
-
     ck_assert_int_eq(buffer->is_full(), 1);
     ck_assert_int_eq(buffer->append(rec), 0);
 
     delete buffer;
 }
 END_TEST
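
The watermark scheme exercised above also explains the constructor's default of twice the high watermark: a view pinned at the start of a reconstruction can hold up to one HWM's worth of retired records while up to another HWM's worth arrive. The arithmetic below mirrors the values asserted in t_advance_head further down; the available lambda is an illustrative stand-in for get_available_capacity(), not framework code.

    #include <cstddef>
    #include <cstdio>

    int main() {
        const size_t hwm = 100, cap = 2 * hwm;  /* ctor default: cap = 2*hwm */
        size_t old_head = 0, tail = 75;         /* state after 75 appends    */

        /* get_available_capacity() measures from old_head, not head */
        auto available = [&]() { return cap - (tail - old_head); };

        std::printf("%zu\n", available());  /* 125 = 200 - 75, before advance */

        /* advance_head(50) moves head, but a live view still pins old_head,
           so the retired records are not yet reclaimed */
        std::printf("%zu\n", available());  /* still 125 while the view lives */

        old_head = 50;  /* last view released: old_head snaps to head */
        std::printf("%zu\n", available());  /* 175 = 200 - 25 live records    */
    }
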
@@ -118,88 +102,51 @@
 END_TEST
 
 
-START_TEST(t_truncate)
+START_TEST(t_advance_head)
 {
-    auto buffer = new MutableBuffer<Rec>(100, 100);
+    auto buffer = new MutableBuffer<Rec>(50, 100);
 
-    size_t ts_cnt = 0;
-    Rec rec = {0, 5};
-
-    for (size_t i=0; i<100; i++) {
-        bool ts = false;
-        if (i % 2 == 0) {
-            ts_cnt++;
-            ts=true;
-        }
-
-        ck_assert_int_eq(buffer->append(rec, ts), 1);
-        ck_assert_int_eq(buffer->check_tombstone(rec), ts);
+    /* insert 75 records and get tail when LWM is exceeded */
+    size_t new_head = 0;
+    Rec rec = {1, 1};
+    size_t cnt = 0;
+    for (size_t i=0; i<75; i++) {
+        ck_assert_int_eq(buffer->append(rec), 1);
 
         rec.key++;
         rec.value++;
+        cnt++;
 
-        ck_assert_int_eq(buffer->get_record_count(), i+1);
-        ck_assert_int_eq(buffer->get_tombstone_count(), ts_cnt);
-    }
-
-    ck_assert_int_eq(buffer->is_full(), 1);
-    ck_assert_int_eq(buffer->append(rec), 0);
-
-    ck_assert_int_eq(buffer->truncate(), 1);
-
-    ck_assert_int_eq(buffer->is_full(), 0);
-    ck_assert_int_eq(buffer->get_record_count(), 0);
-    ck_assert_int_eq(buffer->get_tombstone_count(), 0);
-    ck_assert_int_eq(buffer->append(rec), 1);
-
-    delete buffer;
-
-}
-END_TEST
-
-
-START_TEST(t_get_data)
-{
-    size_t cnt = 100;
-
-    auto buffer = new MutableBuffer<Rec>(cnt, cnt/2);
-
-
-    std::vector<uint64_t> keys(cnt);
-    for (size_t i=0; i<cnt-2; i++) {
-        keys[i] = rand();
+        if (buffer->is_at_low_watermark() && new_head == 0) {
+            new_head = buffer->get_tail() - 1;
+        }
     }
 
-    // duplicate final two records for tombstone testing
-    // purposes
-    keys[cnt-2] = keys[cnt-3];
-    keys[cnt-1] = keys[cnt-2];
-
-    uint32_t val = 12345;
-    for (size_t i=0; i<cnt-2; i++) {
-        buffer->append(Rec {keys[i], val});
+    ck_assert_int_eq(buffer->get_available_capacity(), 200 - cnt);
+
+    Wrapped<Rec> *view_records = new Wrapped<Rec>[buffer->get_record_count()];
+    {
+        /* get a view of the pre-advanced state */
+        auto view = buffer->get_buffer_view();
+        ck_assert_int_eq(view.get_record_count(), cnt);
+        view.copy_to_buffer((psudb::byte *) view_records);
+
+        /* advance the head */
+        buffer->advance_head(new_head);
+        ck_assert_int_eq(buffer->get_record_count(), 25);
+        ck_assert_int_eq(buffer->get_buffer_view().get_record_count(), 25);
+        ck_assert_int_eq(view.get_record_count(), cnt);
+        ck_assert_int_eq(buffer->get_available_capacity(), 200 - cnt);
     }
 
-    Rec r1 = {keys[cnt-2], val};
-    buffer->append(r1, true);
-
-    Rec r2 = {keys[cnt-1], val};
-    buffer->append(r2, true);
-
-
-    auto *sorted_records = buffer->get_data();
-    std::sort(keys.begin(), keys.end());
-    std::sort(sorted_records, sorted_records + buffer->get_record_count(), std::less<Wrapped<Rec>>());
-
-    for (size_t i=0; i<cnt; i++) {
-        ck_assert_int_eq(sorted_records[i].rec.key, keys[i]);
-    }
+    ck_assert_int_eq(buffer->get_available_capacity(), 175);
 
     delete buffer;
+    delete[] view_records;
 }
 END_TEST
 
 
 void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t start, size_t stop, MutableBuffer<Rec> *buffer)
 {
     for (size_t i=start; i<stop; i++) {
         buffer->append({(*values)[i].first, (*values)[i].second});
     }
 }
 
-#if DE_MT_TEST
+/*
 START_TEST(t_multithreaded_insert)
 {
     size_t cnt = 10000;
-    auto buffer = new MutableBuffer<Rec>(cnt, true, cnt/2);
+    auto buffer = new MutableBuffer<Rec>(cnt/2, cnt);
 
     std::vector<std::pair<uint64_t, uint32_t>> records(cnt);
     for (size_t i=0; i<cnt; i++) {
         records[i] = {(uint64_t) rand(), (uint32_t) i};
     }
 
     size_t thread_cnt = 8;
     size_t per_thread = cnt / thread_cnt;
 
     std::vector<std::thread> workers(thread_cnt);
@@ -239,17 +186,49 @@ void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t s
 
     ck_assert_int_eq(buffer->is_full(), 1);
     ck_assert_int_eq(buffer->get_record_count(), cnt);
+}
+END_TEST
+*/
 
-    std::sort(records.begin(), records.end());
-
-    auto *sorted_records = buffer->sorted_output();
-    for (size_t i=0; i<cnt; i++) {
-        ck_assert_int_eq(sorted_records[i].key, records[i].first);
-    }
-
-    delete buffer;
-}
-END_TEST
-#endif
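
The multithreaded test stays disabled in this patch (it appears to depend on scheduling machinery that is not part of this commit), but the append path itself is already written for concurrency: the CAS loop in try_advance_tail hands each caller an exclusive slot, after which the record write needs no lock. Below is a standalone sketch of that claim-then-write protocol; ToySlotBuffer and every name in it are invented, and it models only the protocol, not the real MutableBuffer.

    #include <atomic>
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>
    #include <thread>
    #include <vector>

    struct ToySlotBuffer {
        static const size_t hwm = 10000;
        std::atomic<size_t> tail{0};
        std::vector<uint64_t> data = std::vector<uint64_t>(hwm);

        bool append(uint64_t rec) {
            /* claim a slot via CAS, as in try_advance_tail */
            size_t slot = tail.load();
            do {
                if (slot >= hwm) return false;   /* HWM reached */
            } while (!tail.compare_exchange_strong(slot, slot + 1));

            data[slot] = rec;   /* slot is exclusively owned: no lock needed */
            return true;
        }
    };

    int main() {
        ToySlotBuffer buf;
        size_t thread_cnt = 8, per_thread = ToySlotBuffer::hwm / thread_cnt;

        std::vector<std::thread> workers;
        for (size_t t = 0; t < thread_cnt; t++) {
            workers.emplace_back([&buf, t, per_thread]() {
                for (size_t i = 0; i < per_thread; i++) {
                    buf.append(t * per_thread + i);
                }
            });
        }
        for (auto &w : workers) w.join();

        std::printf("records: %zu\n", buf.tail.load());   /* 10000 */
    }

Each of the 10000 slots is claimed exactly once, so the final tail equals the record count regardless of interleaving.
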
+
+START_TEST(t_truncate)
+{
+    auto buffer = new MutableBuffer<Rec>(100, 100);
+
+    size_t ts_cnt = 0;
+    Rec rec = {0, 5};
+
+    for (size_t i=0; i<100; i++) {
+        bool ts = false;
+        if (i % 2 == 0) {
+            ts_cnt++;
+            ts=true;
+        }
+
+        ck_assert_int_eq(buffer->append(rec, ts), 1);
+        ck_assert_int_eq(buffer->check_tombstone(rec), ts);
+
+        rec.key++;
+        rec.value++;
+
+        ck_assert_int_eq(buffer->get_record_count(), i+1);
+        ck_assert_int_eq(buffer->get_tombstone_count(), ts_cnt);
+    }
+
+    ck_assert_int_eq(buffer->is_full(), 1);
+    ck_assert_int_eq(buffer->append(rec), 0);
+
+    ck_assert_int_eq(buffer->truncate(), 1);
+
+    ck_assert_int_eq(buffer->is_full(), 0);
+    ck_assert_int_eq(buffer->get_record_count(), 0);
+    ck_assert_int_eq(buffer->get_tombstone_count(), 0);
+    ck_assert_int_eq(buffer->append(rec), 1);
+
+    delete buffer;
+}
+END_TEST
 
 
 Suite *unit_testing()
 {
@@ -263,10 +242,8 @@ Suite *unit_testing()
 
     TCase *append = tcase_create("de::MutableBuffer::append Testing");
     tcase_add_test(append, t_insert);
-    tcase_add_test(append, t_insert_tombstones);
-    #if DE_MT_TEST
-    tcase_add_test(append, t_multithreaded_insert);
-    #endif
+    tcase_add_test(append, t_advance_head);
+    //tcase_add_test(append, t_multithreaded_insert);
 
     suite_add_tcase(unit, append);
 
@@ -277,11 +254,6 @@ Suite *unit_testing()
 
     suite_add_tcase(unit, truncate);
 
-    TCase *sorted_out = tcase_create("de::MutableBuffer::get_data");
-    tcase_add_test(sorted_out, t_get_data);
-
-    suite_add_tcase(unit, sorted_out);
-
     return unit;
 }
-- 
cgit v1.2.3