author    Douglas Rumbaugh <dbr4@psu.edu>    2024-01-11 11:31:33 -0500
committer Douglas Rumbaugh <dbr4@psu.edu>    2024-01-11 11:31:33 -0500
commit    5db0f96e9f3d2505b5f751abc133cbf7e13b5129
tree      0a66f0d6d54231e9e6d6742538e7fb99c067455b /include/framework
parent    eb19677340be6f0befe9da2199e5832af51eea0d
Fixed some potential buffer-related concurrency bugs
Diffstat (limited to 'include/framework')
-rw-r--r--  include/framework/structure/BufferView.h    |  45
-rw-r--r--  include/framework/structure/MutableBuffer.h | 106
2 files changed, 94 insertions(+), 57 deletions(-)
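
For orientation before the hunks below: the members this commit touches form a small synchronization core. The following condensed sketch collects them in one place; it is assembled from the diff, not a verbatim excerpt of the class:

    #include <atomic>
    #include <cstddef>

    // Condensed sketch of MutableBuffer's concurrency-relevant state,
    // assembled from the diff below (not the real class definition).
    struct BufferSyncState {
        std::atomic<size_t> m_head{0};            // oldest live logical index
        std::atomic<size_t> m_old_head{0};        // retired head, pinned by readers
        std::atomic<size_t> m_head_refcnt{0};     // BufferViews holding m_head
        std::atomic<size_t> m_old_head_refcnt{0}; // BufferViews holding m_old_head
        std::atomic<size_t> m_tail{0};            // next logical insert position
        std::atomic<size_t> m_tscnt{0};           // tombstone count (was m_tombstonecnt)
        std::atomic<bool>   m_active_head_advance{false}; // head transition underway
    };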
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 7e8af45..00b6101 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -18,31 +18,43 @@
namespace de {
-typedef std::function<void(void*, size_t)> ReleaseFunction;
+typedef std::_Bind<void (*(void*, long unsigned int))(void*, long unsigned int)> ReleaseFunction;
template <RecordInterface R>
class BufferView {
public:
BufferView() = default;
- BufferView(const Wrapped<R> *buffer, size_t head, size_t tail, psudb::BloomFilter<R> *filter,
- void *parent_buffer, ReleaseFunction release)
- : m_buffer(buffer)
+ BufferView(const Wrapped<R> *buffer, size_t cap, size_t head, size_t tail, size_t tombstone_cnt, psudb::BloomFilter<R> *filter,
+ ReleaseFunction release)
+ : m_data(buffer)
, m_release(release)
- , m_parent_buffer(parent_buffer)
, m_head(head)
, m_tail(tail)
+ , m_cap(cap)
+ , m_approx_ts_cnt(tombstone_cnt)
, m_tombstone_filter(filter) {}
~BufferView() {
- m_release(m_parent_buffer, m_head);
+ m_release();
}
bool check_tombstone(const R& rec) {
if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
for (size_t i=0; i<get_record_count(); i++) {
- if (m_buffer[to_idx(i)].rec == rec && m_buffer[to_idx(i)].is_tombstone()) {
+ if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ bool delete_record(const R& rec) {
+ for (size_t i=0; i<get_record_count(); i++) {
+ if (m_data[to_idx(i)].rec == rec) {
+ m_data[to_idx(i)].set_delete();
return true;
}
}
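
The new ReleaseFunction typedef names the concrete type produced by std::bind(release_head_reference, buffer, head), which lets the destructor call m_release() with no arguments. Since std::_Bind is an internal libstdc++ type, a portable sketch of the same pattern can be written with std::function<void()>; this illustrates the idea rather than reproducing the committed code:

    #include <cstddef>
    #include <functional>
    #include <utility>

    // Portable sketch of the release-callback pattern: the creator binds
    // the parent buffer and the head index into a zero-argument callable,
    // and the view's destructor simply invokes it.
    using ReleaseFn = std::function<void()>;

    static void release_head_reference(void *buffer, size_t head) {
        // decrement the refcount for `head` on `buffer` (elided)
        (void) buffer; (void) head;
    }

    struct ViewSketch {
        ReleaseFn m_release;
        explicit ViewSketch(ReleaseFn r) : m_release(std::move(r)) {}
        ~ViewSketch() { if (m_release) m_release(); } // copy semantics elided
    };

    ViewSketch make_view(void *parent, size_t head) {
        return ViewSketch(std::bind(release_head_reference, parent, head));
    }

The commit names the std::_Bind instantiation directly, presumably to avoid std::function's type erasure on this path; the trade-off is a dependency on a libstdc++ internal type.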
@@ -54,30 +66,35 @@ public:
return m_tail - m_head;
}
+ /*
+ * NOTE: This function returns an upper bound on the number
+     * of tombstones within the view. There may be fewer than
+     * this, due to synchronization issues during view creation.
+ */
size_t get_tombstone_count() {
- // FIXME: tombstone count
- return 0;
+ return m_approx_ts_cnt;
}
Wrapped<R> *get(size_t i) {
assert(i < get_record_count());
- return m_buffer + to_idx(i);
+ return m_data + to_idx(i);
}
void copy_to_buffer(psudb::byte *buffer) {
- memcpy(buffer, (std::byte*) (m_buffer + m_head), get_record_count() * sizeof(Wrapped<R>));
+ memcpy(buffer, (std::byte*) (m_data + m_head), get_record_count() * sizeof(Wrapped<R>));
}
private:
- const Wrapped<R>* m_buffer;
- void *m_parent_buffer;
+ const Wrapped<R>* m_data;
ReleaseFunction m_release;
size_t m_head;
size_t m_tail;
+ size_t m_cap;
+ size_t m_approx_ts_cnt;
psudb::BloomFilter<R> *m_tombstone_filter;
size_t to_idx(size_t i) {
- return (m_head + i) % m_buffer->get_capacity();
+ return (m_head + i) % m_cap;
}
};
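
Two details of the view are worth noting: m_head and m_tail are monotonically increasing logical indices (so get_record_count() is a plain subtraction), and to_idx() maps a logical offset onto the fixed-size physical array modulo m_cap, which the view now carries explicitly instead of reaching back into the buffer. A minimal sketch of that index math, with assumed example values:

    #include <cassert>
    #include <cstddef>

    // Logical head/tail grow without bound; physical slots wrap mod cap.
    size_t to_idx(size_t head, size_t i, size_t cap) {
        return (head + i) % cap;
    }

    int main() {
        size_t cap = 8, head = 6, tail = 9;   // assumed example values
        assert(tail - head == 3);             // record count: plain subtraction
        assert(to_idx(head, 0, cap) == 6);    // first record at slot 6
        assert(to_idx(head, 2, cap) == 0);    // third record wraps to slot 0
        return 0;
    }

The tombstone count, by contrast, is a snapshot taken at view creation, hence the "upper bound" caveat in the comment above.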
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index a065154..3a06f0d 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -6,12 +6,22 @@
*
* Distributed under the Modified BSD License.
*
+ * NOTE: Concerning the tombstone count. One possible approach
+ * would be to track the number of tombstones below and above the
+ * low water mark--this would be straightforward to do. Then, if we
+ * *require* that the head only advance up to the LWM, we can get a
+ * correct view of the number of tombstones in the active buffer at
+ * any point in time, and the BufferView will have a pretty good
+ * approximation as well (potentially with a few extra, if new inserts
+ * happen between when the tail pointer and the tombstone count are fetched).
+ *
*/
#pragma once
#include <cstdlib>
#include <atomic>
#include <cassert>
+#include <immintrin.h>
#include "psu-util/alignment.h"
#include "util/bf_config.h"
@@ -28,20 +38,22 @@ class MutableBuffer {
friend class BufferView<R>;
public:
MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
- : m_lwm(low_watermark), m_hwm(high_watermark), m_cap(capacity), m_head(0), m_tail(0),
- m_old_head(0), m_head_refcnt(0), m_old_head_refcnt(0) {
- /*
- * default capacity is twice the high water mark, to account for the worst-case
- * memory requirements.
- */
- if (m_cap == 0) {
- m_cap = m_hwm * 2;
- }
-
- m_data = (Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>));
-
- // FIXME: need to figure out how to detail with tombstones at some point...
- m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS);
+ : m_lwm(low_watermark)
+ , m_hwm(high_watermark)
+ , m_cap((capacity == 0) ? 2 * high_watermark : capacity)
+ , m_tail(0)
+ , m_head(0)
+ , m_head_refcnt(0)
+ , m_old_head(0)
+ , m_old_head_refcnt(0)
+ , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+ , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
+ , m_tscnt(0)
+ , m_old_tscnt(0)
+ , m_active_head_advance(false)
+ {
+ assert(m_cap > m_hwm);
+ assert(m_hwm > m_lwm);
}
~MutableBuffer() {
@@ -62,7 +74,7 @@ public:
m_data[pos].header |= (pos << 2);
if (tombstone) {
- m_tombstonecnt.fetch_add(1);
+ m_tscnt.fetch_add(1);
if (m_tombstone_filter) m_tombstone_filter->insert(rec);
}
@@ -70,7 +82,7 @@ public:
}
bool truncate() {
- m_tombstonecnt.store(0);
+ m_tscnt.store(0);
m_tail.store(0);
if (m_tombstone_filter) m_tombstone_filter->clear();
@@ -78,7 +90,7 @@ public:
}
size_t get_record_count() {
- return (m_tail - m_head) % m_cap;
+ return m_tail - m_head;
}
size_t get_capacity() {
@@ -94,30 +106,15 @@ public:
}
size_t get_tombstone_count() {
- return m_tombstonecnt.load();
+ return m_tscnt.load();
}
bool delete_record(const R& rec) {
- for (size_t i=0; i<get_record_count(); i++) {
- if (m_data[to_idx(i)].rec == rec) {
- m_data[to_idx(i)].set_delete();
- return true;
- }
- }
-
- return false;
- }
+ return get_buffer_view().delete_record(rec);
+ }
bool check_tombstone(const R& rec) {
- if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
-
- for (size_t i=0; i<get_record_count(); i++) {
- if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) {
- return true;
- }
- }
-
- return false;
+ return get_buffer_view().check_tombstone(rec);
}
size_t get_memory_usage() {
@@ -130,7 +127,8 @@ public:
BufferView<R> get_buffer_view() {
m_head_refcnt.fetch_add(1);
- return BufferView(m_data, m_head, m_tail.load(), m_tombstone_filter, (void*) this, release_head_reference);
+ auto f = std::bind(release_head_reference, (void *) this, m_head.load());
+ return BufferView<R>(m_data, m_cap, m_head.load(), m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
}
/*
@@ -147,6 +145,8 @@ public:
return false;
}
+ m_active_head_advance.store(true);
+
/*
* the order here is very important. We first store zero to the
* old_refcnt (should be zero anyway). Then we move the current
@@ -157,12 +157,14 @@ public:
* dropped. Only after this do we change to the new head
*/
m_old_head_refcnt.store(0);
+
m_old_head.store(m_head.load());
m_old_head_refcnt.fetch_add(m_head_refcnt);
m_head_refcnt.store(0);
m_head.store(new_head);
+ m_active_head_advance.store(false);
return true;
}
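
Reconstructed from the hunk above: the advance path now brackets the transition with m_active_head_advance so that a concurrent release_head_reference() can tell a transition is underway. A condensed sketch of the ordering, reusing the BufferSyncState sketch from the top of this page (the real method also validates the new head and performs checks not shown in the diff):

    // Sketch of the head-transition ordering from the diff above;
    // not the verbatim method body.
    bool advance_head_sketch(BufferSyncState &buf, size_t new_head) {
        buf.m_active_head_advance.store(true);

        buf.m_old_head_refcnt.store(0);            // should already be zero
        buf.m_old_head.store(buf.m_head.load());   // retire the current head
        buf.m_old_head_refcnt.fetch_add(buf.m_head_refcnt); // move its readers
        buf.m_head_refcnt.store(0);
        buf.m_head.store(new_head);

        buf.m_active_head_advance.store(false);
        return true;
    }

The window in which m_head and m_old_head are equal is exactly what release_head_reference() guards against below, by checking old_head first and spinning on m_active_head_advance.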
@@ -212,29 +214,44 @@ private:
if (m_tail.load() >= m_hwm) {
return -1;
}
+
+ _mm_pause();
}
return old_value;
}
- size_t to_idx(size_t i) {
- return (m_head + i) % m_cap;
+ size_t to_idx(size_t i, size_t head) {
+ return (head + i) % m_cap;
}
static void release_head_reference(void *buff, size_t head) {
MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff;
- if (head == buffer->m_head.load()) {
- buffer->m_head_refcnt.fetch_sub(1);
- } else if (head == buffer->m_old_head.load()) {
+ /*
+ * check old head first. During a head transition, the head being
+ * retired will first be assigned to *both* head and old_head. As
+ * a result, any refcnt updates during this time should be applied
+ * to old_head, even if the current head and the head being released
+ * also match.
+ */
+ if (head == buffer->m_old_head.load()) {
buffer->m_old_head_refcnt.fetch_sub(1);
/*
* if the old head refcnt drops to 0, free
* the records by setting old_head = head
+     * before this, spin while a head advance is in progress to
+     * avoid racing with its transfer of refcounts between heads.
*/
+ while (buffer->m_active_head_advance.load()) {
+ _mm_pause();
+ }
+
if (buffer->m_old_head_refcnt.load() == 0) {
buffer->m_old_head.store(buffer->m_head);
}
+ } else if (head == buffer->m_head.load()) {
+ buffer->m_head_refcnt.fetch_sub(1);
}
}
@@ -252,7 +269,10 @@ private:
Wrapped<R>* m_data;
psudb::BloomFilter<R>* m_tombstone_filter;
- alignas(64) std::atomic<size_t> m_tombstonecnt;
+ alignas(64) std::atomic<size_t> m_tscnt;
+ size_t m_old_tscnt;
+
+ alignas(64) std::atomic<bool> m_active_head_advance;
};
}
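
Putting the pieces together, a typical reader-side interaction looks roughly like this. The insert method's exact signature is not shown in this diff, so the commented-out call is an assumption; the RAII release in ~BufferView() is what invokes the bound release_head_reference():

    // Usage sketch against the headers above. `Rec` stands for any type
    // satisfying RecordInterface.
    template <typename Rec>
    void example(de::MutableBuffer<Rec> &buffer, const Rec &r) {
        // buffer.append(r, /*tombstone=*/true);   // assumed insert API

        {
            // taking a view pins the current head via m_head_refcnt
            auto view = buffer.get_buffer_view();
            bool deleted = view.delete_record(r);   // sets the delete flag in place
            bool has_ts  = view.check_tombstone(r); // bloom-filtered scan
            (void) deleted; (void) has_ts;
        } // ~BufferView() runs the bound release_head_reference(this, head)

        // Once all views on the old head are released, a later
        // advance_head() can retire it and reclaim the records behind it.
    }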