| field | value | timestamp |
|---|---|---|
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-11 11:31:33 -0500 |
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-11 11:31:33 -0500 |
| commit | 5db0f96e9f3d2505b5f751abc133cbf7e13b5129 (patch) | |
| tree | 0a66f0d6d54231e9e6d6742538e7fb99c067455b /include/framework/structure/MutableBuffer.h | |
| parent | eb19677340be6f0befe9da2199e5832af51eea0d (diff) | |
| download | dynamic-extension-5db0f96e9f3d2505b5f751abc133cbf7e13b5129.tar.gz | |
Fixed some potential buffer-related concurrency bugs
Diffstat (limited to 'include/framework/structure/MutableBuffer.h')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 106 |

1 file changed, 63 insertions, 43 deletions
```diff
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index a065154..3a06f0d 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -6,12 +6,22 @@
  *
  * Distributed under the Modified BSD License.
  *
+ * NOTE: Concerning the tombstone count. One possible approach
+ * would be to track the number of tombstones below and above the
+ * low water mark--this would be straightforward to do. Then, if we
+ * *require* that the head only advance up to the LWM, we can get a
+ * correct view on the number of tombstones in the active buffer at
+ * any point in time, and the BufferView will have a pretty good
+ * approximation as well (potentially with a few extra if new inserts
+ * happen between when the tail pointer and tombstone count are fetched)
+ *
  */
 #pragma once
 
 #include <cstdlib>
 #include <atomic>
 #include <cassert>
+#include <immintrin.h>
 
 #include "psu-util/alignment.h"
 #include "util/bf_config.h"
@@ -28,20 +38,22 @@ class MutableBuffer {
     friend class BufferView<R>;
 public:
     MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
-        : m_lwm(low_watermark), m_hwm(high_watermark), m_cap(capacity), m_head(0), m_tail(0),
-          m_old_head(0), m_head_refcnt(0), m_old_head_refcnt(0) {
-        /*
-         * default capacity is twice the high water mark, to account for the worst-case
-         * memory requirements.
-         */
-        if (m_cap == 0) {
-            m_cap = m_hwm * 2;
-        }
-
-        m_data = (Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>));
-
-        // FIXME: need to figure out how to detail with tombstones at some point...
-        m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS);
+        : m_lwm(low_watermark)
+        , m_hwm(high_watermark)
+        , m_cap((capacity == 0) ? 2 * high_watermark : capacity)
+        , m_tail(0)
+        , m_head(0)
+        , m_head_refcnt(0)
+        , m_old_head(0)
+        , m_old_head_refcnt(0)
+        , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+        , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
+        , m_tscnt(0)
+        , m_old_tscnt(0)
+        , m_active_head_advance(false)
+    {
+        assert(m_cap > m_hwm);
+        assert(m_hwm > m_lwm);
     }
 
     ~MutableBuffer() {
@@ -62,7 +74,7 @@
         m_data[pos].header |= (pos << 2);
 
         if (tombstone) {
-            m_tombstonecnt.fetch_add(1);
+            m_tscnt.fetch_add(1);
             if (m_tombstone_filter) m_tombstone_filter->insert(rec);
         }
 
@@ -70,7 +82,7 @@
     }
 
     bool truncate() {
-        m_tombstonecnt.store(0);
+        m_tscnt.store(0);
         m_tail.store(0);
 
         if (m_tombstone_filter) m_tombstone_filter->clear();
@@ -78,7 +90,7 @@
     }
 
     size_t get_record_count() {
-        return (m_tail - m_head) % m_cap;
+        return m_tail - m_head;
     }
 
     size_t get_capacity() {
@@ -94,30 +106,15 @@
     }
 
     size_t get_tombstone_count() {
-        return m_tombstonecnt.load();
+        return m_tscnt.load();
    }
 
     bool delete_record(const R& rec) {
-        for (size_t i=0; i<get_record_count(); i++) {
-            if (m_data[to_idx(i)].rec == rec) {
-                m_data[to_idx(i)].set_delete();
-                return true;
-            }
-        }
-
-        return false;
-    }
+        return get_buffer_view().delete_record(rec);
+    }
 
     bool check_tombstone(const R& rec) {
-        if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
-
-        for (size_t i=0; i<get_record_count(); i++) {
-            if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) {
-                return true;
-            }
-        }
-
-        return false;
+        return get_buffer_view().check_tombstone(rec);
     }
 
     size_t get_memory_usage() {
@@ -130,7 +127,8 @@
 public:
     BufferView<R> get_buffer_view() {
         m_head_refcnt.fetch_add(1);
-        return BufferView(m_data, m_head, m_tail.load(), m_tombstone_filter, (void*) this, release_head_reference);
+        auto f = std::bind(release_head_reference, (void *) this, m_head.load());
+        return BufferView<R>(m_data, m_cap, m_head.load(), m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
     }
 
     /*
@@ -147,6 +145,8 @@
             return false;
         }
 
+        m_active_head_advance.store(true);
+
         /*
          * the order here is very important. We first store zero to the
          * old_refcnt (should be zero anyway). Then we move the current
@@ -157,12 +157,14 @@
          * dropped. Only after this do we change to the new head
          */
         m_old_head_refcnt.store(0);
+        m_old_head.store(m_head.load());
         m_old_head_refcnt.fetch_add(m_head_refcnt);
 
         m_head_refcnt.store(0);
 
         m_head.store(new_head);
 
+        m_active_head_advance.store(false);
         return true;
     }
 
@@ -212,29 +214,44 @@ private:
             if (m_tail.load() >= m_hwm) {
                 return -1;
             }
+
+            _mm_pause();
         }
 
         return old_value;
     }
 
-    size_t to_idx(size_t i) {
-        return (m_head + i) % m_cap;
+    size_t to_idx(size_t i, size_t head) {
+        return (head + i) % m_cap;
     }
 
     static void release_head_reference(void *buff, size_t head) {
         MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff;
 
-        if (head == buffer->m_head.load()) {
-            buffer->m_head_refcnt.fetch_sub(1);
-        } else if (head == buffer->m_old_head.load()) {
+        /*
+         * check old head first. During a head transition, the head being
+         * retired will first be assigned to *both* head and old_head. As
+         * a result, any refcnt updates during this time should be applied
+         * to old_head, even if the current head and the head being released
+         * also match.
+         */
+        if (head == buffer->m_old_head.load()) {
             buffer->m_old_head_refcnt.fetch_sub(1);
 
             /*
              * if the old head refcnt drops to 0, free
              * the records by setting old_head = head
+             * before this, spin while the two heads are equal to
+             * avoid
             */
+            while (buffer->m_active_head_advance.load()) {
+                _mm_pause();
+            }
+
             if (buffer->m_old_head_refcnt.load() == 0) {
                 buffer->m_old_head.store(buffer->m_head);
             }
+        } else if (head == buffer->m_head.load()) {
+            buffer->m_head_refcnt.fetch_sub(1);
         }
     }
 
@@ -252,7 +269,10 @@ private:
     Wrapped<R>* m_data;
     psudb::BloomFilter<R>* m_tombstone_filter;
 
-    alignas(64) std::atomic<size_t> m_tombstonecnt;
+    alignas(64) std::atomic<size_t> m_tscnt;
+    size_t m_old_tscnt;
+
+    alignas(64) std::atomic<bool> m_active_head_advance;
 };
 
 }
```
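The heart of the patch is the two-generation head protocol: `advance_head()` briefly stores the retiring head in *both* `m_head` and `m_old_head` while transferring its reference count, which is why the reworked `release_head_reference()` tests `m_old_head` first. The standalone sketch below distills that bookkeeping; `HeadModel`, `pin()`, `advance()`, and `release()` are illustrative names rather than the project's API, and record storage, the Bloom filter, and memory-order tuning are all omitted.

```cpp
// Distilled sketch of the two-generation head refcount protocol.
// Models only the counter bookkeeping, not the full buffer.
#include <atomic>
#include <cstddef>

struct HeadModel {
    std::atomic<size_t> head{0};
    std::atomic<size_t> old_head{0};
    std::atomic<size_t> head_refcnt{0};
    std::atomic<size_t> old_head_refcnt{0};
    std::atomic<bool>   advancing{false};

    /* a reader pins the current head and must later release that same value */
    size_t pin() {
        head_refcnt.fetch_add(1);
        return head.load();
    }

    /* retire the current head (cf. advance_head() in the patch) */
    bool advance(size_t new_head) {
        /* refuse while readers still hold the previous generation */
        if (old_head_refcnt.load() > 0) {
            return false;
        }

        advancing.store(true);

        /* the retiring head momentarily occupies *both* slots while its
         * outstanding pins are transferred to the old generation */
        old_head_refcnt.store(0);
        old_head.store(head.load());
        old_head_refcnt.fetch_add(head_refcnt.load());

        head_refcnt.store(0);
        head.store(new_head);

        advancing.store(false);
        return true;
    }

    /* cf. release_head_reference(): the old generation is tested first,
     * because during an advance the retiring head matches both slots and
     * its pins now live in old_head_refcnt */
    void release(size_t pinned) {
        if (pinned == old_head.load()) {
            old_head_refcnt.fetch_sub(1);
        } else if (pinned == head.load()) {
            head_refcnt.fetch_sub(1);
        }
    }
};
```

Testing the old generation first means a release that races with an advance decrements the counter that actually holds its pin; the previous order (current head first) could decrement `m_head_refcnt` for a pin that `advance_head()` had already transferred to `m_old_head_refcnt`, which appears to be the class of bug the commit message refers to.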
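The reworked `get_buffer_view()` binds `this` and the current head value into the release callback, so the view later releases exactly the head it pinned even if the buffer advances in the meantime. Below is a minimal sketch of a view object that owns such a callback, assuming release-on-destruction is the intended lifecycle; `ViewSketch` and `ReleaseFn` are hypothetical names, not the project's `BufferView`.

```cpp
// Sketch of a view that carries its release action, in the spirit of the
// std::bind() call in get_buffer_view(). The real BufferView also carries
// the data pointer, capacity, tail, tombstone count, and filter.
#include <cstddef>
#include <cstdio>
#include <functional>
#include <utility>

using ReleaseFn = std::function<void()>;

class ViewSketch {
public:
    explicit ViewSketch(ReleaseFn release) : m_release(std::move(release)) {}

    /* the pinned head reference is dropped exactly once, when the view dies */
    ~ViewSketch() {
        if (m_release) m_release();
    }

    /* non-copyable, so a single pin cannot be released twice */
    ViewSketch(const ViewSketch&) = delete;
    ViewSketch& operator=(const ViewSketch&) = delete;

    /* movable, so it can be returned by value as get_buffer_view() does */
    ViewSketch(ViewSketch&& other) noexcept
        : m_release(std::exchange(other.m_release, nullptr)) {}
    ViewSketch& operator=(ViewSketch&&) = delete;

private:
    ReleaseFn m_release;
};

int main() {
    size_t pins = 1;  /* stand-in for a pinned head refcount */
    {
        ViewSketch v([&pins] { --pins; });  /* release runs at scope exit */
    }
    std::printf("outstanding pins: %zu\n", pins);  /* prints 0 */
}
```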