Diffstat (limited to 'include/framework/MutableBuffer.h')
-rw-r--r--  include/framework/MutableBuffer.h  67
1 file changed, 64 insertions(+), 3 deletions(-)
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index b79fc02..cadecb6 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -11,6 +11,7 @@
#include <cstdlib>
#include <atomic>
+#include <condition_variable>
#include <cassert>
#include <numeric>
#include <algorithm>
@@ -33,16 +34,22 @@ public:
MutableBuffer(size_t capacity, size_t max_tombstone_cap)
: m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
, m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
- auto len = capacity * sizeof(Wrapped<R>);
- size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));
- m_data = (Wrapped<R>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
+ m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
+ m_merge_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
m_tombstone_filter = nullptr;
if (max_tombstone_cap > 0) {
m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
}
+
+ m_refcnt.store(0);
+ m_deferred_truncate.store(false);
+ m_merging.store(false);
}
~MutableBuffer() {
+ assert(m_refcnt.load() == 0);
+ assert(m_merging.load() == false);
+
if (m_data) free(m_data);
+ if (m_merge_data) free(m_merge_data);
if (m_tombstone_filter) delete m_tombstone_filter;
}
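
The constructor now sizes two cache-line-aligned regions of capacity * sizeof(Wrapped<R>) bytes: the live buffer m_data and a shadow copy m_merge_data that start_merge() snapshots into. The manual round-up that was removed is presumably what psudb::sf_aligned_alloc performs internally; the following is only a minimal standalone sketch of that rounding under that assumption (the helper name and the 64-byte constant are illustrative, not part of the patch):

#include <cstddef>
#include <cstdlib>

constexpr std::size_t CACHELINE_SIZE = 64;   /* assumed; matches the alignas(64) below */

/* Round the request up to a multiple of the cache line so that
 * std::aligned_alloc's size-is-a-multiple-of-alignment requirement holds. */
inline void *cacheline_aligned_alloc(std::size_t bytes) {
    std::size_t padded = (bytes + CACHELINE_SIZE - 1) & ~(CACHELINE_SIZE - 1);
    return std::aligned_alloc(CACHELINE_SIZE, padded);
}
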
@@ -157,6 +164,50 @@ public:
return m_max_weight;
}
+ bool start_merge() {
+ if (m_merge_lock.try_lock()) {
+ /* there cannot already be an active merge */
+ if (m_merging.load()) {
+ m_merge_lock.unlock();
+ return false;
+ }
+
+ m_merging.store(true);
+ memcpy(m_merge_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load());
+ return true;
+ }
+
+ /* lock could not be obtained */
+ return false;
+ }
+
+ bool finish_merge() {
+ m_merge_lock.unlock();
+ return true;
+ }
+
+ /*
+ * Concurrency-related operations
+ */
+ bool take_reference() {
+ m_refcnt.fetch_add(1);
+ return true;
+ }
+
+ bool release_reference() {
+ /* fetch_sub returns the prior value, so exactly one thread observes the
+ * drop to zero and runs any deferred truncation */
+ if (m_refcnt.fetch_sub(1) == 1 && m_deferred_truncate.load()) {
+ assert(this->truncate());
+ }
+
+ return true;
+ }
+
+ bool active_merge() {
+ return m_merging.load();
+ }
+
private:
int32_t try_advance_tail() {
size_t new_tail = m_reccnt.fetch_add(1);
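
Taken together, the new public methods sketch a small protocol: a thread pins the buffer with take_reference(), claims the single active merge with start_merge(), which snapshots the current records into m_merge_data under m_merge_lock, and later drops the lock with finish_merge(); m_merging itself is left set, presumably until the buffer is truncated. The framework code that drives this is not part of this diff, so the following is only an assumed usage sketch (merge_from_buffer is a hypothetical caller, and the enclosing namespace is omitted because it is not shown in these hunks):

/* Hypothetical caller illustrating the apparent locking protocol;
 * not the framework's actual reconstruction code. */
template <typename R>
bool merge_from_buffer(MutableBuffer<R> *buffer) {
    buffer->take_reference();          /* pin: a concurrent truncate is deferred */

    bool merged = false;
    if (buffer->start_merge()) {       /* false if another merge is already active */
        /* ... build the new structure from the snapshotted records ... */
        buffer->finish_merge();        /* releases m_merge_lock */
        merged = true;
    }

    buffer->release_reference();       /* last holder runs any deferred truncate */
    return merged;
}
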
@@ -169,12 +220,22 @@ private:
size_t m_tombstone_cap;
Wrapped<R>* m_data;
+ Wrapped<R>* m_merge_data;
+
psudb::BloomFilter<R>* m_tombstone_filter;
alignas(64) std::atomic<size_t> m_tombstonecnt;
alignas(64) std::atomic<uint32_t> m_reccnt;
alignas(64) std::atomic<double> m_weight;
alignas(64) std::atomic<double> m_max_weight;
+ alignas(64) std::atomic<bool> m_merging;
+ alignas(64) std::atomic<bool> m_deferred_truncate;
+ alignas(64) std::atomic<size_t> m_refcnt;
+
+ alignas(64) std::mutex m_merge_lock;
+ alignas(64) std::mutex m_trunc_lock;
+ alignas(64) std::condition_variable m_trunc_signal;
+
};
}
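
m_trunc_lock and m_trunc_signal are declared here but not exercised in the hunks shown; together with m_refcnt and m_deferred_truncate they suggest a wait-until-unreferenced handshake around truncation. The following standalone sketch shows that pattern as a guess at the intent, not as the framework's code:

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>

/* Illustration only: names mirror the buffer's new members, but this is a
 * self-contained toy, not MutableBuffer itself. */
struct RefGate {
    std::atomic<std::size_t> refcnt{0};
    std::mutex trunc_lock;
    std::condition_variable trunc_signal;

    void acquire() { refcnt.fetch_add(1); }

    void release() {
        if (refcnt.fetch_sub(1) == 1) {
            /* Take the mutex before notifying so a waiter cannot slip between
             * its predicate check and blocking on the condition variable. */
            std::lock_guard<std::mutex> lk(trunc_lock);
            trunc_signal.notify_all();
        }
    }

    /* A truncating thread blocks here until every reference is released. */
    void wait_until_unreferenced() {
        std::unique_lock<std::mutex> lk(trunc_lock);
        trunc_signal.wait(lk, [this] { return refcnt.load() == 0; });
    }
};

As a side note on the new members themselves, the alignas(64) on each atomic keeps these frequently updated counters and flags on separate cache lines, avoiding false sharing between writer threads.
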