/*
 * include/framework/structure/MutableBuffer.h
 *
 * Copyright (C) 2023-2024 Douglas B. Rumbaugh
 *                         Dong Xie
 *
 * Distributed under the Modified BSD License.
 *
 * NOTE: Concerning the tombstone count. One possible approach would be to
 * track the number of tombstones below and above the low water mark--this
 * would be straightforward to do. Then, if we *require* that the head only
 * advance up to the LWM, we can get a correct view of the number of
 * tombstones in the active buffer at any point in time, and the BufferView
 * will have a pretty good approximation as well (potentially with a few
 * extra tombstones if new inserts happen between when the tail pointer and
 * the tombstone count are fetched).
 */
#pragma once

#include <atomic>
#include <cassert>
#include <cstdlib>
#include <functional>
#include <immintrin.h>

#include "framework/interface/Record.h"
#include "framework/structure/BufferView.h"
#include "psu-ds/BloomFilter.h"
#include "psu-util/alignment.h"
#include "util/bf_config.h"

namespace de {

template <RecordInterface R> class MutableBuffer {
  friend class BufferView<R>;

  struct buffer_head {
    size_t head_idx;
    size_t refcnt;
  };

public:
  MutableBuffer(size_t low_watermark, size_t high_watermark,
                size_t capacity = 0)
      : m_lwm(low_watermark), m_hwm(high_watermark),
        m_cap((capacity == 0) ? 2 * high_watermark : capacity), m_tail(0),
        m_head({0, 0}), m_old_head({high_watermark, 0}),
        m_data(new Wrapped<R>[m_cap]()),
        m_tombstone_filter(
            new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS)),
        m_tscnt(0), m_old_tscnt(0), m_active_head_advance(false) {
    assert(m_cap > m_hwm);
    assert(m_hwm >= m_lwm);
  }

  ~MutableBuffer() {
    delete[] m_data;
    delete m_tombstone_filter;
  }

  int append(const R &rec, bool tombstone = false) {
    int32_t tail = 0;
    if ((tail = try_advance_tail()) == -1) {
      return 0;
    }

    Wrapped<R> wrec;
    wrec.rec = rec;
    wrec.header = 0;
    if (tombstone)
      wrec.set_tombstone();

    // FIXME: because of the mod, it isn't correct to use `pos`
    // as the ordering timestamp in the header anymore.
    size_t pos = tail % m_cap;

    m_data[pos] = wrec;
    m_data[pos].set_timestamp(pos);

    if (tombstone) {
      m_tscnt.fetch_add(1);
      if (m_tombstone_filter)
        m_tombstone_filter->insert(rec);
    }

    m_data[pos].set_visible();

    return 1;
  }

  bool truncate() {
    m_tscnt.store(0);
    m_tail.store(0);
    if (m_tombstone_filter)
      m_tombstone_filter->clear();

    return true;
  }

  size_t get_record_count() const {
    return m_tail.load() - m_head.load().head_idx;
  }

  size_t get_capacity() const { return m_cap; }

  bool is_full() const { return get_record_count() >= m_hwm; }

  bool is_at_low_watermark() const { return get_record_count() >= m_lwm; }

  bool is_at_high_watermark() const { return get_record_count() >= m_hwm; }

  size_t get_tombstone_count() const { return m_tscnt.load(); }

  bool delete_record(const R &rec) {
    return get_buffer_view().delete_record(rec);
  }

  bool check_tombstone(const R &rec) {
    return get_buffer_view().check_tombstone(rec);
  }

  size_t get_memory_usage() const { return m_cap * sizeof(Wrapped<R>); }

  size_t get_aux_memory_usage() const {
    return (m_tombstone_filter) ? m_tombstone_filter->get_memory_usage() : 0;
  }

  BufferView<R> get_buffer_view(size_t target_head) {
    size_t head = get_head(target_head);
    auto f = std::bind(release_head_reference, (void *)this, head);

    return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(),
                         m_tombstone_filter, f);
  }

  BufferView<R> get_buffer_view() {
    size_t head = get_head(m_head.load().head_idx);
    auto f = std::bind(release_head_reference, (void *)this, head);

    return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(),
                         m_tombstone_filter, f);
  }
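  /*
   * Rough usage sketch (illustrative only, not part of the framework; the
   * record type `Rec` and the variable `buffer` are hypothetical). A view
   * pins the head it was created from via a reference count, and that
   * reference is released through the bound release_head_reference callback,
   * presumably when the view is destroyed--which is what eventually allows
   * advance_head() to succeed:
   *
   *   MutableBuffer<Rec> buffer(100, 1000);
   *   buffer.append(rec);                      // returns 0 once the HWM is hit
   *   {
   *     auto view = buffer.get_buffer_view();  // acquires a head reference
   *     // ... read records through the view while appends continue ...
   *   } // view goes out of scope; its head reference is released
   */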
  /*
   * Advance the buffer head following a reconstruction. The current head and
   * its reference count are moved into old_head, and new_head (with a zero
   * reference count) becomes the current head. The advance is refused if the
   * existing old_head still has outstanding references.
   */
  bool advance_head(size_t new_head) {
    assert(new_head > m_head.load().head_idx);
    assert(new_head <= m_tail.load());

    m_active_head_advance.store(true);

    if (m_old_head.load().refcnt > 0) {
      // fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts [2]\n");
      m_active_head_advance.store(false);
      return false;
    }

    // fprintf(stderr, "[I] Advancing head pointer: %ld %ld %ld\n", m_old_head.load().head_idx, m_head.load().head_idx, new_head);
    // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, m_head.load().refcnt);

    buffer_head new_hd = {new_head, 0};
    buffer_head cur_hd;

    /* replace the current head with the new head */
    do {
      cur_hd = m_head.load();
    } while (!m_head.compare_exchange_strong(cur_hd, new_hd));

    /* move the previously current head into the old head */
    m_old_head.store(cur_hd);

    m_active_head_advance.store(false);
    return true;
  }

  /*
   * FIXME: If target_head does not match *either* the old_head or the
   * current head, this routine will loop infinitely.
   */
  size_t get_head(size_t target_head) {
    buffer_head cur_hd, new_hd;
    bool head_acquired = false;

    // fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx);

    do {
      if (m_old_head.load().head_idx == target_head) {
        cur_hd = m_old_head.load();
        cur_hd.head_idx = target_head;
        new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
        head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
      } else if (m_head.load().head_idx == target_head) {
        cur_hd = m_head.load();
        cur_hd.head_idx = target_head;
        new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
        head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
      }
    } while (!head_acquired);

    return new_hd.head_idx;
  }

  void set_low_watermark(size_t lwm) {
    assert(lwm < m_hwm);
    m_lwm = lwm;
  }

  size_t get_low_watermark() const { return m_lwm; }

  void set_high_watermark(size_t hwm) {
    assert(hwm > m_lwm);
    assert(hwm < m_cap);
    m_hwm = hwm;
  }

  size_t get_high_watermark() const { return m_hwm; }

  size_t get_tail() const { return m_tail.load(); }

  /*
   * Note: this returns the available *physical* storage capacity, *not* how
   * many more records can be inserted before the HWM is reached. It considers
   * the old_head to be "free" once it has no remaining references. This
   * should be true, but a buggy framework implementation may violate the
   * assumption.
   */
  size_t get_available_capacity() const {
    if (m_old_head.load().refcnt == 0) {
      return m_cap - (m_tail.load() - m_head.load().head_idx);
    }

    return m_cap - (m_tail.load() - m_old_head.load().head_idx);
  }
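  /*
   * Worked example (illustrative numbers only): with m_cap = 2000,
   * m_tail = 1500, a current head of 600, and a fully released old_head,
   * the available physical capacity is 2000 - (1500 - 600) = 1100 slots,
   * even though appends begin to fail as soon as get_record_count()
   * reaches the high watermark.
   */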
private:
  int64_t try_advance_tail() {
    size_t old_value = m_tail.load();

    /* if the buffer is full, fail to advance the tail */
    if (old_value - m_head.load().head_idx >= m_hwm) {
      return -1;
    }

    while (!m_tail.compare_exchange_strong(old_value, old_value + 1)) {
      /* if the buffer fills up while spinning, stop trying and fail */
      if (old_value - m_head.load().head_idx >= m_hwm) {
        return -1;
      }

      _mm_pause();
    }

    return old_value;
  }

  size_t to_idx(size_t i, size_t head) { return (head + i) % m_cap; }

  static void release_head_reference(void *buff, size_t head) {
    MutableBuffer *buffer = (MutableBuffer *)buff;

    buffer_head cur_hd, new_hd;
    do {
      if (buffer->m_old_head.load().head_idx == head) {
        cur_hd = buffer->m_old_head.load();
        if (cur_hd.refcnt == 0)
          continue;
        new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
        if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
          break;
        }
      } else {
        cur_hd = buffer->m_head.load();
        if (cur_hd.refcnt == 0)
          continue;
        new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
        if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
          break;
        }
      }
      _mm_pause();
    } while (true);
  }

  size_t m_lwm;
  size_t m_hwm;
  size_t m_cap;

  alignas(64) std::atomic<size_t> m_tail;
  alignas(64) std::atomic<buffer_head> m_head;
  alignas(64) std::atomic<buffer_head> m_old_head;

  Wrapped<R> *m_data;
  psudb::BloomFilter<R> *m_tombstone_filter;
  alignas(64) std::atomic<size_t> m_tscnt;
  size_t m_old_tscnt;

  alignas(64) std::atomic<bool> m_active_head_advance;
};

} // namespace de
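/*
 * Reconstruction hand-off sketch (illustrative only; the driver code below is
 * hypothetical and would live in the framework proper, outside this header).
 * A reconstruction consumes the records visible through a view and then tries
 * to advance the head past them; the advance is refused while the previous
 * old_head is still pinned by outstanding views:
 *
 *   size_t new_head = buffer.get_tail();
 *   {
 *     auto view = buffer.get_buffer_view();  // pins the current head
 *     build_new_shard_from(view);            // hypothetical consumer
 *   }
 *   while (!buffer.advance_head(new_head)) {
 *     // wait for older views to release their head references
 *   }
 */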