summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/framework/structure/MutableBuffer.h17
1 files changed, 12 insertions, 5 deletions
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index a70b86b..ba25cc3 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -33,7 +33,7 @@ class MutableBuffer {
public:
MutableBuffer(size_t capacity, size_t max_tombstone_cap)
: m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
- , m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
+ , m_tombstonecnt(0), m_weight(0), m_max_weight(0), m_tail(0) {
m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
m_merge_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
m_tombstone_filter = nullptr;
@@ -83,6 +83,7 @@ public:
m_weight.fetch_add(1);
}
+ m_reccnt.fetch_add(1);
return 1;
}
@@ -91,6 +92,7 @@ public:
m_reccnt.store(0);
m_weight.store(0);
m_max_weight.store(0);
+ m_tail.store(0);
if (m_tombstone_filter) m_tombstone_filter->clear();
return true;
@@ -193,11 +195,15 @@ public:
}
private:
- int32_t try_advance_tail() {
- size_t new_tail = m_reccnt.fetch_add(1);
+ int64_t try_advance_tail() {
+ int64_t new_tail = m_tail.fetch_add(1);
- if (new_tail < m_cap) return new_tail;
- else return -1;
+ if (new_tail < m_cap) {
+ return new_tail;
+ }
+
+ m_tail.fetch_add(-1);
+ return -1;
}
size_t m_cap;
@@ -210,6 +216,7 @@ private:
alignas(64) std::atomic<size_t> m_tombstonecnt;
alignas(64) std::atomic<uint32_t> m_reccnt;
+ alignas(64) std::atomic<int64_t> m_tail;
alignas(64) std::atomic<double> m_weight;
alignas(64) std::atomic<double> m_max_weight;