path: root/include/framework/structure/MutableBuffer.h
author	Douglas B. Rumbaugh <dbr4@psu.edu>	2024-02-09 14:06:59 -0500
committer	GitHub <noreply@github.com>	2024-02-09 14:06:59 -0500
commit	bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30 (patch)
tree	66333c55feb0ea8875a50e6dc07c8535d241bf1c /include/framework/structure/MutableBuffer.h
parent	076e104b8672924c3d80cd1da2fdb5ebee1766ac (diff)
parent	46885246313358a3b606eca139b20280e96db10e (diff)
download	dynamic-extension-bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30.tar.gz
Merge pull request #1 from dbrumbaugh/new-buffer
Initial Concurrency Implementation
Diffstat (limited to 'include/framework/structure/MutableBuffer.h')
-rw-r--r--	include/framework/structure/MutableBuffer.h	| 313
1 file changed, 313 insertions(+), 0 deletions(-)
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
new file mode 100644
index 0000000..415c95a
--- /dev/null
+++ b/include/framework/structure/MutableBuffer.h
@@ -0,0 +1,313 @@
+/*
+ * include/framework/structure/MutableBuffer.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * NOTE: Concerning the tombstone count. One possible approach
+ * would be to track the number of tombstones below and above the
+ * low water mark--this would be straightforward to do. Then, if we
+ * *require* that the head only advance up to the LWM, we can get a
+ * correct view on the number of tombstones in the active buffer at
+ * any point in time, and the BufferView will have a pretty good
+ * approximation as well (potentially counting a few extra tombstones
+ * if new inserts happen between the fetch of the tail pointer and the
+ * fetch of the tombstone count).
+ *
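+ * A rough sketch of that two-counter idea (hypothetical member
+ * names; not what this commit implements):
+ *
+ * std::atomic<size_t> m_tscnt_below_lwm; // tombstones below the LWM
+ * std::atomic<size_t> m_tscnt_above_lwm; // tombstones at/above the LWM
+ *
+ * A tombstone insert would bump whichever counter matches the
+ * record's logical position relative to the LWM, and a head advance
+ * (bounded by the LWM) would only ever invalidate m_tscnt_below_lwm.
+ *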
+ */
+#pragma once
+
+#include <cstdlib>
+#include <atomic>
+#include <cassert>
+#include <immintrin.h>
+
+#include "psu-util/alignment.h"
+#include "util/bf_config.h"
+#include "psu-ds/BloomFilter.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/BufferView.h"
+
+using psudb::CACHELINE_SIZE;
+
+namespace de {
+
+template <RecordInterface R>
+class MutableBuffer {
+ friend class BufferView<R>;
+
+ struct buffer_head {
+ size_t head_idx;
+ size_t refcnt;
+ };
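+
+ /*
+ * NOTE: buffer_head spans two machine words, so
+ * std::atomic<buffer_head> generally requires a double-width CAS
+ * (e.g., cmpxchg16b on x86-64) and is not guaranteed to be
+ * lock-free on every platform; is_lock_free() can be used to check.
+ */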
+
+public:
+ MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
+ : m_lwm(low_watermark)
+ , m_hwm(high_watermark)
+ , m_cap((capacity == 0) ? 2 * high_watermark : capacity)
+ , m_tail(0)
+ , m_head({0, 0})
+ , m_old_head({high_watermark, 0})
+ , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+ , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
+ , m_tscnt(0)
+ , m_old_tscnt(0)
+ , m_active_head_advance(false)
+ {
+ assert(m_cap > m_hwm);
+ assert(m_hwm > m_lwm);
+ }
+
+ ~MutableBuffer() {
+ free(m_data);
+ delete m_tombstone_filter;
+ }
+
+ int append(const R &rec, bool tombstone=false) {
+ int64_t tail = 0;
+ if ((tail = try_advance_tail()) == -1) {
+ /* the buffer is full; reject the insert */
+ return 0;
+ }
+
+ Wrapped<R> wrec;
+ wrec.rec = rec;
+ wrec.header = 0;
+ if (tombstone) wrec.set_tombstone();
+
+ /* map the monotone logical tail index onto the circular array */
+ size_t pos = tail % m_cap;
+
+ m_data[pos] = wrec;
+ /* store the slot index above the low header bits used for flags */
+ m_data[pos].header |= (pos << 2);
+
+ if (tombstone) {
+ m_tscnt.fetch_add(1);
+ if (m_tombstone_filter) m_tombstone_filter->insert(rec);
+ }
+
+ return 1;
+ }
+
+ bool truncate() {
+ m_tscnt.store(0);
+ m_tail.store(0);
+ if (m_tombstone_filter) m_tombstone_filter->clear();
+
+ return true;
+ }
+
+ size_t get_record_count() {
+ return m_tail.load() - m_head.load().head_idx;
+ }
+
+ size_t get_capacity() {
+ return m_cap;
+ }
+
+ bool is_full() {
+ return get_record_count() >= m_hwm;
+ }
+
+ bool is_at_low_watermark() {
+ return get_record_count() >= m_lwm;
+ }
+
+ size_t get_tombstone_count() {
+ return m_tscnt.load();
+ }
+
+ bool delete_record(const R& rec) {
+ return get_buffer_view().delete_record(rec);
+ }
+
+ bool check_tombstone(const R& rec) {
+ return get_buffer_view().check_tombstone(rec);
+ }
+
+ size_t get_memory_usage() {
+ return m_cap * sizeof(Wrapped<R>);
+ }
+
+ size_t get_aux_memory_usage() {
+ return (m_tombstone_filter) ? m_tombstone_filter->get_memory_usage() : 0;
+ }
+
+ BufferView<R> get_buffer_view(size_t target_head) {
+ size_t head = get_head(target_head);
+ auto f = std::bind(release_head_reference, (void *) this, head);
+
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
+ }
+
+ BufferView<R> get_buffer_view() {
+ size_t head = get_head(m_head.load().head_idx);
+ auto f = std::bind(release_head_reference, (void *) this, head);
+
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
+ }
+
+ /*
+ * Advance the buffer head following a reconstruction. The current
+ * head and its reference count are moved into old_head, and
+ * new_head is then installed as the current head.
+ */
+ bool advance_head(size_t new_head) {
+ assert(new_head > m_head.load().head_idx);
+ assert(new_head <= m_tail.load());
+
+ /* refuse to advance the head while the old head still has outstanding references */
+ if (m_old_head.load().refcnt > 0) {
+ //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n");
+ return false;
+ }
+
+ m_active_head_advance.store(true);
+
+ buffer_head new_hd = {new_head, 0};
+ buffer_head cur_hd;
+
+ /* replace current head with new head */
+ do {
+ cur_hd = m_head.load();
+ } while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+
+ /* move the current head into the old head */
+ m_old_head.store(cur_hd);
+
+ m_active_head_advance.store(false);
+ return true;
+ }
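+
+ /*
+ * Illustrative caller-side sketch (not part of this class): since
+ * advance_head() fails while old-head references remain, a
+ * reconstruction path would typically retry until readers release
+ * their views:
+ *
+ * while (!buffer->advance_head(new_head)) {
+ * _mm_pause();
+ * }
+ */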
+
+ /*
+ * FIXME: If target_head does not match *either* the old_head or the
+ * current_head, this routine will loop infinitely.
+ */
+ size_t get_head(size_t target_head) {
+ buffer_head cur_hd, new_hd;
+ bool head_acquired = false;
+
+ do {
+ if (m_old_head.load().head_idx == target_head) {
+ cur_hd = m_old_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else if (m_head.load().head_idx == target_head){
+ cur_hd = m_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ } while(!head_acquired);
+
+ return new_hd.head_idx;
+ }
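+
+ /*
+ * One possible mitigation for the FIXME above (sketch only, not
+ * applied here): validate target_head before entering the loop,
+ * e.g.,
+ *
+ * assert(m_head.load().head_idx == target_head
+ * || m_old_head.load().head_idx == target_head);
+ */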
+
+ void set_low_watermark(size_t lwm) {
+ assert(lwm < m_hwm);
+ m_lwm = lwm;
+ }
+
+ size_t get_low_watermark() {
+ return m_lwm;
+ }
+
+ void set_high_watermark(size_t hwm) {
+ assert(hwm > m_lwm);
+ assert(hwm < m_cap);
+ m_hwm = hwm;
+ }
+
+ size_t get_high_watermark() {
+ return m_hwm;
+ }
+
+ size_t get_tail() {
+ return m_tail.load();
+ }
+
+ /*
+ * Note: this returns the available physical storage capacity,
+ * *not* how many more records can be inserted before the
+ * HWM is reached. It considers the old_head to be "free"
+ * when it has no remaining references. This should be true,
+ * but a buggy framework implementation may violate the
+ * assumption.
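+ *
+ * For example (illustrative numbers): with m_cap = 16000,
+ * m_tail = 9000, m_head.head_idx = 5000, and the old head fully
+ * released (refcnt == 0), the buffer holds 4000 records and
+ * 16000 - (9000 - 5000) = 12000 physical slots remain available.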
+ */
+ size_t get_available_capacity() {
+ if (m_old_head.load().refcnt == 0) {
+ return m_cap - (m_tail.load() - m_head.load().head_idx);
+ }
+
+ return m_cap - (m_tail.load() - m_old_head.load().head_idx);
+ }
+
+private:
+ int64_t try_advance_tail() {
+ size_t old_value = m_tail.load();
+
+ /* if full, fail to advance the tail */
+ if (old_value - m_head.load().head_idx >= m_hwm) {
+ return -1;
+ }
+
+ while (!m_tail.compare_exchange_strong(old_value, old_value+1)) {
+ /* if full (relative to the current head), stop trying and fail */
+ if (old_value - m_head.load().head_idx >= m_hwm) {
+ return -1;
+ }
+
+ _mm_pause();
+ }
+
+ return old_value;
+ }
+
+ size_t to_idx(size_t i, size_t head) {
+ return (head + i) % m_cap;
+ }
+
+ /*
+ * Decrement the reference count on whichever head (old or current)
+ * the releasing BufferView was pinned to. Spins until the CAS
+ * succeeds.
+ */
+ static void release_head_reference(void *buff, size_t head) {
+ MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff;
+
+ buffer_head cur_hd, new_hd;
+ do {
+ if (buffer->m_old_head.load().head_idx == head) {
+ cur_hd = buffer->m_old_head.load();
+ if (cur_hd.refcnt == 0) continue;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
+ if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
+ break;
+ }
+ } else {
+ cur_hd = buffer->m_head.load();
+ if (cur_hd.refcnt == 0) continue;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
+
+ if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
+ break;
+ }
+ }
+ _mm_pause();
+ } while(true);
+ }
+
+ size_t m_lwm;
+ size_t m_hwm;
+ size_t m_cap;
+
+ alignas(64) std::atomic<size_t> m_tail;
+
+ alignas(64) std::atomic<buffer_head> m_head;
+ alignas(64) std::atomic<buffer_head> m_old_head;
+
+ Wrapped<R>* m_data;
+ psudb::BloomFilter<R>* m_tombstone_filter;
+ alignas(64) std::atomic<size_t> m_tscnt;
+ size_t m_old_tscnt;
+
+ alignas(64) std::atomic<bool> m_active_head_advance;
+};
+
+}
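+
+/*
+ * Example usage (illustrative sketch; `Rec` stands in for any type
+ * satisfying RecordInterface, and the watermark values are arbitrary):
+ *
+ * de::MutableBuffer<Rec> buffer(4000, 8000);
+ *
+ * Rec r = ...;
+ * if (buffer.append(r)) {
+ * // the insert succeeded; a BufferView pins the head so records
+ * // remain stable while the view is held
+ * auto view = buffer.get_buffer_view();
+ * }
+ */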