diff options
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 17 |
1 files changed, 12 insertions, 5 deletions
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index a70b86b..ba25cc3 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -33,7 +33,7 @@ class MutableBuffer { public: MutableBuffer(size_t capacity, size_t max_tombstone_cap) : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0) - , m_tombstonecnt(0), m_weight(0), m_max_weight(0) { + , m_tombstonecnt(0), m_weight(0), m_max_weight(0), m_tail(0) { m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>)); m_merge_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>)); m_tombstone_filter = nullptr; @@ -83,6 +83,7 @@ public: m_weight.fetch_add(1); } + m_reccnt.fetch_add(1); return 1; } @@ -91,6 +92,7 @@ public: m_reccnt.store(0); m_weight.store(0); m_max_weight.store(0); + m_tail.store(0); if (m_tombstone_filter) m_tombstone_filter->clear(); return true; @@ -193,11 +195,15 @@ public: } private: - int32_t try_advance_tail() { - size_t new_tail = m_reccnt.fetch_add(1); + int64_t try_advance_tail() { + int64_t new_tail = m_tail.fetch_add(1); - if (new_tail < m_cap) return new_tail; - else return -1; + if (new_tail < m_cap) { + return new_tail; + } + + m_tail.fetch_add(-1); + return -1; } size_t m_cap; @@ -210,6 +216,7 @@ private: alignas(64) std::atomic<size_t> m_tombstonecnt; alignas(64) std::atomic<uint32_t> m_reccnt; + alignas(64) std::atomic<int64_t> m_tail; alignas(64) std::atomic<double> m_weight; alignas(64) std::atomic<double> m_max_weight; |