Diffstat (limited to 'include/framework/structure/MutableBuffer.h')
-rw-r--r--  include/framework/structure/MutableBuffer.h | 116
1 file changed, 37 insertions(+), 79 deletions(-)
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 58b5fb4..7bec219 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -6,12 +6,6 @@
*
* Distributed under the Modified BSD License.
*
- * FIXME: currently, the buffer itself is responsible for managing a
- * secondary buffer for storing sorted records used during buffer flushes. It
- * probably makes more sense to make the shard being flushed into responsible
- * for this instead. This would also facilitate simultaneous flushes of multiple
- * buffers more easily.
- *
*/
#pragma once
@@ -29,6 +23,7 @@
#include "psu-ds/Alias.h"
#include "psu-util/timer.h"
#include "framework/interface/Record.h"
+#include "framework/structure/BufferView.h"
using psudb::CACHELINE_SIZE;
@@ -36,18 +31,22 @@ namespace de {
template <RecordInterface R>
class MutableBuffer {
+ friend class BufferView<R>;
public:
- MutableBuffer(size_t capacity, size_t max_tombstone_cap)
- : m_cap(capacity), m_tombstone_cap(capacity), m_reccnt(0)
- , m_tombstonecnt(0), m_weight(0), m_max_weight(0), m_tail(0) {
- m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
- m_sorted_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
- m_tombstone_filter = nullptr;
- if (max_tombstone_cap > 0) {
- m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
+ MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
+ : m_lwm(low_watermark), m_hwm(high_watermark), m_cap(capacity), m_head(0), m_tail(0) {
+ /*
+ * default capacity is twice the high water mark, to account for the worst-case
+ * memory requirements.
+ */
+ if (m_cap == 0) {
+ m_cap = m_hwm * 2;
}
- m_refcnt.store(0);
+ m_data = (Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>));
+
+ // FIXME: need to figure out how to deal with tombstones at some point...
+ m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS);
}
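
For reference, a minimal usage sketch of the new watermark-based
constructor. Rec is a hypothetical record type satisfying
RecordInterface, and the watermark values are illustrative:

    // Appends are refused once the high watermark (1000) is reached.
    // The low watermark (500) is stored but not yet consulted in this
    // header. Capacity defaults to 2 * high watermark = 2000 slots.
    de::MutableBuffer<Rec> buf(500, 1000);

    Rec r = make_record();   // hypothetical helper
    if (!buf.append(r)) {
        // tail reservation failed: buffer is at its high watermark
    }
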
~MutableBuffer() {
@@ -55,13 +54,10 @@ public:
if (m_data) free(m_data);
if (m_tombstone_filter) delete m_tombstone_filter;
- if (m_sorted_data) free(m_sorted_data);
}
template <typename R_ = R>
int append(const R &rec, bool tombstone=false) {
- if (tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
-
int32_t pos = 0;
if ((pos = try_advance_tail()) == -1) return 0;
@@ -78,26 +74,11 @@ public:
if (m_tombstone_filter) m_tombstone_filter->insert(rec);
}
- if constexpr (WeightedRecordInterface<R_>) {
- m_weight.fetch_add(rec.weight);
- double old = m_max_weight.load();
- while (old < rec.weight) {
- m_max_weight.compare_exchange_strong(old, rec.weight);
- old = m_max_weight.load();
- }
- } else {
- m_weight.fetch_add(1);
- }
-
- m_reccnt.fetch_add(1);
return 1;
}
bool truncate() {
m_tombstonecnt.store(0);
- m_reccnt.store(0);
- m_weight.store(0);
- m_max_weight.store(0);
m_tail.store(0);
if (m_tombstone_filter) m_tombstone_filter->clear();
@@ -105,7 +86,7 @@ public:
}
size_t get_record_count() {
- return m_reccnt;
+ return (m_tail - m_head) % m_cap;
}
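
Since m_head and m_tail are free-running counters into a ring of m_cap
slots, the live count is their difference modulo capacity. A worked
example with illustrative values:

    size_t cap  = 2000;                 // m_cap
    size_t head = 1995;                 // index of the oldest live record
    size_t tail = 2003;                 // next slot to be claimed
    size_t cnt  = (tail - head) % cap;  // == 8, spanning the wrap point
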
size_t get_capacity() {
@@ -113,7 +94,7 @@ public:
}
bool is_full() {
- return m_reccnt == m_cap;
+ return (m_tail % m_cap) >= m_hwm;
}
size_t get_tombstone_count() {
@@ -121,13 +102,11 @@ public:
}
bool delete_record(const R& rec) {
- auto offset = 0;
- while (offset < m_reccnt.load()) {
- if (m_data[offset].rec == rec) {
- m_data[offset].set_delete();
+ for (size_t i=0; i<get_record_count(); i++) {
+ if (m_data[to_idx(i)].rec == rec) {
+ m_data[to_idx(i)].set_delete();
return true;
}
- offset++;
}
return false;
@@ -136,13 +115,12 @@ public:
bool check_tombstone(const R& rec) {
if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
- auto offset = 0;
- while (offset < m_reccnt.load()) {
- if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) {
+ for (size_t i=0; i<get_record_count(); i++) {
+ if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) {
return true;
}
- offset++;;
}
+
return false;
}
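
The Bloom filter acts as a negative cache: a lookup miss proves the
tombstone was never inserted and skips the linear scan, while a hit
still requires the scan because of false positives. A sketch of the
tombstone protocol built on these calls (Rec and r1 are illustrative):

    de::MutableBuffer<Rec> buf(500, 1000);
    buf.append(r1);                    // live record
    buf.append(r1, true);              // tombstone cancelling r1
    assert(buf.check_tombstone(r1));   // filter hit, confirmed by scan
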
@@ -155,31 +133,8 @@ public:
}
size_t get_tombstone_capacity() {
- return m_tombstone_cap;
- }
-
- double get_total_weight() {
- return m_weight.load();
- }
-
- Wrapped<R> *get_data() {
- return m_data;
- }
-
- double get_max_weight() {
- return m_max_weight;
- }
-
- /*
- * This operation assumes that no other threads have write access
- * to the buffer. This will be the case in normal operation, at
- * present, but may change (in which case this approach will need
- * to be adjusted). Other threads having read access is perfectly
- * acceptable, however.
- */
- bool start_flush() {
- memcpy(m_sorted_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load());
- return true;
+ // FIXME: tombstone capacity needs to be figured out again
+ return m_cap;
}
/*
@@ -202,30 +157,33 @@ public:
private:
int64_t try_advance_tail() {
- int64_t new_tail = m_tail.fetch_add(1);
+ int64_t new_tail = m_tail.fetch_add(1) % m_cap;
- if (new_tail < m_cap) {
+ if (new_tail < m_hwm) {
return new_tail;
- }
+ }
m_tail.fetch_add(-1);
return -1;
}
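
try_advance_tail is an instance of the optimistic fetch_add-then-rollback
reservation idiom. A self-contained sketch of the same pattern, outside
this class:

    #include <atomic>
    #include <cstdint>

    std::atomic<int64_t> tail{0};
    constexpr int64_t limit = 1000;         // plays the role of m_hwm

    // Claim a slot optimistically; undo the claim if the region is full.
    int64_t reserve() {
        int64_t slot = tail.fetch_add(1);   // tentatively take a slot
        if (slot < limit) return slot;      // claim succeeded
        tail.fetch_add(-1);                 // roll back our overshoot
        return -1;
    }

Each failing thread rolls back only its own increment, so concurrent
overshoots cancel and the counter settles back at the limit.
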
+ size_t to_idx(size_t i) {
+ return (m_head + i) % m_cap;
+ }
+
size_t m_cap;
- size_t m_tombstone_cap;
+
+ size_t m_lwm;
+ size_t m_hwm;
+
+ alignas(64) std::atomic<size_t> m_tail;
+ alignas(64) std::atomic<size_t> m_head;
Wrapped<R>* m_data;
- Wrapped<R>* m_sorted_data;
psudb::BloomFilter<R>* m_tombstone_filter;
alignas(64) std::atomic<size_t> m_tombstonecnt;
- alignas(64) std::atomic<uint32_t> m_reccnt;
- alignas(64) std::atomic<int64_t> m_tail;
- alignas(64) std::atomic<double> m_weight;
- alignas(64) std::atomic<double> m_max_weight;
-
alignas(64) std::atomic<size_t> m_refcnt;
};