path: root/include/framework/structure/MutableBuffer.h
author     Douglas Rumbaugh <dbr4@psu.edu>  2024-01-10 17:39:28 -0500
committer  Douglas Rumbaugh <dbr4@psu.edu>  2024-01-10 17:39:28 -0500
commit     53879a0d69f5e578710b7125e9b41e516c2371d4 (patch)
tree       e036c77f11f07c0823eb7db9bce52269fdac6312 /include/framework/structure/MutableBuffer.h
parent     3c2e6b3b456867d7155b158432b891b84e4e1dd6 (diff)
download   dynamic-extension-53879a0d69f5e578710b7125e9b41e516c2371d4.tar.gz
MutableBuffer+View: Implementation with unit tests
Diffstat (limited to 'include/framework/structure/MutableBuffer.h')
-rw-r--r--  include/framework/structure/MutableBuffer.h | 133
1 file changed, 98 insertions(+), 35 deletions(-)
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 7bec219..d57ad6e 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -11,17 +11,11 @@
#include <cstdlib>
#include <atomic>
-#include <condition_variable>
#include <cassert>
-#include <numeric>
-#include <algorithm>
-#include <type_traits>
#include "psu-util/alignment.h"
#include "util/bf_config.h"
#include "psu-ds/BloomFilter.h"
-#include "psu-ds/Alias.h"
-#include "psu-util/timer.h"
#include "framework/interface/Record.h"
#include "framework/structure/BufferView.h"
@@ -34,7 +28,8 @@ class MutableBuffer {
friend class BufferView<R>;
public:
MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
- : m_lwm(low_watermark), m_hwm(high_watermark), m_cap(capacity), m_head(0), m_tail(0) {
+ : m_lwm(low_watermark), m_hwm(high_watermark), m_cap(capacity), m_head(0), m_tail(0),
+ m_old_head(0), m_head_refcnt(0), m_old_head_refcnt(0) {
/*
* default capacity is twice the high water mark, to account for the worst-case
* memory requirements.
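As a sanity check on the sizing rule above: with the default capacity of 2 * hwm, one full high-watermark's worth of records can sit pinned behind the old head (awaiting release by outstanding buffer views) while a second full batch accumulates behind the current head. A minimal standalone sketch of that arithmetic, with made-up watermark values:

    #include <cassert>
    #include <cstddef>

    int main() {
        std::size_t hwm = 8000;        /* hypothetical high watermark */
        std::size_t cap = 2 * hwm;     /* the constructor's default sizing rule */

        std::size_t pinned = hwm;      /* worst case: records in [old_head, head) */
        std::size_t live   = hwm;      /* records in [head, tail) */
        assert(pinned + live <= cap);  /* both regions fit in physical storage */
        return 0;
    }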
@@ -50,10 +45,8 @@ public:
}
~MutableBuffer() {
- assert(m_refcnt.load() == 0);
-
- if (m_data) free(m_data);
- if (m_tombstone_filter) delete m_tombstone_filter;
+ free(m_data);
+ delete m_tombstone_filter;
}
template <typename R_ = R>
@@ -94,7 +87,11 @@ public:
}
bool is_full() {
- return (m_tail % m_cap) >= m_hwm;
+ return get_record_count() >= m_hwm;
+ }
+
+ bool is_at_low_watermark() {
+ return get_record_count() > m_lwm;
}
size_t get_tombstone_count() {
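The watermark predicates above (is_full / is_at_low_watermark) are the framework's scheduling signals: crossing the low watermark is presumably the cue to start a background reconstruction, while is_full() gates inserts outright once the high watermark is hit. A simplified, self-contained model of that control flow (the names and thresholds here are illustrative, not the framework's API):

    #include <cstddef>

    struct Watermarks {
        std::size_t lwm;   /* begin background reconstruction past this */
        std::size_t hwm;   /* refuse or stall inserts at or past this */
    };

    bool should_schedule_reconstruction(std::size_t reccnt, const Watermarks &w) {
        return reccnt > w.lwm;
    }

    bool must_stall_insert(std::size_t reccnt, const Watermarks &w) {
        return reccnt >= w.hwm;
    }

    int main() {
        Watermarks w{4000, 8000};
        return (should_schedule_reconstruction(4096, w)
                && !must_stall_insert(4096, w)) ? 0 : 1;
    }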
@@ -125,66 +122,132 @@ public:
}
size_t get_memory_usage() {
- return m_cap * sizeof(R);
+ return m_cap * sizeof(Wrapped<R>);
}
size_t get_aux_memory_usage() {
return m_tombstone_filter->get_memory_usage();
}
- size_t get_tombstone_capacity() {
- // FIXME: tombstone capacity needs figured out again
- return m_cap;
+ BufferView<R> get_buffer_view() {
+ m_head_refcnt.fetch_add(1);
+ return BufferView(m_data, m_head, m_tail.load(), m_tombstone_filter, (void*) this, release_head_reference);
}
/*
- * Concurrency-related operations
+ * Advance the buffer following a reconstruction. Move current
+ * head and head_refcnt into old_head and old_head_refcnt, then
+ * assign new_head to head.
*/
- bool take_reference() {
- m_refcnt.fetch_add(1);
- return true;
+ void advance_head(size_t new_head) {
+ assert(new_head > m_head.load());
+ assert(new_head <= m_tail.load());
+ assert(m_old_head_refcnt == 0);
+
+ /*
+ * The order here is very important. We first store zero to the
+ * old_refcnt (it should be zero already). Then we move the current
+ * head to old head. At this point, any new buffer views should
+ * increment the old head refcnt, so no new references to the
+ * current head will be taken. Then we add the current head
+ * refcnt to the old head refcnt, ensuring that no references
+ * get dropped. Only after all of this do we switch to the new head.
+ */
+ m_old_head_refcnt.store(0);
+ m_old_head.store(m_head.load());
+ m_old_head_refcnt.fetch_add(m_head_refcnt);
+
+ m_head_refcnt.store(0);
+ m_head.store(new_head);
}
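A stripped-down model of the hand-off order that the comment in advance_head() prescribes, using bare atomics in place of the buffer members (a sketch of the mechanism, not the class's interface):

    #include <atomic>
    #include <cassert>
    #include <cstddef>

    std::atomic<std::size_t> head{0},     head_refcnt{0};
    std::atomic<std::size_t> old_head{0}, old_head_refcnt{0};

    void advance(std::size_t new_head) {
        assert(old_head_refcnt.load() == 0);   /* prior epoch must be fully drained */

        old_head_refcnt.store(0);
        old_head.store(head.load());           /* current head becomes the old epoch */
        old_head_refcnt.fetch_add(head_refcnt.load());  /* carry its references across */

        head_refcnt.store(0);
        head.store(new_head);                  /* only now publish the new head */
    }

    int main() {
        head_refcnt.fetch_add(1);   /* one outstanding view of the current head */
        advance(42);
        assert(old_head_refcnt.load() == 1 && head.load() == 42);
        return 0;
    }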
- bool release_reference() {
- assert(m_refcnt > 0);
- m_refcnt.fetch_add(-1);
- return true;
+ void set_low_watermark(size_t lwm) {
+ assert(lwm < m_hwm);
+ m_lwm = lwm;
}
- size_t get_reference_count() {
- return m_refcnt.load();
+ size_t get_low_watermark() {
+ return m_lwm;
+ }
+
+ void set_high_watermark(size_t hwm) {
+ assert(hwm > m_lwm);
+ assert(hwm < m_cap);
+ m_hwm = hwm;
+ }
+
+ size_t get_high_watermark() {
+ return m_hwm;
+ }
+
+ size_t get_tail() {
+ return m_tail.load();
+ }
+
+ /*
+ * Note: this returns the available physical storage capacity,
+ * *not* how many more records can be inserted before the
+ * HWM is reached.
+ */
+ size_t get_available_capacity() {
+ return m_cap - (m_tail.load() - m_old_head.load());
}
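A concrete reading of the note above, with made-up state: physical capacity counts every slot not yet reclaimed behind old_head, which is a larger number than the inserts remaining before the high watermark.

    #include <cassert>
    #include <cstddef>

    int main() {
        /* hypothetical state: cap=16000, hwm=8000 */
        std::size_t cap = 16000, hwm = 8000;
        std::size_t old_head = 2000, head = 5000, tail = 9000;

        std::size_t physical_free = cap - (tail - old_head);  /* 9000: what get_available_capacity() reports */
        std::size_t until_hwm     = hwm - (tail - head);      /* 4000: inserts left before is_full() */
        assert(physical_free == 9000 && until_hwm == 4000);
        return 0;
    }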
private:
int64_t try_advance_tail() {
- int64_t new_tail = m_tail.fetch_add(1) % m_cap;
+ size_t old_value = m_tail.load();
+
+ /* if full, fail to advance the tail */
+ if (old_value >= m_hwm) {
+ return -1;
+ }
- if (new_tail < m_hwm) {
- return new_tail;
+ while (!m_tail.compare_exchange_strong(old_value, old_value+1)) {
+ /* if full, stop trying and fail to advance the tail */
+ if (m_tail.load() >= m_hwm) {
+ return -1;
+ }
}
- m_tail.fetch_add(-1);
- return -1;
+ return old_value;
}
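The rewritten loop claims a slot only when the tail can advance without crossing the high watermark, so a failed attempt never needs the fetch_add/rollback dance of the old version. Below is a self-contained variant of the same idea; it leans on the fact that compare_exchange_strong refreshes old_value with the current tail on failure, which makes the explicit re-load in the version above unnecessary:

    #include <atomic>
    #include <cassert>
    #include <cstddef>
    #include <cstdint>

    /* claim the next slot below hwm, or return -1 if the buffer is full */
    std::int64_t claim_slot(std::atomic<std::size_t> &tail, std::size_t hwm) {
        std::size_t old_value = tail.load();
        while (old_value < hwm) {
            if (tail.compare_exchange_strong(old_value, old_value + 1)) {
                return static_cast<std::int64_t>(old_value);  /* index of the claimed slot */
            }
            /* CAS failure wrote the current tail into old_value; loop re-checks hwm */
        }
        return -1;  /* at or above the high watermark: fail with no side effects */
    }

    int main() {
        std::atomic<std::size_t> tail{0};
        assert(claim_slot(tail, 2) == 0);
        assert(claim_slot(tail, 2) == 1);
        assert(claim_slot(tail, 2) == -1);
        return 0;
    }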
size_t to_idx(size_t i) {
return (m_head + i) % m_cap;
}
- size_t m_cap;
+ static void release_head_reference(void *buff, size_t head) {
+ MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff;
+
+ if (head == buffer->m_head.load()) {
+ buffer->m_head_refcnt.fetch_sub(1);
+ } else if (head == buffer->m_old_head.load()) {
+ buffer->m_old_head_refcnt.fetch_sub(1);
+ /*
+ * if the old head refcnt drops to 0, free
+ * the records by setting old_head = head
+ */
+ if (buffer->m_old_head_refcnt.load() == 0) {
+ buffer->m_old_head.store(buffer->m_head);
+ }
+ }
+ }
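release_head_reference() is the teardown half of get_buffer_view(): each view decrements the counter for whichever head position it was opened under, and draining the old epoch's count to zero is what logically frees the region between old_head and head. A compact standalone model of that pairing; it tests the value returned by fetch_sub, making the last-reference decision atomic with the decrement (the callback above re-loads the counter instead, which lets two releasers both observe zero; the idempotent store keeps that harmless, but the fetch_sub form is the more conventional idiom):

    #include <atomic>
    #include <cstddef>

    struct HeadCounts {
        std::atomic<std::size_t> head{0},     head_refcnt{0};
        std::atomic<std::size_t> old_head{0}, old_head_refcnt{0};
    };

    void release(HeadCounts &b, std::size_t view_head) {
        if (view_head == b.head.load()) {
            b.head_refcnt.fetch_sub(1);
        } else if (view_head == b.old_head.load()) {
            /* fetch_sub returns the prior value, so exactly one thread
               observes the count reaching zero and reclaims the region */
            if (b.old_head_refcnt.fetch_sub(1) == 1) {
                b.old_head.store(b.head.load());  /* reclaim [old_head, head) */
            }
        }
    }

    int main() {
        HeadCounts b;
        b.head.store(10);
        b.old_head_refcnt.store(1);
        release(b, 0);   /* drops the last old-epoch reference */
        return b.old_head.load() == 10 ? 0 : 1;
    }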
size_t m_lwm;
size_t m_hwm;
+ size_t m_cap;
alignas(64) std::atomic<size_t> m_tail;
+
alignas(64) std::atomic<size_t> m_head;
+ alignas(64) std::atomic<size_t> m_head_refcnt;
+
+ alignas(64) std::atomic<size_t> m_old_head;
+ alignas(64) std::atomic<size_t> m_old_head_refcnt;
Wrapped<R>* m_data;
-
psudb::BloomFilter<R>* m_tombstone_filter;
-
alignas(64) std::atomic<size_t> m_tombstonecnt;
- alignas(64) std::atomic<size_t> m_refcnt;
};
}
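One detail of the member layout worth noting: each hot atomic is alignas(64) so that the insert path (m_tail), the view-open path (m_head, m_head_refcnt), and the reclamation path (m_old_head, m_old_head_refcnt) each own a cache line and do not false-share under concurrent access. The 64 here is an assumption about the target's cache-line size; std::hardware_destructive_interference_size is the portable spelling. A minimal illustration of the effect:

    #include <atomic>
    #include <cstddef>

    struct Counters {
        alignas(64) std::atomic<std::size_t> tail{0};
        alignas(64) std::atomic<std::size_t> head{0};
    };

    /* each member is pushed to its own 64-byte line, so the struct
       spans two full lines instead of packing both counters into one */
    static_assert(alignof(Counters) == 64, "member alignment propagates to the struct");
    static_assert(sizeof(Counters) == 2 * 64, "one cache line per counter");

    int main() { return 0; }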