path: root/include/framework/structure/MutableBuffer.h
author    Douglas Rumbaugh <dbr4@psu.edu>    2024-01-11 11:31:33 -0500
committer Douglas Rumbaugh <dbr4@psu.edu>    2024-01-11 11:31:33 -0500
commit    5db0f96e9f3d2505b5f751abc133cbf7e13b5129 (patch)
tree      0a66f0d6d54231e9e6d6742538e7fb99c067455b /include/framework/structure/MutableBuffer.h
parent    eb19677340be6f0befe9da2199e5832af51eea0d (diff)
download  dynamic-extension-5db0f96e9f3d2505b5f751abc133cbf7e13b5129.tar.gz
Fixed some potential buffer-related concurrency bugs
Diffstat (limited to 'include/framework/structure/MutableBuffer.h')
-rw-r--r--  include/framework/structure/MutableBuffer.h  |  106
1 file changed, 63 insertions(+), 43 deletions(-)
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index a065154..3a06f0d 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -6,12 +6,22 @@
*
* Distributed under the Modified BSD License.
*
+ * NOTE: Concerning the tombstone count. One possible approach
+ * would be to track the number of tombstones below and above the
+ * low water mark--this would be straightforward to do. Then, if we
+ * *require* that the head only advance up to the LWM, we can get a
+ * correct view on the number of tombstones in the active buffer at
+ * any point in time, and the BufferView will have a pretty good
+ * approximation as well (potentially counting a few extra tombstones if
+ * new inserts happen between when the tail pointer and tombstone count
+ * are fetched)
+ *
*/
#pragma once
#include <cstdlib>
#include <atomic>
#include <cassert>
+#include <immintrin.h>
#include "psu-util/alignment.h"
#include "util/bf_config.h"
@@ -28,20 +38,22 @@ class MutableBuffer {
friend class BufferView<R>;
public:
MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
- : m_lwm(low_watermark), m_hwm(high_watermark), m_cap(capacity), m_head(0), m_tail(0),
- m_old_head(0), m_head_refcnt(0), m_old_head_refcnt(0) {
- /*
- * default capacity is twice the high water mark, to account for the worst-case
- * memory requirements.
- */
- if (m_cap == 0) {
- m_cap = m_hwm * 2;
- }
-
- m_data = (Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>));
-
- // FIXME: need to figure out how to detail with tombstones at some point...
- m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS);
+ : m_lwm(low_watermark)
+ , m_hwm(high_watermark)
+ , m_cap((capacity == 0) ? 2 * high_watermark : capacity)
+ , m_tail(0)
+ , m_head(0)
+ , m_head_refcnt(0)
+ , m_old_head(0)
+ , m_old_head_refcnt(0)
+ , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+ , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
+ , m_tscnt(0)
+ , m_old_tscnt(0)
+ , m_active_head_advance(false)
+ {
+ assert(m_cap > m_hwm);
+ assert(m_hwm > m_lwm);
}
~MutableBuffer() {
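
For context, the rewritten constructor above derives the capacity from the high water mark when none is supplied (a capacity of 0 becomes 2 * high_watermark) and then asserts m_cap > m_hwm > m_lwm. A small usage sketch under those rules, with R standing in for whatever record type the buffer is instantiated on:

    /* Usage sketch only; R is a placeholder record type and MutableBuffer.h
     * is assumed to be included. */
    MutableBuffer<R> buf(/*low_watermark=*/4000, /*high_watermark=*/8000);
    /* capacity defaulted to 0, so m_cap = 2 * 8000 = 16000, which satisfies
     * assert(m_cap > m_hwm) and assert(m_hwm > m_lwm) */

    MutableBuffer<R> bad(/*low_watermark=*/8000, /*high_watermark=*/8000);
    /* trips assert(m_hwm > m_lwm) in a debug build */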
@@ -62,7 +74,7 @@ public:
m_data[pos].header |= (pos << 2);
if (tombstone) {
- m_tombstonecnt.fetch_add(1);
+ m_tscnt.fetch_add(1);
if (m_tombstone_filter) m_tombstone_filter->insert(rec);
}
@@ -70,7 +82,7 @@ public:
}
bool truncate() {
- m_tombstonecnt.store(0);
+ m_tscnt.store(0);
m_tail.store(0);
if (m_tombstone_filter) m_tombstone_filter->clear();
@@ -78,7 +90,7 @@ public:
}
size_t get_record_count() {
- return (m_tail - m_head) % m_cap;
+ return m_tail - m_head;
}
size_t get_capacity() {
@@ -94,30 +106,15 @@ public:
}
size_t get_tombstone_count() {
- return m_tombstonecnt.load();
+ return m_tscnt.load();
}
bool delete_record(const R& rec) {
- for (size_t i=0; i<get_record_count(); i++) {
- if (m_data[to_idx(i)].rec == rec) {
- m_data[to_idx(i)].set_delete();
- return true;
- }
- }
-
- return false;
- }
+ return get_buffer_view().delete_record(rec);
+ }
bool check_tombstone(const R& rec) {
- if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
-
- for (size_t i=0; i<get_record_count(); i++) {
- if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) {
- return true;
- }
- }
-
- return false;
+ return get_buffer_view().check_tombstone(rec);
}
size_t get_memory_usage() {
@@ -130,7 +127,8 @@ public:
BufferView<R> get_buffer_view() {
m_head_refcnt.fetch_add(1);
- return BufferView(m_data, m_head, m_tail.load(), m_tombstone_filter, (void*) this, release_head_reference);
+ auto f = std::bind(release_head_reference, (void *) this, m_head.load());
+ return BufferView<R>(m_data, m_cap, m_head.load(), m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
}
/*
@@ -147,6 +145,8 @@ public:
return false;
}
+ m_active_head_advance.store(true);
+
/*
* the order here is very important. We first store zero to the
* old_refcnt (should be zero anyway). Then we move the current
@@ -157,12 +157,14 @@ public:
* dropped. Only after this do we change to the new head
*/
m_old_head_refcnt.store(0);
+
m_old_head.store(m_head.load());
m_old_head_refcnt.fetch_add(m_head_refcnt);
m_head_refcnt.store(0);
m_head.store(new_head);
+ m_active_head_advance.store(false);
return true;
}
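
The ordering comment above is the crux of the fix: the outgoing head is first aliased into old_head, its refcount is transferred, and only then is the new head published, with m_active_head_advance bracketing the window. From a caller's perspective (a flushing thread, say), usage might look like the sketch below. This is an assumption about intended use, not code from the repository, and it takes advance_head() to return false while the previous old head still has outstanding references, which is how the early return at the top of the function reads.

    /* Assumed usage sketch, not taken from the repository; MutableBuffer.h
     * (and therefore immintrin.h) is assumed to be included. */
    template <typename R>
    void flush_and_advance(MutableBuffer<R> &buffer, size_t flush_boundary) {
        /* Retry until readers pinning the previous old head have drained;
         * advance_head() refuses to retire a head that is still referenced. */
        while (!buffer.advance_head(flush_boundary)) {
            _mm_pause();
        }
        /* Records before flush_boundary are now reachable only through
         * old_head and are logically reclaimed once its refcount hits zero. */
    }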
@@ -212,29 +214,44 @@ private:
if (m_tail.load() >= m_hwm) {
return -1;
}
+
+ _mm_pause();
}
return old_value;
}
- size_t to_idx(size_t i) {
- return (m_head + i) % m_cap;
+ size_t to_idx(size_t i, size_t head) {
+ return (head + i) % m_cap;
}
static void release_head_reference(void *buff, size_t head) {
MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff;
- if (head == buffer->m_head.load()) {
- buffer->m_head_refcnt.fetch_sub(1);
- } else if (head == buffer->m_old_head.load()) {
+ /*
+ * check old head first. During a head transition, the head being
+ * retired will first be assigned to *both* head and old_head. As
+ * a result, any refcnt updates during this time should be applied
+ * to old_head, even if the current head and the head being released
+ * also match.
+ */
+ if (head == buffer->m_old_head.load()) {
buffer->m_old_head_refcnt.fetch_sub(1);
/*
* if the old head refcnt drops to 0, free
* the records by setting old_head = head
+             * before this, spin until any in-progress head advance
+             * finishes, so that the check below does not race with the
+             * refcount transfer while both heads still hold the same value
*/
+ while (buffer->m_active_head_advance.load()) {
+ _mm_pause();
+ }
+
if (buffer->m_old_head_refcnt.load() == 0) {
buffer->m_old_head.store(buffer->m_head);
}
+ } else if (head == buffer->m_head.load()) {
+ buffer->m_head_refcnt.fetch_sub(1);
}
}
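
The reordered check above (old head before current head) exists because, during the transition window, the retiring head value sits in both m_head and m_old_head at once. The comment block below walks through the interleaving this guards against; it is a reading of the comments in this diff, not text from the repository.

    /*
     * Illustrative interleaving.  Thread A runs advance_head(new); thread B
     * releases a reference on the retiring head H.
     *
     *   A: m_old_head.store(H)                  // H now aliased in both slots
     *   B: release_head_reference(buf, H)
     *        -> H == m_old_head, so m_old_head_refcnt.fetch_sub(1)
     *   A: m_old_head_refcnt.fetch_add(m_head_refcnt)
     *   A: m_head_refcnt.store(0)
     *   A: m_head.store(new)
     *
     * Had B matched on m_head first (the old order), its decrement could land
     * between A's fetch_add and store(0) and be wiped out, leaving the retired
     * head's refcount permanently non-zero and its records unreclaimed.  The
     * spin on m_active_head_advance additionally keeps B from testing the
     * refcount for zero while A's transfer is still in flight.
     */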
@@ -252,7 +269,10 @@ private:
Wrapped<R>* m_data;
psudb::BloomFilter<R>* m_tombstone_filter;
- alignas(64) std::atomic<size_t> m_tombstonecnt;
+ alignas(64) std::atomic<size_t> m_tscnt;
+ size_t m_old_tscnt;
+
+ alignas(64) std::atomic<bool> m_active_head_advance;
};
}