From ca729108869b4143f1eea31f6dde9195decfec9c Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 12:14:57 -0400 Subject: MutableBuffer: removed most concurrency control stuff The buffer isn't responsible for a lot of CC anymore (just the append operation), so this code was no longer necessary. Also removed the only calls to some of these CC operations within the rest of the framework. --- include/framework/structure/ExtensionStructure.h | 6 +-- include/framework/structure/MutableBuffer.h | 59 ++++-------------------- 2 files changed, 13 insertions(+), 52 deletions(-) diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 2ced439..f5657af 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -93,11 +93,11 @@ public: inline bool merge_buffer(Buffer *buffer) { assert(can_merge_with(0, buffer->get_record_count())); + // FIXME: this step makes an extra copy of the buffer, + // which could be avoided by adjusting the shard + // reconstruction process a bit, possibly. buffer->start_merge(); merge_buffer_into_l0(buffer); - buffer->finish_merge(); - - buffer->truncate(); return true; } diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index e0a6962..a70b86b 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -42,13 +42,10 @@ public: } m_refcnt.store(0); - m_deferred_truncate.store(false); - m_merging.store(false); } ~MutableBuffer() { assert(m_refcnt.load() == 0); - assert(m_merging.load() == false); if (m_data) free(m_data); if (m_tombstone_filter) delete m_tombstone_filter; @@ -90,23 +87,12 @@ public: } bool truncate() { - - while (active_merge() || m_refcnt.load() > 0) - ; - - m_merge_lock.lock(); - - while (m_refcnt > 0) - ; - m_tombstonecnt.store(0); m_reccnt.store(0); m_weight.store(0); m_max_weight.store(0); if (m_tombstone_filter) m_tombstone_filter->clear(); - m_merge_lock.unlock(); - return true; } @@ -176,26 +162,15 @@ public: return m_max_weight; } + /* + * This operation assumes that no other threads have write access + * to the buffer. This will be the case in normal operation, at + * present, but may change (in which case this approach will need + * to be adjusted). Other threads having read access is perfectly + * acceptable, however. + */ bool start_merge() { - if (m_merge_lock.try_lock()) { - /* there cannot already been an active merge */ - if (m_merging.load()) { - m_merge_lock.unlock(); - return false; - } - - m_merging.store(true); - memcpy(m_merge_data, m_data, sizeof(Wrapped) * m_reccnt.load()); - return true; - } - - /* lock could not be obtained */ - return false; - } - - bool finish_merge() { - m_merge_lock.unlock(); - m_merging.store(false); + memcpy(m_merge_data, m_data, sizeof(Wrapped) * m_reccnt.load()); return true; } @@ -208,12 +183,8 @@ public: } bool release_reference() { + assert(m_refcnt > 0); m_refcnt.fetch_add(-1); - - if (m_refcnt.load() == 0 && m_deferred_truncate.load()) { - assert(this->truncate()); - } - return true; } @@ -221,10 +192,6 @@ public: return m_refcnt.load(); } - bool active_merge() { - return m_merging.load(); - } - private: int32_t try_advance_tail() { size_t new_tail = m_reccnt.fetch_add(1); @@ -245,14 +212,8 @@ private: alignas(64) std::atomic m_reccnt; alignas(64) std::atomic m_weight; alignas(64) std::atomic m_max_weight; - alignas(64) std::atomic m_merging; - alignas(64) std::atomic m_deferred_truncate; - alignas(64) std::atomic m_refcnt; - - alignas(64) std::mutex m_merge_lock; - alignas(64) std::mutex m_trunc_lock; - alignas(64) std::condition_variable m_trunc_signal; + alignas(64) std::atomic m_refcnt; }; } -- cgit v1.2.3