From 0ff3cedf5df9c27bccd3053ce6339e317f87ff76 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 5 Feb 2024 15:18:33 -0500 Subject: BufferView: Adjusted BV to avoid repeated modulus operations --- include/framework/structure/BufferView.h | 50 ++++++++-- include/framework/structure/MutableBuffer.h | 13 ++- tests/mutable_buffer_tests.cpp | 142 ++++++++++++++++++++++++++++ 3 files changed, 190 insertions(+), 15 deletions(-) diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index 30fffed..edf6707 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -38,6 +38,8 @@ public: , m_release(std::move(other.m_release)) , m_head(std::exchange(other.m_head, 0)) , m_tail(std::exchange(other.m_tail, 0)) + , m_start(std::exchange(other.m_start, 0)) + , m_stop(std::exchange(other.m_stop, 0)) , m_cap(std::exchange(other.m_cap, 0)) , m_approx_ts_cnt(std::exchange(other.m_approx_ts_cnt, 0)) , m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr)) @@ -52,6 +54,8 @@ public: , m_release(release) , m_head(head) , m_tail(tail) + , m_start(m_head % cap) + , m_stop(m_tail % cap) , m_cap(cap) , m_approx_ts_cnt(tombstone_cnt) , m_tombstone_filter(filter) @@ -76,11 +80,29 @@ public: } bool delete_record(const R& rec) { - for (size_t i=0; i *get(size_t i) { assert(i < get_record_count()); + m_total += (m_data + to_idx(i))->rec.key; return m_data + to_idx(i); } void copy_to_buffer(psudb::byte *buffer) { /* check if the region to be copied circles back to start. 
If so, do it in two steps */ - if ((m_head % m_cap) + get_record_count() > m_cap) { - size_t split_idx = m_cap - (m_head % m_cap); + if (m_start > m_stop) { + size_t split_idx = m_cap - m_start; - memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), split_idx* sizeof(Wrapped)); - memcpy(buffer + (split_idx * sizeof(Wrapped)), (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped)); + memcpy(buffer, (std::byte*) (m_data + m_start), split_idx* sizeof(Wrapped)); + memcpy(buffer + (split_idx * sizeof(Wrapped)), (std::byte*) m_data, m_stop * sizeof(Wrapped)); } else { - memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped)); + memcpy(buffer, (std::byte*) (m_data + m_start), get_record_count() * sizeof(Wrapped)); } } @@ -129,13 +152,20 @@ private: ReleaseFunction m_release; size_t m_head; size_t m_tail; + size_t m_start; + size_t m_stop; size_t m_cap; size_t m_approx_ts_cnt; psudb::BloomFilter *m_tombstone_filter; bool m_active; + size_t m_total; + size_t to_idx(size_t i) { - return (m_head + i) % m_cap; + size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start) + : m_start + i; + assert(idx < m_cap); + return idx; } }; diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 94a9c41..415c95a 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -230,13 +230,16 @@ public: /* * Note: this returns the available physical storage capacity, * *not* now many more records can be inserted before the - * HWM is reached. - * - * FIXME: this logic is incorrect for the buffer prior to the - * first call to advance_head, and will under-report the available - * space. + * HWM is reached. It considers the old_head to be "free" + * when it has no remaining references. This should be true, + * but a buggy framework implementation may violate the + * assumption. 
*/ size_t get_available_capacity() { + if (m_old_head.load().refcnt == 0) { + return m_cap - (m_tail.load() - m_head.load().head_idx); + } + return m_cap - (m_tail.load() - m_old_head.load().head_idx); } diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp index 4064412..31c16dc 100644 --- a/tests/mutable_buffer_tests.cpp +++ b/tests/mutable_buffer_tests.cpp @@ -238,6 +238,143 @@ START_TEST(t_truncate) } END_TEST +START_TEST(t_bview_get) +{ + auto buffer = new MutableBuffer(50, 100); + + /* insert 75 records and get tail when LWM is exceeded */ + size_t new_head = 0; + Rec rec = {1, 1}; + size_t cnt = 0; + for (size_t i=0; i<75; i++) { + ck_assert_int_eq(buffer->append(rec), 1); + + rec.key++; + rec.value++; + cnt++; + + if (buffer->is_at_low_watermark() && new_head == 0) { + new_head = buffer->get_tail(); + } + } + + ck_assert_int_eq(buffer->get_available_capacity(), 200 - cnt); + + { + /* get a view of the pre-advanced state */ + auto view = buffer->get_buffer_view(); + auto reccnt = view.get_record_count(); + + /* scan the records in the view */ + for (size_t i=0; irec.key, i+1); + } + + /* advance the head */ + buffer->advance_head(new_head); + + /* scan the records in the view again -- should be unchanged */ + for (size_t i=0; irec.key, i+1); + } + } + + { + /* get a new view (should have fewer records) */ + auto view = buffer->get_buffer_view(); + auto reccnt = view.get_record_count(); + + /* verify the scan again */ + for (size_t i=0; irec.key, i + 51); + } + } + + /* insert more records (to trigger a wrap-around) */ + for (size_t i=0; i<75; i++) { + ck_assert_int_eq(buffer->append(rec), 1); + + rec.key++; + rec.value++; + cnt++; + } + + { + /* get a new view (should have fewer records) */ + auto view = buffer->get_buffer_view(); + auto reccnt = view.get_record_count(); + + /* verify the scan again */ + for (size_t i=0; irec.key, i + 51); + } + } + + delete buffer; +} +END_TEST + + +START_TEST(t_bview_delete) +{ + + auto buffer = new 
MutableBuffer(50, 100); + + /* insert 75 records and get tail when LWM is exceeded */ + size_t new_head = 0; + Rec rec = {1, 1}; + size_t cnt = 0; + for (size_t i=0; i<75; i++) { + ck_assert_int_eq(buffer->append(rec), 1); + + rec.key++; + rec.value++; + cnt++; + + if (buffer->is_at_low_watermark() && new_head == 0) { + new_head = buffer->get_tail(); + } + } + + buffer->advance_head(new_head); + + for (size_t i=0; i<75; i++) { + ck_assert_int_eq(buffer->append(rec), 1); + + rec.key++; + rec.value++; + cnt++; + } + + Rec dr1 = {67, 67}; + Rec dr2 = {89, 89}; + Rec dr3 = {103, 103}; + + Rec fdr1 = {5, 5}; + Rec fdr2 = {300, 300}; + { + /* get a new view (should have fewer records) */ + auto view = buffer->get_buffer_view(); + ck_assert_int_eq(view.delete_record(dr1), 1); + ck_assert_int_eq(view.delete_record(dr2), 1); + ck_assert_int_eq(view.delete_record(dr3), 1); + ck_assert_int_eq(view.delete_record(fdr1), 0); + ck_assert_int_eq(view.delete_record(fdr2), 0); + + for (size_t i=0; irec == dr1 || view.get(i)->rec == dr2 + || view.get(i)->rec == dr3) { + ck_assert_int_eq(view.get(i)->is_deleted(), 1); + } else { + ck_assert_int_eq(view.get(i)->is_deleted(), 0); + } + } + } + + delete buffer; +} +END_TEST + Suite *unit_testing() { @@ -255,6 +392,11 @@ Suite *unit_testing() suite_add_tcase(unit, append); + TCase *view = tcase_create("de::BufferView Testing"); + tcase_add_test(view, t_bview_get); + tcase_add_test(view, t_bview_delete); + + suite_add_tcase(unit, view); TCase *truncate = tcase_create("de::MutableBuffer::truncate Testing"); tcase_add_test(truncate, t_truncate); -- cgit v1.2.3