diff options
| author | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2023-10-31 12:14:57 -0400 |
|---|---|---|
| committer | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2023-10-31 12:22:16 -0400 |
| commit | ca729108869b4143f1eea31f6dde9195decfec9c (patch) | |
| tree | fbfc9f2c943d392c380fab97e6ca85318226c122 /include | |
| parent | 230831243a61f1ca1b1dd4319a4c5224b15d2657 (diff) | |
| download | dynamic-extension-ca729108869b4143f1eea31f6dde9195decfec9c.tar.gz | |
MutableBuffer: removed most concurrency control stuff
The buffer isn't responsible for a lot of CC anymore (just the append
operation), so this code was no longer necessary. Also removed the only
calls to some of these CC operations within the rest of the framework.
Diffstat (limited to 'include')
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 6 | ||||
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 59 |
2 files changed, 13 insertions, 52 deletions
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 2ced439..f5657af 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -93,11 +93,11 @@ public: inline bool merge_buffer(Buffer *buffer) { assert(can_merge_with(0, buffer->get_record_count())); + // FIXME: this step makes an extra copy of the buffer, + // which could be avoided by adjusting the shard + // reconstruction process a bit, possibly. buffer->start_merge(); merge_buffer_into_l0(buffer); - buffer->finish_merge(); - - buffer->truncate(); return true; } diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index e0a6962..a70b86b 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -42,13 +42,10 @@ public: } m_refcnt.store(0); - m_deferred_truncate.store(false); - m_merging.store(false); } ~MutableBuffer() { assert(m_refcnt.load() == 0); - assert(m_merging.load() == false); if (m_data) free(m_data); if (m_tombstone_filter) delete m_tombstone_filter; @@ -90,23 +87,12 @@ public: } bool truncate() { - - while (active_merge() || m_refcnt.load() > 0) - ; - - m_merge_lock.lock(); - - while (m_refcnt > 0) - ; - m_tombstonecnt.store(0); m_reccnt.store(0); m_weight.store(0); m_max_weight.store(0); if (m_tombstone_filter) m_tombstone_filter->clear(); - m_merge_lock.unlock(); - return true; } @@ -176,26 +162,15 @@ public: return m_max_weight; } + /* + * This operation assumes that no other threads have write access + * to the buffer. This will be the case in normal operation, at + * present, but may change (in which case this approach will need + * to be adjusted). Other threads having read access is perfectly + * acceptable, however. + */ bool start_merge() { - if (m_merge_lock.try_lock()) { - /* there cannot already been an active merge */ - if (m_merging.load()) { - m_merge_lock.unlock(); - return false; - } - - m_merging.store(true); - memcpy(m_merge_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load()); - return true; - } - - /* lock could not be obtained */ - return false; - } - - bool finish_merge() { - m_merge_lock.unlock(); - m_merging.store(false); + memcpy(m_merge_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load()); return true; } @@ -208,12 +183,8 @@ public: } bool release_reference() { + assert(m_refcnt > 0); m_refcnt.fetch_add(-1); - - if (m_refcnt.load() == 0 && m_deferred_truncate.load()) { - assert(this->truncate()); - } - return true; } @@ -221,10 +192,6 @@ public: return m_refcnt.load(); } - bool active_merge() { - return m_merging.load(); - } - private: int32_t try_advance_tail() { size_t new_tail = m_reccnt.fetch_add(1); @@ -245,14 +212,8 @@ private: alignas(64) std::atomic<uint32_t> m_reccnt; alignas(64) std::atomic<double> m_weight; alignas(64) std::atomic<double> m_max_weight; - alignas(64) std::atomic<bool> m_merging; - alignas(64) std::atomic<bool> m_deferred_truncate; - alignas(64) std::atomic<size_t> m_refcnt; - - alignas(64) std::mutex m_merge_lock; - alignas(64) std::mutex m_trunc_lock; - alignas(64) std::condition_variable m_trunc_signal; + alignas(64) std::atomic<size_t> m_refcnt; }; } |