From 2ded45f5a20f38fdfd9f348c446c38dc713a5591 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 3 Mar 2025 13:41:19 -0500 Subject: Fixed a few concurrency bugs --- include/framework/structure/MutableBuffer.h | 160 ++++++++++++++++------------ 1 file changed, 89 insertions(+), 71 deletions(-) (limited to 'include/framework/structure/MutableBuffer.h') diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index e62a495..7357915 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -1,7 +1,7 @@ /* * include/framework/structure/MutableBuffer.h * - * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * Copyright (C) 2023-2025 Douglas B. Rumbaugh * Dong Xie * * Distributed under the Modified BSD License. @@ -109,11 +109,11 @@ public: size_t get_tombstone_count() const { return m_tscnt.load(); } bool delete_record(const R &rec) { - return get_buffer_view().delete_record(rec); + return get_buffer_view(m_head.load().head_idx).delete_record(rec); } bool check_tombstone(const R &rec) { - return get_buffer_view().check_tombstone(rec); + return get_buffer_view(m_head.load().head_idx).check_tombstone(rec); } size_t get_memory_usage() const { return m_cap * sizeof(Wrapped); } @@ -122,20 +122,9 @@ public: return m_tombstone_filter->get_memory_usage(); } - BufferView get_buffer_view(size_t target_head) { - size_t head = get_head(target_head); - auto f = std::bind(release_head_reference, (void *)this, head); - + BufferView get_buffer_view(size_t head) { return BufferView(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), - m_tombstone_filter, f); - } - - BufferView get_buffer_view() { - size_t head = get_head(m_head.load().head_idx); - auto f = std::bind(release_head_reference, (void *)this, head); - - return BufferView(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), - m_tombstone_filter, f); + m_tombstone_filter); } /* @@ -159,7 +148,7 @@ public: // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, m_head.load().refcnt); - buffer_head new_hd = {new_head, 0}; + buffer_head new_hd = {new_head, 1}; buffer_head cur_hd; /* replace current head with new head */ @@ -174,32 +163,6 @@ public: return true; } - /* - * FIXME: If target_head does not match *either* the old_head or the - * current_head, this routine will loop infinitely. - */ - size_t get_head(size_t target_head) { - buffer_head cur_hd, new_hd; - bool head_acquired = false; - - - //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx); - do { - if (m_old_head.load().head_idx == target_head) { - cur_hd = m_old_head.load(); - cur_hd.head_idx = target_head; - new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; - head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); - } else if (m_head.load().head_idx == target_head) { - cur_hd = m_head.load(); - cur_hd.head_idx = target_head; - new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; - head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); - } - } while (!head_acquired); - - return new_hd.head_idx; - } void set_low_watermark(size_t lwm) { assert(lwm < m_hwm); @@ -234,8 +197,90 @@ public: return m_cap - (m_tail.load() - m_old_head.load().head_idx); } + size_t debug_get_old_head() const { + return m_old_head.load().head_idx; + } + + size_t debug_get_head() const { + return m_head.load().head_idx; + } + + bool take_head_reference(size_t target_head) { + buffer_head cur_hd, new_hd; + bool head_acquired = false; + + do { + if (m_old_head.load().head_idx == target_head) { + cur_hd = m_old_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else if (m_head.load().head_idx == target_head) { + cur_hd = m_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } while (!head_acquired); + + return head_acquired; + } + + bool release_head_reference(size_t head) { + buffer_head cur_hd, new_hd; + bool head_released = false; + do { + if (m_old_head.load().head_idx == head) { + cur_hd = m_old_head; + + assert(cur_hd.refcnt > 0); + new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; + head_released = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else { + cur_hd = m_head; + + /* it's possible the head got pushed from current to old */ + if (cur_hd.head_idx == head) { + assert(cur_hd.refcnt > 0); + new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; + head_released = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } + _mm_pause(); + } while (!head_released); + + return head_released; + } + private: - int64_t try_advance_tail() { + /* + * FIXME: If target_head does not match *either* the old_head or the + * current_head, this routine will loop infinitely. + */ + size_t get_head(size_t target_head) { + buffer_head cur_hd, new_hd; + bool head_acquired = false; + + + //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx); + do { + if (m_old_head.load().head_idx == target_head) { + cur_hd = m_old_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else if (m_head.load().head_idx == target_head) { + cur_hd = m_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } while (!head_acquired); + + return new_hd.head_idx; + } + + ssize_t try_advance_tail() { size_t old_value = m_tail.load(); /* if full, fail to advance the tail */ @@ -257,33 +302,6 @@ private: size_t to_idx(size_t i, size_t head) { return (head + i) % m_cap; } - static void release_head_reference(void *buff, size_t head) { - MutableBuffer *buffer = (MutableBuffer *)buff; - - buffer_head cur_hd, new_hd; - do { - if (buffer->m_old_head.load().head_idx == head) { - cur_hd = buffer->m_old_head; - if (cur_hd.refcnt == 0) - continue; - new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; - if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { - break; - } - } else { - cur_hd = buffer->m_head; - if (cur_hd.refcnt == 0) - continue; - new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; - - if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { - break; - } - } - _mm_pause(); - } while (true); - } - size_t m_lwm; size_t m_hwm; size_t m_cap; -- cgit v1.2.3