| author | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-01-15 14:01:36 -0500 |
|---|---|---|
| committer | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-01-15 14:01:36 -0500 |
| commit | cf178ae74a76b780b655a447531d2114f9f81d98 (patch) | |
| tree | c992a3209a650bb90540dd4449e1d8111f216458 /include/framework/structure/MutableBuffer.h | |
| parent | aac0bb661af8fae38d3ce08d6078cb4d9dfcb575 (diff) | |
| download | dynamic-extension-cf178ae74a76b780b655a447531d2114f9f81d98.tar.gz | |
Various single-threaded bug fixes
Diffstat (limited to 'include/framework/structure/MutableBuffer.h')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 26 |

1 file changed, 22 insertions(+), 4 deletions(-)
```diff
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 3a06f0d..5b655fc 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -44,7 +44,7 @@ public:
         , m_tail(0)
         , m_head(0)
         , m_head_refcnt(0)
-        , m_old_head(0)
+        , m_old_head(high_watermark)
         , m_old_head_refcnt(0)
         , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
         , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
@@ -62,14 +62,18 @@ public:
     }
 
     int append(const R &rec, bool tombstone=false) {
-        int32_t pos = 0;
-        if ((pos = try_advance_tail()) == -1) return 0;
+        int32_t tail = 0;
+        if ((tail = try_advance_tail()) == -1) {
+            return 0;
+        }
 
         Wrapped<R> wrec;
         wrec.rec = rec;
         wrec.header = 0;
         if (tombstone) wrec.set_tombstone();
 
+        size_t pos = tail % m_cap;
+
         m_data[pos] = wrec;
         m_data[pos].header |= (pos << 2);
 
@@ -131,6 +135,13 @@ public:
         return BufferView<R>(m_data, m_cap, m_head.load(), m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
     }
 
+    BufferView<R> get_flush_buffer_view() {
+        m_head_refcnt.fetch_add(1);
+        auto f = std::bind(release_head_reference, (void *) this, m_head.load());
+        return BufferView<R>(m_data, m_cap, m_head.load(), m_head.load() + m_lwm, m_tscnt.load(), m_tombstone_filter, f);
+
+    }
+
     /*
      * Advance the buffer following a reconstruction. Move current
      * head and head_refcnt into old_head and old_head_refcnt, then
@@ -142,6 +153,7 @@ public:
 
         /* refuse to advance head while there is an old with one references */
         if (m_old_head_refcnt > 0) {
+            fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts");
             return false;
         }
 
@@ -195,6 +207,10 @@ public:
      * Note: this returns the available physical storage capacity,
      * *not* now many more records can be inserted before the
      * HWM is reached.
+     *
+     * FIXME: this logic is incorrect for the buffer prior to the
+     * first call to advance_head, and will under-report the available
+     * space.
      */
     size_t get_available_capacity() {
         return m_cap - (m_tail.load() - m_old_head.load());
@@ -205,7 +221,7 @@ private:
         size_t old_value = m_tail.load();
 
         /* if full, fail to advance the tail */
-        if (old_value >= m_hwm) {
+        if (old_value - m_head.load() >= m_hwm) {
             return -1;
         }
 
@@ -236,6 +252,7 @@
          * also match.
          */
        if (head == buffer->m_old_head.load()) {
+           assert(buffer->m_old_head_refcnt > 0);
            buffer->m_old_head_refcnt.fetch_sub(1);
            /*
             * if the old head refcnt drops to 0, free
@@ -251,6 +268,7 @@
                buffer->m_old_head.store(buffer->m_head);
            }
        } else if (head == buffer->m_head.load()) {
+           assert(buffer->m_head_refcnt > 0);
            buffer->m_head_refcnt.fetch_sub(1);
        }
    }
```
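Taken together, the `append()` and `try_advance_tail()` changes rely on a circular-buffer convention: `m_head` and `m_tail` are monotonically increasing logical indices, a record's physical slot is `logical index % m_cap`, and the buffer counts as full when the tail has run `m_hwm` records ahead of the head, not when the raw tail value crosses `m_hwm`. The sketch below is not the project's `MutableBuffer`; it is a minimal, single-threaded illustration of that convention under assumed names (`RingBufferSketch`, an `advance_head()` that takes an explicit index), with the reference counting, Bloom filter, and CAS-based tail advancement omitted.

```cpp
// Minimal sketch, NOT the project's MutableBuffer: the names below
// (RingBufferSketch, advance_head taking an explicit index) are hypothetical.
// It illustrates the convention the diff relies on: head/tail are
// monotonically increasing logical indices, physical slot = index % capacity,
// and "full" means the tail is high_watermark records ahead of the head.
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <vector>

template <typename R>
class RingBufferSketch {
public:
    RingBufferSketch(size_t high_watermark, size_t capacity)
        : m_hwm(high_watermark)
        , m_cap(capacity)
        , m_head(0)
        , m_tail(0)
        , m_data(capacity)
    {
        assert(high_watermark <= capacity);
    }

    /* mirrors the corrected append(): reserve a logical slot first, then map
     * it onto physical storage with a modulus so the index wraps around */
    bool append(const R &rec) {
        long tail = try_advance_tail();
        if (tail == -1) {
            return false;               /* at the high watermark */
        }
        m_data[tail % m_cap] = rec;     /* wraparound handled here */
        return true;
    }

    /* stand-in for advance_head(): retiring flushed records moves the logical
     * head forward, which is what frees room for further appends */
    void advance_head(size_t new_head) {
        assert(new_head <= m_tail.load());
        m_head.store(new_head);
    }

private:
    /* the fixed fullness test compares the distance between tail and head to
     * the high watermark instead of the raw tail value; single-threaded here,
     * so no compare-and-swap loop */
    long try_advance_tail() {
        size_t old_value = m_tail.load();
        if (old_value - m_head.load() >= m_hwm) {
            return -1;
        }
        m_tail.fetch_add(1);
        return (long) old_value;
    }

    size_t m_hwm;
    size_t m_cap;
    std::atomic<size_t> m_head;
    std::atomic<size_t> m_tail;
    std::vector<R> m_data;
};

int main() {
    RingBufferSketch<int> buf(4, 8);
    for (int i = 0; i < 4; i++) assert(buf.append(i));
    assert(!buf.append(4));   /* tail - head == hwm: correctly refused */
    buf.advance_head(2);      /* two records retired, e.g. by a flush */
    assert(buf.append(4));    /* distance-based check now admits more */
    printf("ring buffer sketch ok\n");
    return 0;
}
```

With the old `old_value >= m_hwm` test, the final `append()` above would still be refused after the head advanced, because the raw tail value never decreases; and without the `% m_cap` mapping, that append would index past the physical capacity once the logical tail outruns it. Those appear to be the failure modes the `append()` and `try_advance_tail()` changes in this commit address.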