summaryrefslogtreecommitdiffstats
path: root/include/framework/structure/MutableBuffer.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/structure/MutableBuffer.h')
-rw-r--r--include/framework/structure/MutableBuffer.h160
1 files changed, 89 insertions, 71 deletions
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index e62a495..7357915 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -1,7 +1,7 @@
/*
* include/framework/structure/MutableBuffer.h
*
- * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu>
* Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
@@ -109,11 +109,11 @@ public:
size_t get_tombstone_count() const { return m_tscnt.load(); }
bool delete_record(const R &rec) {
- return get_buffer_view().delete_record(rec);
+ return get_buffer_view(m_head.load().head_idx).delete_record(rec);
}
bool check_tombstone(const R &rec) {
- return get_buffer_view().check_tombstone(rec);
+ return get_buffer_view(m_head.load().head_idx).check_tombstone(rec);
}
size_t get_memory_usage() const { return m_cap * sizeof(Wrapped<R>); }
@@ -122,20 +122,9 @@ public:
return m_tombstone_filter->get_memory_usage();
}
- BufferView<R> get_buffer_view(size_t target_head) {
- size_t head = get_head(target_head);
- auto f = std::bind(release_head_reference, (void *)this, head);
-
+ BufferView<R> get_buffer_view(size_t head) {
return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(),
- m_tombstone_filter, f);
- }
-
- BufferView<R> get_buffer_view() {
- size_t head = get_head(m_head.load().head_idx);
- auto f = std::bind(release_head_reference, (void *)this, head);
-
- return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(),
- m_tombstone_filter, f);
+ m_tombstone_filter);
}
/*
@@ -159,7 +148,7 @@ public:
// fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, m_head.load().refcnt);
- buffer_head new_hd = {new_head, 0};
+ buffer_head new_hd = {new_head, 1};
buffer_head cur_hd;
/* replace current head with new head */
@@ -174,32 +163,6 @@ public:
return true;
}
- /*
- * FIXME: If target_head does not match *either* the old_head or the
- * current_head, this routine will loop infinitely.
- */
- size_t get_head(size_t target_head) {
- buffer_head cur_hd, new_hd;
- bool head_acquired = false;
-
-
- //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx);
- do {
- if (m_old_head.load().head_idx == target_head) {
- cur_hd = m_old_head.load();
- cur_hd.head_idx = target_head;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
- head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
- } else if (m_head.load().head_idx == target_head) {
- cur_hd = m_head.load();
- cur_hd.head_idx = target_head;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
- head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
- }
- } while (!head_acquired);
-
- return new_hd.head_idx;
- }
void set_low_watermark(size_t lwm) {
assert(lwm < m_hwm);
@@ -234,8 +197,90 @@ public:
return m_cap - (m_tail.load() - m_old_head.load().head_idx);
}
+ size_t debug_get_old_head() const {
+ return m_old_head.load().head_idx;
+ }
+
+ size_t debug_get_head() const {
+ return m_head.load().head_idx;
+ }
+
+ bool take_head_reference(size_t target_head) {
+ buffer_head cur_hd, new_hd;
+ bool head_acquired = false;
+
+ do {
+ if (m_old_head.load().head_idx == target_head) {
+ cur_hd = m_old_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else if (m_head.load().head_idx == target_head) {
+ cur_hd = m_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ } while (!head_acquired);
+
+ return head_acquired;
+ }
+
+ bool release_head_reference(size_t head) {
+ buffer_head cur_hd, new_hd;
+ bool head_released = false;
+ do {
+ if (m_old_head.load().head_idx == head) {
+ cur_hd = m_old_head;
+
+ assert(cur_hd.refcnt > 0);
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
+ head_released = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else {
+ cur_hd = m_head;
+
+ /* it's possible the head got pushed from current to old */
+ if (cur_hd.head_idx == head) {
+ assert(cur_hd.refcnt > 0);
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
+ head_released = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ }
+ _mm_pause();
+ } while (!head_released);
+
+ return head_released;
+ }
+
private:
- int64_t try_advance_tail() {
+ /*
+ * FIXME: If target_head does not match *either* the old_head or the
+ * current_head, this routine will loop infinitely.
+ */
+ size_t get_head(size_t target_head) {
+ buffer_head cur_hd, new_hd;
+ bool head_acquired = false;
+
+
+ //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx);
+ do {
+ if (m_old_head.load().head_idx == target_head) {
+ cur_hd = m_old_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else if (m_head.load().head_idx == target_head) {
+ cur_hd = m_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ } while (!head_acquired);
+
+ return new_hd.head_idx;
+ }
+
+ ssize_t try_advance_tail() {
size_t old_value = m_tail.load();
/* if full, fail to advance the tail */
@@ -257,33 +302,6 @@ private:
size_t to_idx(size_t i, size_t head) { return (head + i) % m_cap; }
- static void release_head_reference(void *buff, size_t head) {
- MutableBuffer<R> *buffer = (MutableBuffer<R> *)buff;
-
- buffer_head cur_hd, new_hd;
- do {
- if (buffer->m_old_head.load().head_idx == head) {
- cur_hd = buffer->m_old_head;
- if (cur_hd.refcnt == 0)
- continue;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
- if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
- break;
- }
- } else {
- cur_hd = buffer->m_head;
- if (cur_hd.refcnt == 0)
- continue;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
-
- if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
- break;
- }
- }
- _mm_pause();
- } while (true);
- }
-
size_t m_lwm;
size_t m_hwm;
size_t m_cap;