diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-09-13 16:22:03 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-09-13 16:22:03 -0400 |
| commit | eb8dbaa770a57557d67c817c2839c64f536a6ce4 (patch) | |
| tree | 77bbbb79fb70f79965e7f6fd75bb5f4799a6f120 /include/framework/MutableBuffer.h | |
| parent | 076e104b8672924c3d80cd1da2fdb5ebee1766ac (diff) | |
| download | dynamic-extension-eb8dbaa770a57557d67c817c2839c64f536a6ce4.tar.gz | |
Began re-architecting the project for concurrency support
The project is now in a state where it builds, but it probably has a lot
of bugs still.
Diffstat (limited to 'include/framework/MutableBuffer.h')
| -rw-r--r-- | include/framework/MutableBuffer.h | 67 |
1 files changed, 64 insertions, 3 deletions
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h index b79fc02..cadecb6 100644 --- a/include/framework/MutableBuffer.h +++ b/include/framework/MutableBuffer.h @@ -11,6 +11,7 @@ #include <cstdlib> #include <atomic> +#include <condition_variable> #include <cassert> #include <numeric> #include <algorithm> @@ -33,16 +34,22 @@ public: MutableBuffer(size_t capacity, size_t max_tombstone_cap) : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0) , m_tombstonecnt(0), m_weight(0), m_max_weight(0) { - auto len = capacity * sizeof(Wrapped<R>); - size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE)); - m_data = (Wrapped<R>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); + m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>)); + m_merge_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>)); m_tombstone_filter = nullptr; if (max_tombstone_cap > 0) { m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS); } + + m_refcnt.store(0); + m_deferred_truncate.store(false); + m_merging.store(false); } ~MutableBuffer() { + assert(m_refcnt.load() == 0); + assert(m_merging.load() == false); + if (m_data) free(m_data); if (m_tombstone_filter) delete m_tombstone_filter; } @@ -157,6 +164,50 @@ public: return m_max_weight; } + bool start_merge() { + if (m_merge_lock.try_lock()) { + /* there cannot already been an active merge */ + if (m_merging.load()) { + m_merge_lock.unlock(); + return false; + } + + m_merging.store(true); + memcpy(m_merge_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load()); + return true; + } + + /* lock could not be obtained */ + return false; + } + + bool finish_merge() { + m_merge_lock.unlock(); + return true; + } + + /* + * Concurrency-related operations + */ + bool take_reference() { + m_refcnt.fetch_add(1); + return true; + } + + bool release_reference() { + m_refcnt.fetch_add(-1); + + if (m_refcnt.load() == 0 && m_deferred_truncate.load()) { + assert(this->truncate()); + } + + return true; + } + + bool active_merge() { + return m_merging.load(); + } + private: int32_t try_advance_tail() { size_t new_tail = m_reccnt.fetch_add(1); @@ -169,12 +220,22 @@ private: size_t m_tombstone_cap; Wrapped<R>* m_data; + Wrapped<R>* m_merge_data; + psudb::BloomFilter<R>* m_tombstone_filter; alignas(64) std::atomic<size_t> m_tombstonecnt; alignas(64) std::atomic<uint32_t> m_reccnt; alignas(64) std::atomic<double> m_weight; alignas(64) std::atomic<double> m_max_weight; + alignas(64) std::atomic<bool> m_merging; + alignas(64) std::atomic<bool> m_deferred_truncate; + alignas(64) std::atomic<size_t> m_refcnt; + + alignas(64) std::mutex m_merge_lock; + alignas(64) std::mutex m_trunc_lock; + alignas(64) std::condition_variable m_trunc_signal; + }; } |