summaryrefslogtreecommitdiffstats
path: root/include/framework/structure/MutableBuffer.h
diff options
context:
space:
mode:
authorDouglas B. Rumbaugh <dbr4@psu.edu>2024-12-06 13:13:51 -0500
committerGitHub <noreply@github.com>2024-12-06 18:13:51 +0000
commit9fe305c7d28e993e55c55427f377ae7e3251ea4f (patch)
tree384b687f64b84eb81bde2becac8a5f24916b07b4 /include/framework/structure/MutableBuffer.h
parent47916da2ba5ed5bee2dda3cbcc58d39e1e931bfc (diff)
downloaddynamic-extension-9fe305c7d28e993e55c55427f377ae7e3251ea4f.tar.gz
Interface update (#5)
* Query Interface Adjustments/Refactoring Began the process of adjusting the query interface (and also the shard interface, to a lesser degree) to better accommodate the user. In particular the following changes have been made, 1. The number of necessary template arguments for the query type has been drastically reduced, while also removing the void pointers and manual delete functions from the interface. This was accomplished by requiring many of the sub-types associated with a query (parameters, etc.) to be nested inside the main query class, and by forcing the SHARD type to expose its associated record type. 2. User-defined query return types are now supported. Queries no longer are required to return strictly sets of records. Instead, the query now has LocalResultType and ResultType template parameters (which can be defaulted using a typedef in the Query type itself), allowing much more flexibility. Note that, at least for the short term, the LocalResultType must still expose the same is_deleted/is_tombstone interface as a Wrapped<R> used to, as this is currently needed for delete filtering. A better approach to this is, hopefully, forthcoming. 3. Updated the ISAMTree.h shard and rangequery.h query to use the new interfaces, and adjusted the associated unit tests as well. 4. Dropped the unnecessary "get_data()" function from the ShardInterface concept. 5. Dropped the need to specify a record type in the ShardInterface concept. This is now handled using a required Shard::RECORD member of the Shard class itself, which should expose the name of the record type. * Updates to framework to support new Query/Shard interfaces Pretty extensive adjustments to the framework, particularly to the templates themselves, along with some type-renaming work, to support the new query and shard interfaces. Adjusted the external query interface to take an rvalue reference, rather than a pointer, to the query parameters. * Removed framework-level delete filtering This was causing some issues with the new query interface, and should probably be reworked anyway, so I'm temporarily (TM) removing the feature. * Updated benchmarks + remaining code for new interface
Diffstat (limited to 'include/framework/structure/MutableBuffer.h')
-rw-r--r--include/framework/structure/MutableBuffer.h464
1 files changed, 222 insertions, 242 deletions
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 7db3980..625b04b 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -1,8 +1,8 @@
/*
* include/framework/structure/MutableBuffer.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -18,301 +18,281 @@
*/
#pragma once
-#include <cstdlib>
#include <atomic>
#include <cassert>
+#include <cstdlib>
#include <immintrin.h>
-#include "psu-util/alignment.h"
-#include "util/bf_config.h"
-#include "psu-ds/BloomFilter.h"
#include "framework/interface/Record.h"
#include "framework/structure/BufferView.h"
-
-using psudb::CACHELINE_SIZE;
+#include "psu-ds/BloomFilter.h"
+#include "psu-util/alignment.h"
+#include "util/bf_config.h"
namespace de {
-template <RecordInterface R>
-class MutableBuffer {
- friend class BufferView<R>;
+template <RecordInterface R> class MutableBuffer {
+ friend class BufferView<R>;
- struct buffer_head {
- size_t head_idx;
- size_t refcnt;
- };
-
-public:
- MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
- : m_lwm(low_watermark)
- , m_hwm(high_watermark)
- , m_cap((capacity == 0) ? 2 * high_watermark : capacity)
- , m_tail(0)
- , m_head({0, 0})
- , m_old_head({high_watermark, 0})
- //, m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
- , m_data(new Wrapped<R>[m_cap]())
- , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
- , m_tscnt(0)
- , m_old_tscnt(0)
- , m_active_head_advance(false)
- {
- assert(m_cap > m_hwm);
- assert(m_hwm >= m_lwm);
- }
+ struct buffer_head {
+ size_t head_idx;
+ size_t refcnt;
+ };
- ~MutableBuffer() {
- delete[] m_data;
- delete m_tombstone_filter;
+public:
+ MutableBuffer(size_t low_watermark, size_t high_watermark,
+ size_t capacity = 0)
+ : m_lwm(low_watermark), m_hwm(high_watermark),
+ m_cap((capacity == 0) ? 2 * high_watermark : capacity), m_tail(0),
+ m_head({0, 0}), m_old_head({high_watermark, 0}),
+ m_data(new Wrapped<R>[m_cap]()),
+ m_tombstone_filter(
+ new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS)),
+ m_tscnt(0), m_old_tscnt(0), m_active_head_advance(false) {
+ assert(m_cap > m_hwm);
+ assert(m_hwm >= m_lwm);
+ }
+
+ ~MutableBuffer() {
+ delete[] m_data;
+ delete m_tombstone_filter;
+ }
+
+ int append(const R &rec, bool tombstone = false) {
+ int32_t tail = 0;
+ if ((tail = try_advance_tail()) == -1) {
+ return 0;
}
- int append(const R &rec, bool tombstone=false) {
- int32_t tail = 0;
- if ((tail = try_advance_tail()) == -1) {
- return 0;
- }
-
- Wrapped<R> wrec;
- wrec.rec = rec;
- wrec.header = 0;
- if (tombstone) wrec.set_tombstone();
+ Wrapped<R> wrec;
+ wrec.rec = rec;
+ wrec.header = 0;
+ if (tombstone)
+ wrec.set_tombstone();
- // FIXME: because of the mod, it isn't correct to use `pos`
- // as the ordering timestamp in the header anymore.
- size_t pos = tail % m_cap;
-
- m_data[pos] = wrec;
- m_data[pos].set_timestamp(pos);
-
- if (tombstone) {
- m_tscnt.fetch_add(1);
- if (m_tombstone_filter) m_tombstone_filter->insert(rec);
- }
+ // FIXME: because of the mod, it isn't correct to use `pos`
+ // as the ordering timestamp in the header anymore.
+ size_t pos = tail % m_cap;
- m_data[pos].set_visible();
+ m_data[pos] = wrec;
+ m_data[pos].set_timestamp(pos);
- return 1;
+ if (tombstone) {
+ m_tscnt.fetch_add(1);
+ if (m_tombstone_filter)
+ m_tombstone_filter->insert(rec);
}
- bool truncate() {
- m_tscnt.store(0);
- m_tail.store(0);
- if (m_tombstone_filter) m_tombstone_filter->clear();
+ m_data[pos].set_visible();
- return true;
- }
+ return 1;
+ }
- size_t get_record_count() {
- return m_tail.load() - m_head.load().head_idx;
- }
-
- size_t get_capacity() {
- return m_cap;
- }
+ bool truncate() {
+ m_tscnt.store(0);
+ m_tail.store(0);
+ if (m_tombstone_filter)
+ m_tombstone_filter->clear();
- bool is_full() {
- return get_record_count() >= m_hwm;
- }
+ return true;
+ }
- bool is_at_low_watermark() {
- return get_record_count() >= m_lwm;
- }
+ size_t get_record_count() { return m_tail.load() - m_head.load().head_idx; }
- size_t get_tombstone_count() {
- return m_tscnt.load();
- }
+ size_t get_capacity() { return m_cap; }
- bool delete_record(const R& rec) {
- return get_buffer_view().delete_record(rec);
- }
+ bool is_full() { return get_record_count() >= m_hwm; }
- bool check_tombstone(const R& rec) {
- return get_buffer_view().check_tombstone(rec);
- }
+ bool is_at_low_watermark() { return get_record_count() >= m_lwm; }
- size_t get_memory_usage() {
- return m_cap * sizeof(Wrapped<R>);
- }
+ size_t get_tombstone_count() { return m_tscnt.load(); }
- size_t get_aux_memory_usage() {
- return m_tombstone_filter->get_memory_usage();
- }
+ bool delete_record(const R &rec) {
+ return get_buffer_view().delete_record(rec);
+ }
- BufferView<R> get_buffer_view(size_t target_head) {
- size_t head = get_head(target_head);
- auto f = std::bind(release_head_reference, (void *) this, head);
+ bool check_tombstone(const R &rec) {
+ return get_buffer_view().check_tombstone(rec);
+ }
- return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
- }
+ size_t get_memory_usage() { return m_cap * sizeof(Wrapped<R>); }
- BufferView<R> get_buffer_view() {
- size_t head = get_head(m_head.load().head_idx);
- auto f = std::bind(release_head_reference, (void *) this, head);
+ size_t get_aux_memory_usage() {
+ return m_tombstone_filter->get_memory_usage();
+ }
- return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
- }
+ BufferView<R> get_buffer_view(size_t target_head) {
+ size_t head = get_head(target_head);
+ auto f = std::bind(release_head_reference, (void *)this, head);
- /*
- * Advance the buffer following a reconstruction. Move current
- * head and head_refcnt into old_head and old_head_refcnt, then
- * assign new_head to old_head.
- */
- bool advance_head(size_t new_head) {
- assert(new_head > m_head.load().head_idx);
- assert(new_head <= m_tail.load());
-
- /* refuse to advance head while there is an old with one references */
- if (m_old_head.load().refcnt > 0) {
- //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n");
- return false;
- }
-
- m_active_head_advance.store(true);
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(),
+ m_tombstone_filter, f);
+ }
- buffer_head new_hd = {new_head, 0};
- buffer_head cur_hd;
+ BufferView<R> get_buffer_view() {
+ size_t head = get_head(m_head.load().head_idx);
+ auto f = std::bind(release_head_reference, (void *)this, head);
- /* replace current head with new head */
- do {
- cur_hd = m_head.load();
- } while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(),
+ m_tombstone_filter, f);
+ }
- /* move the current head into the old head */
- m_old_head.store(cur_hd);
+ /*
+ * Advance the buffer following a reconstruction. Move current
+ * head and head_refcnt into old_head and old_head_refcnt, then
+ * assign new_head to old_head.
+ */
+ bool advance_head(size_t new_head) {
+ assert(new_head > m_head.load().head_idx);
+ assert(new_head <= m_tail.load());
- m_active_head_advance.store(false);
- return true;
+ /* refuse to advance head while there is an old with one references */
+ if (m_old_head.load().refcnt > 0) {
+ // fprintf(stderr, "[W]: Refusing to advance head due to remaining
+ // reference counts\n");
+ return false;
}
- /*
- * FIXME: If target_head does not match *either* the old_head or the
- * current_head, this routine will loop infinitely.
- */
- size_t get_head(size_t target_head) {
- buffer_head cur_hd, new_hd;
- bool head_acquired = false;
-
- do {
- if (m_old_head.load().head_idx == target_head) {
- cur_hd = m_old_head.load();
- cur_hd.head_idx = target_head;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
- head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
- } else if (m_head.load().head_idx == target_head){
- cur_hd = m_head.load();
- cur_hd.head_idx = target_head;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
- head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
- }
- } while(!head_acquired);
-
- return new_hd.head_idx;
+ m_active_head_advance.store(true);
+
+ buffer_head new_hd = {new_head, 0};
+ buffer_head cur_hd;
+
+ /* replace current head with new head */
+ do {
+ cur_hd = m_head.load();
+ } while (!m_head.compare_exchange_strong(cur_hd, new_hd));
+
+ /* move the current head into the old head */
+ m_old_head.store(cur_hd);
+
+ m_active_head_advance.store(false);
+ return true;
+ }
+
+ /*
+ * FIXME: If target_head does not match *either* the old_head or the
+ * current_head, this routine will loop infinitely.
+ */
+ size_t get_head(size_t target_head) {
+ buffer_head cur_hd, new_hd;
+ bool head_acquired = false;
+
+ do {
+ if (m_old_head.load().head_idx == target_head) {
+ cur_hd = m_old_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else if (m_head.load().head_idx == target_head) {
+ cur_hd = m_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ } while (!head_acquired);
+
+ return new_hd.head_idx;
+ }
+
+ void set_low_watermark(size_t lwm) {
+ assert(lwm < m_hwm);
+ m_lwm = lwm;
+ }
+
+ size_t get_low_watermark() { return m_lwm; }
+
+ void set_high_watermark(size_t hwm) {
+ assert(hwm > m_lwm);
+ assert(hwm < m_cap);
+ m_hwm = hwm;
+ }
+
+ size_t get_high_watermark() { return m_hwm; }
+
+ size_t get_tail() { return m_tail.load(); }
+
+ /*
+ * Note: this returns the available physical storage capacity,
+ * *not* now many more records can be inserted before the
+ * HWM is reached. It considers the old_head to be "free"
+ * when it has no remaining references. This should be true,
+ * but a buggy framework implementation may violate the
+ * assumption.
+ */
+ size_t get_available_capacity() {
+ if (m_old_head.load().refcnt == 0) {
+ return m_cap - (m_tail.load() - m_head.load().head_idx);
}
- void set_low_watermark(size_t lwm) {
- assert(lwm < m_hwm);
- m_lwm = lwm;
- }
+ return m_cap - (m_tail.load() - m_old_head.load().head_idx);
+ }
- size_t get_low_watermark() {
- return m_lwm;
- }
+private:
+ int64_t try_advance_tail() {
+ size_t old_value = m_tail.load();
- void set_high_watermark(size_t hwm) {
- assert(hwm > m_lwm);
- assert(hwm < m_cap);
- m_hwm = hwm;
+ /* if full, fail to advance the tail */
+ if (old_value - m_head.load().head_idx >= m_hwm) {
+ return -1;
}
- size_t get_high_watermark() {
- return m_hwm;
- }
+ while (!m_tail.compare_exchange_strong(old_value, old_value + 1)) {
+ /* if full, stop trying and fail to advance the tail */
+ if (m_tail.load() >= m_hwm) {
+ return -1;
+ }
- size_t get_tail() {
- return m_tail.load();
+ _mm_pause();
}
- /*
- * Note: this returns the available physical storage capacity,
- * *not* now many more records can be inserted before the
- * HWM is reached. It considers the old_head to be "free"
- * when it has no remaining references. This should be true,
- * but a buggy framework implementation may violate the
- * assumption.
- */
- size_t get_available_capacity() {
- if (m_old_head.load().refcnt == 0) {
- return m_cap - (m_tail.load() - m_head.load().head_idx);
- }
+ return old_value;
+ }
- return m_cap - (m_tail.load() - m_old_head.load().head_idx);
- }
+ size_t to_idx(size_t i, size_t head) { return (head + i) % m_cap; }
-private:
- int64_t try_advance_tail() {
- size_t old_value = m_tail.load();
+ static void release_head_reference(void *buff, size_t head) {
+ MutableBuffer<R> *buffer = (MutableBuffer<R> *)buff;
- /* if full, fail to advance the tail */
- if (old_value - m_head.load().head_idx >= m_hwm) {
- return -1;
+ buffer_head cur_hd, new_hd;
+ do {
+ if (buffer->m_old_head.load().head_idx == head) {
+ cur_hd = buffer->m_old_head;
+ if (cur_hd.refcnt == 0)
+ continue;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
+ if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
+ break;
}
-
- while (!m_tail.compare_exchange_strong(old_value, old_value+1)) {
- /* if full, stop trying and fail to advance the tail */
- if (m_tail.load() >= m_hwm) {
- return -1;
- }
-
- _mm_pause();
+ } else {
+ cur_hd = buffer->m_head;
+ if (cur_hd.refcnt == 0)
+ continue;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
+
+ if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
+ break;
}
+ }
+ _mm_pause();
+ } while (true);
+ }
- return old_value;
- }
+ size_t m_lwm;
+ size_t m_hwm;
+ size_t m_cap;
- size_t to_idx(size_t i, size_t head) {
- return (head + i) % m_cap;
- }
+ alignas(64) std::atomic<size_t> m_tail;
- static void release_head_reference(void *buff, size_t head) {
- MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff;
-
- buffer_head cur_hd, new_hd;
- do {
- if (buffer->m_old_head.load().head_idx == head) {
- cur_hd = buffer->m_old_head;
- if (cur_hd.refcnt == 0) continue;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
- if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
- break;
- }
- } else {
- cur_hd = buffer->m_head;
- if (cur_hd.refcnt == 0) continue;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
-
- if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
- break;
- }
- }
- _mm_pause();
- } while(true);
- }
+ alignas(64) std::atomic<buffer_head> m_head;
+ alignas(64) std::atomic<buffer_head> m_old_head;
+
+ Wrapped<R> *m_data;
+ psudb::BloomFilter<R> *m_tombstone_filter;
+ alignas(64) std::atomic<size_t> m_tscnt;
+ size_t m_old_tscnt;
- size_t m_lwm;
- size_t m_hwm;
- size_t m_cap;
-
- alignas(64) std::atomic<size_t> m_tail;
-
- alignas(64) std::atomic<buffer_head> m_head;
- alignas(64) std::atomic<buffer_head> m_old_head;
-
- Wrapped<R>* m_data;
- psudb::BloomFilter<R>* m_tombstone_filter;
- alignas(64) std::atomic<size_t> m_tscnt;
- size_t m_old_tscnt;
-
- alignas(64) std::atomic<bool> m_active_head_advance;
+ alignas(64) std::atomic<bool> m_active_head_advance;
};
-}
+} // namespace de