From 138c793b0a58577713d98c98bb140cf1d9c79bee Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 17 Jan 2024 18:22:00 -0500 Subject: Multiple concurrency bug fixes A poorly organized commit with fixes for a variety of bugs that were causing missing records. The core problems all appear to be fixed, though there is an outstanding problem with tombstones not being completely canceled. A very small number are appearing in the wrong order during the static structure test. --- include/framework/structure/MutableBuffer.h | 58 ++++++++++++++++++----------- 1 file changed, 37 insertions(+), 21 deletions(-) (limited to 'include/framework/structure/MutableBuffer.h') diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index eeb3dc9..7edde2f 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -133,18 +133,18 @@ public: return m_tombstone_filter->get_memory_usage(); } - BufferView get_buffer_view() { - size_t head = get_head(); + BufferView get_buffer_view(size_t target_head) { + size_t head = get_head(target_head); auto f = std::bind(release_head_reference, (void *) this, head); return BufferView(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); } - BufferView get_flush_buffer_view() { - size_t head = get_head(); + BufferView get_buffer_view() { + size_t head = get_head(m_head.load().head_idx); auto f = std::bind(release_head_reference, (void *) this, head); - return BufferView(m_data, m_cap, head, head + m_lwm, m_tscnt.load(), m_tombstone_filter, f); + return BufferView(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); } /* @@ -167,23 +167,39 @@ public: buffer_head new_hd = {new_head, 0}; buffer_head cur_hd; - /* move the current head into the old head */ + /* replace current head with new head */ do { - buffer_head cur_hd = m_head.load(); - m_old_head.store(cur_hd); + cur_hd = m_head.load(); } while(!m_head.compare_exchange_strong(cur_hd, new_hd)); + /* move the current head into the old head */ + m_old_head.store(cur_hd); + m_active_head_advance.store(false); return true; } - size_t get_head() { + /* + * FIXME: If target_head does not match *either* the old_head or the + * current_head, this routine will loop infinitely. + */ + size_t get_head(size_t target_head) { buffer_head cur_hd, new_hd; + bool head_acquired = false; do { - cur_hd = m_head.load(); - new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; - } while(!m_head.compare_exchange_strong(cur_hd, new_hd)); + if (m_old_head.load().head_idx == target_head) { + cur_hd = m_old_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else if (m_head.load().head_idx == target_head){ + cur_hd = m_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } while(!head_acquired); return new_hd.head_idx; } @@ -254,22 +270,22 @@ private: buffer_head cur_hd, new_hd; do { - if (buffer->m_head.load().head_idx == head) { - cur_hd = buffer->m_head; - assert(cur_hd.refcnt > 0); + if (buffer->m_old_head.load().head_idx == head) { + cur_hd = buffer->m_old_head; + if (cur_hd.refcnt == 0) continue; new_hd = {cur_hd.head_idx, cur_hd.refcnt-1}; - - if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { + if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { break; } } else { - cur_hd = buffer->m_old_head; - assert(cur_hd.refcnt > 0); + cur_hd = buffer->m_head; + if (cur_hd.refcnt == 0) continue; new_hd = {cur_hd.head_idx, cur_hd.refcnt-1}; - if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { + + if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { break; } - } + } _mm_pause(); } while(true); } -- cgit v1.2.3