summaryrefslogtreecommitdiffstats
path: root/include/framework/structure/MutableBuffer.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/structure/MutableBuffer.h')
-rw-r--r--include/framework/structure/MutableBuffer.h464
1 files changed, 222 insertions, 242 deletions
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 7db3980..625b04b 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -1,8 +1,8 @@
/*
* include/framework/structure/MutableBuffer.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -18,301 +18,281 @@
*/
#pragma once
-#include <cstdlib>
#include <atomic>
#include <cassert>
+#include <cstdlib>
#include <immintrin.h>
-#include "psu-util/alignment.h"
-#include "util/bf_config.h"
-#include "psu-ds/BloomFilter.h"
#include "framework/interface/Record.h"
#include "framework/structure/BufferView.h"
-
-using psudb::CACHELINE_SIZE;
+#include "psu-ds/BloomFilter.h"
+#include "psu-util/alignment.h"
+#include "util/bf_config.h"
namespace de {
-template <RecordInterface R>
-class MutableBuffer {
- friend class BufferView<R>;
+template <RecordInterface R> class MutableBuffer {
+ friend class BufferView<R>;
- struct buffer_head {
- size_t head_idx;
- size_t refcnt;
- };
-
-public:
- MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
- : m_lwm(low_watermark)
- , m_hwm(high_watermark)
- , m_cap((capacity == 0) ? 2 * high_watermark : capacity)
- , m_tail(0)
- , m_head({0, 0})
- , m_old_head({high_watermark, 0})
- //, m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
- , m_data(new Wrapped<R>[m_cap]())
- , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
- , m_tscnt(0)
- , m_old_tscnt(0)
- , m_active_head_advance(false)
- {
- assert(m_cap > m_hwm);
- assert(m_hwm >= m_lwm);
- }
+ struct buffer_head {
+ size_t head_idx;
+ size_t refcnt;
+ };
- ~MutableBuffer() {
- delete[] m_data;
- delete m_tombstone_filter;
+public:
+ MutableBuffer(size_t low_watermark, size_t high_watermark,
+ size_t capacity = 0)
+ : m_lwm(low_watermark), m_hwm(high_watermark),
+ m_cap((capacity == 0) ? 2 * high_watermark : capacity), m_tail(0),
+ m_head({0, 0}), m_old_head({high_watermark, 0}),
+ m_data(new Wrapped<R>[m_cap]()),
+ m_tombstone_filter(
+ new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS)),
+ m_tscnt(0), m_old_tscnt(0), m_active_head_advance(false) {
+ assert(m_cap > m_hwm);
+ assert(m_hwm >= m_lwm);
+ }
+
+ ~MutableBuffer() {
+ delete[] m_data;
+ delete m_tombstone_filter;
+ }
+
+ int append(const R &rec, bool tombstone = false) {
+ int32_t tail = 0;
+ if ((tail = try_advance_tail()) == -1) {
+ return 0;
}
- int append(const R &rec, bool tombstone=false) {
- int32_t tail = 0;
- if ((tail = try_advance_tail()) == -1) {
- return 0;
- }
-
- Wrapped<R> wrec;
- wrec.rec = rec;
- wrec.header = 0;
- if (tombstone) wrec.set_tombstone();
+ Wrapped<R> wrec;
+ wrec.rec = rec;
+ wrec.header = 0;
+ if (tombstone)
+ wrec.set_tombstone();
- // FIXME: because of the mod, it isn't correct to use `pos`
- // as the ordering timestamp in the header anymore.
- size_t pos = tail % m_cap;
-
- m_data[pos] = wrec;
- m_data[pos].set_timestamp(pos);
-
- if (tombstone) {
- m_tscnt.fetch_add(1);
- if (m_tombstone_filter) m_tombstone_filter->insert(rec);
- }
+ // FIXME: because of the mod, it isn't correct to use `pos`
+ // as the ordering timestamp in the header anymore.
+ size_t pos = tail % m_cap;
- m_data[pos].set_visible();
+ m_data[pos] = wrec;
+ m_data[pos].set_timestamp(pos);
- return 1;
+ if (tombstone) {
+ m_tscnt.fetch_add(1);
+ if (m_tombstone_filter)
+ m_tombstone_filter->insert(rec);
}
- bool truncate() {
- m_tscnt.store(0);
- m_tail.store(0);
- if (m_tombstone_filter) m_tombstone_filter->clear();
+ m_data[pos].set_visible();
- return true;
- }
+ return 1;
+ }
- size_t get_record_count() {
- return m_tail.load() - m_head.load().head_idx;
- }
-
- size_t get_capacity() {
- return m_cap;
- }
+ bool truncate() {
+ m_tscnt.store(0);
+ m_tail.store(0);
+ if (m_tombstone_filter)
+ m_tombstone_filter->clear();
- bool is_full() {
- return get_record_count() >= m_hwm;
- }
+ return true;
+ }
- bool is_at_low_watermark() {
- return get_record_count() >= m_lwm;
- }
+ size_t get_record_count() { return m_tail.load() - m_head.load().head_idx; }
- size_t get_tombstone_count() {
- return m_tscnt.load();
- }
+ size_t get_capacity() { return m_cap; }
- bool delete_record(const R& rec) {
- return get_buffer_view().delete_record(rec);
- }
+ bool is_full() { return get_record_count() >= m_hwm; }
- bool check_tombstone(const R& rec) {
- return get_buffer_view().check_tombstone(rec);
- }
+ bool is_at_low_watermark() { return get_record_count() >= m_lwm; }
- size_t get_memory_usage() {
- return m_cap * sizeof(Wrapped<R>);
- }
+ size_t get_tombstone_count() { return m_tscnt.load(); }
- size_t get_aux_memory_usage() {
- return m_tombstone_filter->get_memory_usage();
- }
+ bool delete_record(const R &rec) {
+ return get_buffer_view().delete_record(rec);
+ }
- BufferView<R> get_buffer_view(size_t target_head) {
- size_t head = get_head(target_head);
- auto f = std::bind(release_head_reference, (void *) this, head);
+ bool check_tombstone(const R &rec) {
+ return get_buffer_view().check_tombstone(rec);
+ }
- return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
- }
+ size_t get_memory_usage() { return m_cap * sizeof(Wrapped<R>); }
- BufferView<R> get_buffer_view() {
- size_t head = get_head(m_head.load().head_idx);
- auto f = std::bind(release_head_reference, (void *) this, head);
+ size_t get_aux_memory_usage() {
+ return m_tombstone_filter->get_memory_usage();
+ }
- return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
- }
+ BufferView<R> get_buffer_view(size_t target_head) {
+ size_t head = get_head(target_head);
+ auto f = std::bind(release_head_reference, (void *)this, head);
- /*
- * Advance the buffer following a reconstruction. Move current
- * head and head_refcnt into old_head and old_head_refcnt, then
- * assign new_head to old_head.
- */
- bool advance_head(size_t new_head) {
- assert(new_head > m_head.load().head_idx);
- assert(new_head <= m_tail.load());
-
- /* refuse to advance head while there is an old with one references */
- if (m_old_head.load().refcnt > 0) {
- //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n");
- return false;
- }
-
- m_active_head_advance.store(true);
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(),
+ m_tombstone_filter, f);
+ }
- buffer_head new_hd = {new_head, 0};
- buffer_head cur_hd;
+ BufferView<R> get_buffer_view() {
+ size_t head = get_head(m_head.load().head_idx);
+ auto f = std::bind(release_head_reference, (void *)this, head);
- /* replace current head with new head */
- do {
- cur_hd = m_head.load();
- } while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(),
+ m_tombstone_filter, f);
+ }
- /* move the current head into the old head */
- m_old_head.store(cur_hd);
+ /*
+ * Advance the buffer following a reconstruction. Move current
+ * head and head_refcnt into old_head and old_head_refcnt, then
+ * assign new_head to old_head.
+ */
+ bool advance_head(size_t new_head) {
+ assert(new_head > m_head.load().head_idx);
+ assert(new_head <= m_tail.load());
- m_active_head_advance.store(false);
- return true;
+ /* refuse to advance head while there is an old with one references */
+ if (m_old_head.load().refcnt > 0) {
+ // fprintf(stderr, "[W]: Refusing to advance head due to remaining
+ // reference counts\n");
+ return false;
}
- /*
- * FIXME: If target_head does not match *either* the old_head or the
- * current_head, this routine will loop infinitely.
- */
- size_t get_head(size_t target_head) {
- buffer_head cur_hd, new_hd;
- bool head_acquired = false;
-
- do {
- if (m_old_head.load().head_idx == target_head) {
- cur_hd = m_old_head.load();
- cur_hd.head_idx = target_head;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
- head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
- } else if (m_head.load().head_idx == target_head){
- cur_hd = m_head.load();
- cur_hd.head_idx = target_head;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
- head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
- }
- } while(!head_acquired);
-
- return new_hd.head_idx;
+ m_active_head_advance.store(true);
+
+ buffer_head new_hd = {new_head, 0};
+ buffer_head cur_hd;
+
+ /* replace current head with new head */
+ do {
+ cur_hd = m_head.load();
+ } while (!m_head.compare_exchange_strong(cur_hd, new_hd));
+
+ /* move the current head into the old head */
+ m_old_head.store(cur_hd);
+
+ m_active_head_advance.store(false);
+ return true;
+ }
+
+ /*
+ * FIXME: If target_head does not match *either* the old_head or the
+ * current_head, this routine will loop infinitely.
+ */
+ size_t get_head(size_t target_head) {
+ buffer_head cur_hd, new_hd;
+ bool head_acquired = false;
+
+ do {
+ if (m_old_head.load().head_idx == target_head) {
+ cur_hd = m_old_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else if (m_head.load().head_idx == target_head) {
+ cur_hd = m_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ } while (!head_acquired);
+
+ return new_hd.head_idx;
+ }
+
+ void set_low_watermark(size_t lwm) {
+ assert(lwm < m_hwm);
+ m_lwm = lwm;
+ }
+
+ size_t get_low_watermark() { return m_lwm; }
+
+ void set_high_watermark(size_t hwm) {
+ assert(hwm > m_lwm);
+ assert(hwm < m_cap);
+ m_hwm = hwm;
+ }
+
+ size_t get_high_watermark() { return m_hwm; }
+
+ size_t get_tail() { return m_tail.load(); }
+
+ /*
+ * Note: this returns the available physical storage capacity,
+ * *not* now many more records can be inserted before the
+ * HWM is reached. It considers the old_head to be "free"
+ * when it has no remaining references. This should be true,
+ * but a buggy framework implementation may violate the
+ * assumption.
+ */
+ size_t get_available_capacity() {
+ if (m_old_head.load().refcnt == 0) {
+ return m_cap - (m_tail.load() - m_head.load().head_idx);
}
- void set_low_watermark(size_t lwm) {
- assert(lwm < m_hwm);
- m_lwm = lwm;
- }
+ return m_cap - (m_tail.load() - m_old_head.load().head_idx);
+ }
- size_t get_low_watermark() {
- return m_lwm;
- }
+private:
+ int64_t try_advance_tail() {
+ size_t old_value = m_tail.load();
- void set_high_watermark(size_t hwm) {
- assert(hwm > m_lwm);
- assert(hwm < m_cap);
- m_hwm = hwm;
+ /* if full, fail to advance the tail */
+ if (old_value - m_head.load().head_idx >= m_hwm) {
+ return -1;
}
- size_t get_high_watermark() {
- return m_hwm;
- }
+ while (!m_tail.compare_exchange_strong(old_value, old_value + 1)) {
+ /* if full, stop trying and fail to advance the tail */
+ if (m_tail.load() >= m_hwm) {
+ return -1;
+ }
- size_t get_tail() {
- return m_tail.load();
+ _mm_pause();
}
- /*
- * Note: this returns the available physical storage capacity,
- * *not* now many more records can be inserted before the
- * HWM is reached. It considers the old_head to be "free"
- * when it has no remaining references. This should be true,
- * but a buggy framework implementation may violate the
- * assumption.
- */
- size_t get_available_capacity() {
- if (m_old_head.load().refcnt == 0) {
- return m_cap - (m_tail.load() - m_head.load().head_idx);
- }
+ return old_value;
+ }
- return m_cap - (m_tail.load() - m_old_head.load().head_idx);
- }
+ size_t to_idx(size_t i, size_t head) { return (head + i) % m_cap; }
-private:
- int64_t try_advance_tail() {
- size_t old_value = m_tail.load();
+ static void release_head_reference(void *buff, size_t head) {
+ MutableBuffer<R> *buffer = (MutableBuffer<R> *)buff;
- /* if full, fail to advance the tail */
- if (old_value - m_head.load().head_idx >= m_hwm) {
- return -1;
+ buffer_head cur_hd, new_hd;
+ do {
+ if (buffer->m_old_head.load().head_idx == head) {
+ cur_hd = buffer->m_old_head;
+ if (cur_hd.refcnt == 0)
+ continue;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
+ if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
+ break;
}
-
- while (!m_tail.compare_exchange_strong(old_value, old_value+1)) {
- /* if full, stop trying and fail to advance the tail */
- if (m_tail.load() >= m_hwm) {
- return -1;
- }
-
- _mm_pause();
+ } else {
+ cur_hd = buffer->m_head;
+ if (cur_hd.refcnt == 0)
+ continue;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
+
+ if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
+ break;
}
+ }
+ _mm_pause();
+ } while (true);
+ }
- return old_value;
- }
+ size_t m_lwm;
+ size_t m_hwm;
+ size_t m_cap;
- size_t to_idx(size_t i, size_t head) {
- return (head + i) % m_cap;
- }
+ alignas(64) std::atomic<size_t> m_tail;
- static void release_head_reference(void *buff, size_t head) {
- MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff;
-
- buffer_head cur_hd, new_hd;
- do {
- if (buffer->m_old_head.load().head_idx == head) {
- cur_hd = buffer->m_old_head;
- if (cur_hd.refcnt == 0) continue;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
- if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
- break;
- }
- } else {
- cur_hd = buffer->m_head;
- if (cur_hd.refcnt == 0) continue;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
-
- if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
- break;
- }
- }
- _mm_pause();
- } while(true);
- }
+ alignas(64) std::atomic<buffer_head> m_head;
+ alignas(64) std::atomic<buffer_head> m_old_head;
+
+ Wrapped<R> *m_data;
+ psudb::BloomFilter<R> *m_tombstone_filter;
+ alignas(64) std::atomic<size_t> m_tscnt;
+ size_t m_old_tscnt;
- size_t m_lwm;
- size_t m_hwm;
- size_t m_cap;
-
- alignas(64) std::atomic<size_t> m_tail;
-
- alignas(64) std::atomic<buffer_head> m_head;
- alignas(64) std::atomic<buffer_head> m_old_head;
-
- Wrapped<R>* m_data;
- psudb::BloomFilter<R>* m_tombstone_filter;
- alignas(64) std::atomic<size_t> m_tscnt;
- size_t m_old_tscnt;
-
- alignas(64) std::atomic<bool> m_active_head_advance;
+ alignas(64) std::atomic<bool> m_active_head_advance;
};
-}
+} // namespace de