From 5db0f96e9f3d2505b5f751abc133cbf7e13b5129 Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Thu, 11 Jan 2024 11:31:33 -0500
Subject: Fixed some potential buffer-related concurrency bugs

---
 include/framework/structure/BufferView.h    |  45 ++++++++----
 include/framework/structure/MutableBuffer.h | 106 +++++++++++++++++----------
 tests/mutable_buffer_tests.cpp              |   2 +-
 3 files changed, 95 insertions(+), 58 deletions(-)

diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 7e8af45..00b6101 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -18,31 +18,43 @@
 namespace de {
 
-typedef std::function<void(void*, size_t)> ReleaseFunction;
+typedef std::_Bind<void (*(void*, size_t))(void*, size_t)> ReleaseFunction;
 
 template <RecordInterface R> class BufferView {
 public:
     BufferView() = default;
 
-    BufferView(const Wrapped<R> *buffer, size_t head, size_t tail, psudb::BloomFilter<R> *filter,
-               void *parent_buffer, ReleaseFunction release)
-        : m_buffer(buffer)
+    BufferView(const Wrapped<R> *buffer, size_t cap, size_t head, size_t tail, size_t tombstone_cnt, psudb::BloomFilter<R> *filter,
+               ReleaseFunction release)
+        : m_data(buffer)
         , m_release(release)
-        , m_parent_buffer(parent_buffer)
         , m_head(head)
         , m_tail(tail)
+        , m_cap(cap)
+        , m_approx_ts_cnt(tombstone_cnt)
         , m_tombstone_filter(filter) {}
 
     ~BufferView() {
-        m_release(m_parent_buffer, m_head);
+        m_release();
     }
 
     bool check_tombstone(const R& rec) {
         if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
 
         for (size_t i=0; i<get_record_count(); i++) {
-            if (m_buffer[to_idx(i)].rec == rec && m_buffer[to_idx(i)].is_tombstone()) {
+            if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) {
                 return true;
             }
         }
 
         return false;
     }
 
     size_t get_record_count() {
         return m_tail - m_head;
     }
 
+    size_t get_tombstone_count() {
+        return m_approx_ts_cnt;
+    }
+
     Wrapped<R> *get(size_t i) {
         assert(i < get_record_count());
-        return m_buffer + to_idx(i);
+        return m_data + to_idx(i);
     }
 
     void copy_to_buffer(psudb::byte *buffer) {
-        memcpy(buffer, (std::byte*) (m_buffer + m_head), get_record_count() * sizeof(Wrapped<R>));
+        memcpy(buffer, (std::byte*) (m_data + m_head), get_record_count() * sizeof(Wrapped<R>));
     }
 
 private:
-    const Wrapped<R>* m_buffer;
-    void *m_parent_buffer;
+    const Wrapped<R>* m_data;
     ReleaseFunction m_release;
     size_t m_head;
     size_t m_tail;
+    size_t m_cap;
+    size_t m_approx_ts_cnt;
     psudb::BloomFilter<R> *m_tombstone_filter;
 
     size_t to_idx(size_t i) {
-        return (m_head + i) % m_buffer->get_capacity();
+        return (m_head + i) % m_cap;
     }
 };
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index a065154..3a06f0d 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -6,12 +6,22 @@
  *
  * Distributed under the Modified BSD License.
  *
+ * NOTE: Concerning the tombstone count. One possible approach
+ *       would be to track the number of tombstones below and above the
+ *       low water mark--this would be straightforward to do.
+ *       Then, if we *require* that the head only advance up to the LWM,
+ *       we can get a correct view on the number of tombstones in the
+ *       active buffer at any point in time, and the BufferView will have
+ *       a pretty good approximation as well (potentially with a few
+ *       extra tombstones counted, if new inserts happen between when the
+ *       tail pointer and the tombstone count are fetched).
+ */
 
 #pragma once
 
 #include <cstdlib>
 #include <atomic>
 #include <cassert>
+#include <immintrin.h>
 
 #include "psu-util/alignment.h"
 #include "util/bf_config.h"
@@ -28,20 +38,22 @@ class MutableBuffer {
     friend class BufferView<R>;
 
 public:
     MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
-        : m_lwm(low_watermark), m_hwm(high_watermark), m_cap(capacity), m_head(0), m_tail(0),
-          m_old_head(0), m_head_refcnt(0), m_old_head_refcnt(0) {
-        /*
-         * default capacity is twice the high water mark, to account for the worst-case
-         * memory requirements.
-         */
-        if (m_cap == 0) {
-            m_cap = m_hwm * 2;
-        }
-
-        m_data = (Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>));
-
-        // FIXME: need to figure out how to detail with tombstones at some point...
-        m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS);
+        : m_lwm(low_watermark)
+        , m_hwm(high_watermark)
+        , m_cap((capacity == 0) ? 2 * high_watermark : capacity)
+        , m_tail(0)
+        , m_head(0)
+        , m_head_refcnt(0)
+        , m_old_head(0)
+        , m_old_head_refcnt(0)
+        , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+        , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
+        , m_tscnt(0)
+        , m_old_tscnt(0)
+        , m_active_head_advance(false)
+    {
+        assert(m_cap > m_hwm);
+        assert(m_hwm > m_lwm);
     }
 
     ~MutableBuffer() {
@@ -62,7 +74,7 @@ public:
         m_data[pos].header |= (pos << 2);
 
         if (tombstone) {
-            m_tombstonecnt.fetch_add(1);
+            m_tscnt.fetch_add(1);
             if (m_tombstone_filter) m_tombstone_filter->insert(rec);
         }
 
@@ -70,7 +82,7 @@
     }
 
     bool truncate() {
-        m_tombstonecnt.store(0);
+        m_tscnt.store(0);
         m_tail.store(0);
 
         if (m_tombstone_filter) m_tombstone_filter->clear();
@@ -78,7 +90,7 @@
     }
 
     size_t get_record_count() {
-        return (m_tail - m_head) % m_cap;
+        return m_tail - m_head;
     }
 
     size_t get_capacity() {
@@ -94,30 +106,15 @@
     }
 
     size_t get_tombstone_count() {
-        return m_tombstonecnt.load();
+        return m_tscnt.load();
    }
 
     bool delete_record(const R& rec) {
-        for (size_t i=0; i<get_record_count(); i++) {
-            if (m_data[to_idx(i)].rec == rec) {
-                m_data[to_idx(i)].set_delete();
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    bool check_tombstone(const R& rec) {
-        if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
-
-        for (size_t i=0; i<get_record_count(); i++) {
-            if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) {
-                return true;
-            }
-        }
-
-        return false;
+        return false;
     }
 
     BufferView<R> get_buffer_view() {
         m_head_refcnt.fetch_add(1);
-        return BufferView(m_data, m_head, m_tail.load(), m_tombstone_filter, (void*) this, release_head_reference);
+        auto f = std::bind(release_head_reference, (void *) this, m_head.load());
+        return BufferView(m_data, m_cap, m_head.load(), m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
     }
 
     /*
@@ -147,6 +145,8 @@ public:
             return false;
         }
 
+        m_active_head_advance.store(true);
+
         /*
         * the order here is very important. We first store zero to the
        * old_refcnt (should be zero anyway). Then we move the current
         * head into old_head and transfer the current head's refcnt to
         * the old refcnt, so that references pinned against the
         * transitioning head are accounted to old_head and correctly
         * dropped.
         * Only after this do we change to the new head
         */
         m_old_head_refcnt.store(0);
+        m_old_head.store(m_head.load());
         m_old_head_refcnt.fetch_add(m_head_refcnt);
         m_head_refcnt.store(0);
         m_head.store(new_head);
 
+        m_active_head_advance.store(false);
         return true;
     }
 
@@ -212,29 +214,44 @@ private:
             if (m_tail.load() >= m_hwm) {
                 return -1;
             }
+
+            _mm_pause();
         }
 
         return old_value;
     }
 
-    size_t to_idx(size_t i) {
-        return (m_head + i) % m_cap;
+    size_t to_idx(size_t i, size_t head) {
+        return (head + i) % m_cap;
     }
 
     static void release_head_reference(void *buff, size_t head) {
         MutableBuffer *buffer = (MutableBuffer *) buff;
 
-        if (head == buffer->m_head.load()) {
-            buffer->m_head_refcnt.fetch_sub(1);
-        } else if (head == buffer->m_old_head.load()) {
+        /*
+         * check old head first. During a head transition, the head being
+         * retired will first be assigned to *both* head and old_head. As
+         * a result, any refcnt updates during this time should be applied
+         * to old_head, even if the current head and the head being released
+         * also match.
+         */
+        if (head == buffer->m_old_head.load()) {
             buffer->m_old_head_refcnt.fetch_sub(1);
+
             /*
              * if the old head refcnt drops to 0, free
              * the records by setting old_head = head
+             * before this, spin while a head advance is in progress, to
+             * avoid racing with the refcnt transfer and freeing records
+             * prematurely
              */
+            while (buffer->m_active_head_advance.load()) {
+                _mm_pause();
+            }
+
             if (buffer->m_old_head_refcnt.load() == 0) {
                 buffer->m_old_head.store(buffer->m_head);
             }
+        } else if (head == buffer->m_head.load()) {
+            buffer->m_head_refcnt.fetch_sub(1);
         }
     }
 
@@ -252,7 +269,10 @@ private:
     Wrapped<R>* m_data;
     psudb::BloomFilter<R>* m_tombstone_filter;
 
-    alignas(64) std::atomic<size_t> m_tombstonecnt;
+    alignas(64) std::atomic<size_t> m_tscnt;
+    size_t m_old_tscnt;
+
+    alignas(64) std::atomic<bool> m_active_head_advance;
 };
 
 }
diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp
index 0520097..e61a832 100644
--- a/tests/mutable_buffer_tests.cpp
+++ b/tests/mutable_buffer_tests.cpp
@@ -201,7 +201,7 @@ END_TEST
 
 START_TEST(t_truncate)
 {
-    auto buffer = new MutableBuffer<Rec>(100, 100);
+    auto buffer = new MutableBuffer<Rec>(50, 100);
 
     size_t ts_cnt = 0;
     Rec rec = {0, 5};
-- 
cgit v1.2.3
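
For readers tracing the concurrency fix: the two-refcount head handoff
implemented by advance_head() and release_head_reference() above can be
modeled in isolation. What follows is a minimal sketch under assumed
names (PinnedRing, pin_head, and release are invented for the example
and are not the framework's API); it omits the m_active_head_advance
spin, the record storage, and the Bloom filter, and shows only the
ordering argument.

#include <atomic>
#include <cstddef>
#include <cstdio>
#include <functional>

struct PinnedRing {
    std::atomic<size_t> head{0}, old_head{0};
    std::atomic<size_t> head_refcnt{0}, old_head_refcnt{0};

    /* pin the current head; the returned callable releases the pin,
     * mirroring BufferView's destructor invoking m_release() */
    std::function<void()> pin_head() {
        head_refcnt.fetch_add(1);
        size_t h = head.load();
        return [this, h] { release(h); };
    }

    /* refuse to advance while a reader still pins the previous head */
    bool advance_head(size_t new_head) {
        if (old_head_refcnt.load() > 0) return false;

        /* order matters: migrate the current head and its refcnt to
         * old_head *before* publishing the new head, so a release that
         * races with the transition is attributed to old_head */
        old_head_refcnt.store(0);
        old_head.store(head.load());
        old_head_refcnt.fetch_add(head_refcnt.load());
        head_refcnt.store(0);
        head.store(new_head);
        return true;
    }

    /* check old_head first: during a transition the retiring head is
     * briefly equal to *both* head and old_head, and a release taken
     * in that window belongs to old_head */
    void release(size_t h) {
        if (h == old_head.load()) {
            if (old_head_refcnt.fetch_sub(1) == 1) {
                /* last pin on the retired head is gone; a real buffer
                 * could now reclaim the records behind old_head */
                old_head.store(head.load());
            }
        } else if (h == head.load()) {
            head_refcnt.fetch_sub(1);
        }
    }
};

int main() {
    PinnedRing r;
    auto unpin = r.pin_head();                /* reader pins head 0       */
    std::printf("%d\n", r.advance_head(10));  /* 1: pin moves to old_head */
    std::printf("%d\n", r.advance_head(20));  /* 0: old head still pinned */
    unpin();                                  /* reader drops its pin     */
    std::printf("%d\n", r.advance_head(20));  /* 1: advance allowed again */
}

The printed sequence is 1, 0, 1: the first advance succeeds and migrates
the reader's pin onto old_head, and the second is refused until that pin
is released, which is the invariant the patch enforces with
m_old_head_refcnt. The same ordering explains why the patched
release_head_reference() tests old_head before head: during the handoff
the retiring head satisfies both comparisons, and the reference must be
returned to the old head's count. Note that the sketch collapses the
patch's fetch_sub-then-load pair into a single fetch_sub, which is safe
here because only one releaser can observe the count reaching zero.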