diff options
Diffstat (limited to 'include/framework/structure')
| -rw-r--r-- | include/framework/structure/BufferView.h | 17 | ||||
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 160 |
2 files changed, 93 insertions, 84 deletions
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index acf1201..a9fb12d 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -1,7 +1,7 @@ /* * include/framework/structure/BufferView.h * - * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu> * * Distributed under the Modified BSD License. * @@ -20,8 +20,6 @@ namespace de { -typedef std::function<void(void)> ReleaseFunction; - template <RecordInterface R> class BufferView { public: BufferView() = default; @@ -35,7 +33,6 @@ public: BufferView(BufferView &&other) : m_data(std::exchange(other.m_data, nullptr)), - m_release(std::move(other.m_release)), m_head(std::exchange(other.m_head, 0)), m_tail(std::exchange(other.m_tail, 0)), m_start(std::exchange(other.m_start, 0)), @@ -48,18 +45,13 @@ public: BufferView &operator=(BufferView &&other) = delete; BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail, - size_t tombstone_cnt, psudb::BloomFilter<R> *filter, - ReleaseFunction release) - : m_data(buffer), m_release(release), m_head(head), m_tail(tail), + size_t tombstone_cnt, psudb::BloomFilter<R> *filter) + : m_data(buffer), m_head(head), m_tail(tail), m_start(m_head % cap), m_stop(m_tail % cap), m_cap(cap), m_approx_ts_cnt(tombstone_cnt), m_tombstone_filter(filter), m_active(true) {} - ~BufferView() { - if (m_active) { - m_release(); - } - } + ~BufferView() = default; bool check_tombstone(const R &rec) { if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) @@ -138,7 +130,6 @@ public: private: Wrapped<R> *m_data; - ReleaseFunction m_release; size_t m_head; size_t m_tail; size_t m_start; diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index e62a495..7357915 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -1,7 +1,7 @@ /* * include/framework/structure/MutableBuffer.h * - * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu> * Dong Xie <dongx@psu.edu> * * Distributed under the Modified BSD License. @@ -109,11 +109,11 @@ public: size_t get_tombstone_count() const { return m_tscnt.load(); } bool delete_record(const R &rec) { - return get_buffer_view().delete_record(rec); + return get_buffer_view(m_head.load().head_idx).delete_record(rec); } bool check_tombstone(const R &rec) { - return get_buffer_view().check_tombstone(rec); + return get_buffer_view(m_head.load().head_idx).check_tombstone(rec); } size_t get_memory_usage() const { return m_cap * sizeof(Wrapped<R>); } @@ -122,20 +122,9 @@ public: return m_tombstone_filter->get_memory_usage(); } - BufferView<R> get_buffer_view(size_t target_head) { - size_t head = get_head(target_head); - auto f = std::bind(release_head_reference, (void *)this, head); - + BufferView<R> get_buffer_view(size_t head) { return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), - m_tombstone_filter, f); - } - - BufferView<R> get_buffer_view() { - size_t head = get_head(m_head.load().head_idx); - auto f = std::bind(release_head_reference, (void *)this, head); - - return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), - m_tombstone_filter, f); + m_tombstone_filter); } /* @@ -159,7 +148,7 @@ public: // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, m_head.load().refcnt); - buffer_head new_hd = {new_head, 0}; + buffer_head new_hd = {new_head, 1}; buffer_head cur_hd; /* replace current head with new head */ @@ -174,32 +163,6 @@ public: return true; } - /* - * FIXME: If target_head does not match *either* the old_head or the - * current_head, this routine will loop infinitely. - */ - size_t get_head(size_t target_head) { - buffer_head cur_hd, new_hd; - bool head_acquired = false; - - - //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx); - do { - if (m_old_head.load().head_idx == target_head) { - cur_hd = m_old_head.load(); - cur_hd.head_idx = target_head; - new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; - head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); - } else if (m_head.load().head_idx == target_head) { - cur_hd = m_head.load(); - cur_hd.head_idx = target_head; - new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; - head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); - } - } while (!head_acquired); - - return new_hd.head_idx; - } void set_low_watermark(size_t lwm) { assert(lwm < m_hwm); @@ -234,8 +197,90 @@ public: return m_cap - (m_tail.load() - m_old_head.load().head_idx); } + size_t debug_get_old_head() const { + return m_old_head.load().head_idx; + } + + size_t debug_get_head() const { + return m_head.load().head_idx; + } + + bool take_head_reference(size_t target_head) { + buffer_head cur_hd, new_hd; + bool head_acquired = false; + + do { + if (m_old_head.load().head_idx == target_head) { + cur_hd = m_old_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else if (m_head.load().head_idx == target_head) { + cur_hd = m_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } while (!head_acquired); + + return head_acquired; + } + + bool release_head_reference(size_t head) { + buffer_head cur_hd, new_hd; + bool head_released = false; + do { + if (m_old_head.load().head_idx == head) { + cur_hd = m_old_head; + + assert(cur_hd.refcnt > 0); + new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; + head_released = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else { + cur_hd = m_head; + + /* it's possible the head got pushed from current to old */ + if (cur_hd.head_idx == head) { + assert(cur_hd.refcnt > 0); + new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; + head_released = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } + _mm_pause(); + } while (!head_released); + + return head_released; + } + private: - int64_t try_advance_tail() { + /* + * FIXME: If target_head does not match *either* the old_head or the + * current_head, this routine will loop infinitely. + */ + size_t get_head(size_t target_head) { + buffer_head cur_hd, new_hd; + bool head_acquired = false; + + + //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx); + do { + if (m_old_head.load().head_idx == target_head) { + cur_hd = m_old_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else if (m_head.load().head_idx == target_head) { + cur_hd = m_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } while (!head_acquired); + + return new_hd.head_idx; + } + + ssize_t try_advance_tail() { size_t old_value = m_tail.load(); /* if full, fail to advance the tail */ @@ -257,33 +302,6 @@ private: size_t to_idx(size_t i, size_t head) { return (head + i) % m_cap; } - static void release_head_reference(void *buff, size_t head) { - MutableBuffer<R> *buffer = (MutableBuffer<R> *)buff; - - buffer_head cur_hd, new_hd; - do { - if (buffer->m_old_head.load().head_idx == head) { - cur_hd = buffer->m_old_head; - if (cur_hd.refcnt == 0) - continue; - new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; - if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { - break; - } - } else { - cur_hd = buffer->m_head; - if (cur_hd.refcnt == 0) - continue; - new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; - - if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { - break; - } - } - _mm_pause(); - } while (true); - } - size_t m_lwm; size_t m_hwm; size_t m_cap; |