author     Douglas B. Rumbaugh <doug@douglasrumbaugh.com>    2024-01-09 14:47:48 -0500
committer  Douglas B. Rumbaugh <doug@douglasrumbaugh.com>    2024-01-09 14:49:27 -0500
commit     3c2e6b3b456867d7155b158432b891b84e4e1dd6 (patch)
tree       cc3720e28a0015c1516b32c5b3b174feed8c1657
parent     fa0952cba5e59e8d664654ad19ae96c580bc0bf4 (diff)
download   dynamic-extension-3c2e6b3b456867d7155b158432b891b84e4e1dd6.tar.gz
Initial update of buffer to new specifications
There are a few minor issues that this introduces, however. Global tracking of secondary information, such as weights for WIRS/WSS or the exact number of tombstones, will need to be approached differently than it has been historically under this new approach.

I've also removed most of the tombstone-capacity-related code. We had decided not to enforce this at the buffer level anyway, and it would greatly increase the complexity of predicting when the next compaction will occur. On the whole, this new approach seems like it will simplify a lot; this commit actually removes significantly more code than it adds.

One minor issue: the current implementation will have problems with the circular array indexes once more than 2^64 records have been inserted. This doesn't seem like a realistic problem at the moment.
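For context, here is a minimal, hypothetical sketch (not taken from this commit) of the circular-index scheme that last point refers to: head and tail are kept as monotonically increasing counters and are only reduced modulo the physical capacity when a slot is actually accessed, so the raw counters themselves are what would eventually overflow.

#include <atomic>
#include <cstddef>

/*
 * Hypothetical, simplified stand-in for the buffer's index management;
 * names and structure are illustrative only.
 */
struct RingIndex {
    std::atomic<size_t> m_head{0}; // logical index of the oldest live record
    std::atomic<size_t> m_tail{0}; // logical index one past the newest record
    size_t m_cap;                  // number of physical slots

    explicit RingIndex(size_t cap) : m_cap(cap) {}

    /* Live record count is the difference of the logical counters. */
    size_t record_count() const { return m_tail.load() - m_head.load(); }

    /* Convert the i-th live record into a physical slot. */
    size_t to_idx(size_t i) const { return (m_head.load() + i) % m_cap; }
};

Because the counters are never reset to zero, the scheme only remains well defined while the total number of appends stays below 2^64, which is the limitation acknowledged above.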
-rw-r--r--   include/framework/structure/BufferView.h     |  98
-rw-r--r--   include/framework/structure/MutableBuffer.h  | 116
2 files changed, 62 insertions(+), 152 deletions(-)
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 651e430..8a5f50f 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -22,107 +22,59 @@
#include "psu-ds/Alias.h"
#include "psu-util/timer.h"
#include "framework/interface/Record.h"
-#include "framework/structure/MutableBuffer.h"
#include "framework/interface/Query.h"
namespace de {
-template <RecordInterface R, QueryInterface Q>
+template <RecordInterface R>
class BufferView {
- typedef MutableBuffer<R> Buffer;
public:
BufferView() = default;
- BufferView(std::vector<Buffer*> buffers)
- : m_buffers(buffers)
- , m_cutoff(buffers[buffers.size()-1]->get_record_count())
- {}
+ BufferView(const Wrapped<R> *buffer, size_t head, size_t tail, psudb::BloomFilter<R> *filter)
+ : m_buffer(buffer), m_head(head), m_tail(tail), m_tombstone_filter(filter) {}
~BufferView() = default;
- bool delete_record(const R& rec) {
- auto res = false;
- for (auto buf : m_buffers) {
- res = buf->delete_record(rec);
- if (res) return true;
- }
- return false;
- }
-
bool check_tombstone(const R& rec) {
- auto res = false;
- for (auto buf : m_buffers) {
- res = buf->check_tombstone(rec);
- if (res) return true;
+ if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
+
+ for (size_t i=0; i<get_record_count(); i++) {
+ if (m_buffer[to_idx(i)].rec == rec && m_buffer[to_idx(i)].is_tombstone()) {
+ return true;
+ }
}
+
return false;
}
size_t get_record_count() {
- size_t reccnt = 0;
- for (auto buf : m_buffers) {
- reccnt += buf->get_record_count();
- }
- return reccnt;
+ return m_tail - m_head;
}
- size_t get_capacity() {
- return m_buffers[0]->get_capacity();
- }
-
- bool is_full() {
- return m_buffers[m_buffers.size() - 1]->is_full();
- }
-
size_t get_tombstone_count() {
- size_t tscnt = 0;
- for (auto buf : m_buffers) {
- tscnt += buf->get_tombstone_count();
- }
- return tscnt;
+ // FIXME: tombstone count
+ return 0;
}
- size_t get_memory_usage() {
- size_t mem = 0;
- for (auto buf : m_buffers) {
- mem += buf->get_memory_usage();
- }
- return mem;
+ Wrapped<R> *get(size_t i) {
+ assert(i < get_record_count());
+ return m_buffer + to_idx(i);
}
- size_t get_aux_memory_usage() {
- size_t mem = 0;
- for (auto buf : m_buffers) {
- mem += buf->get_aux_memory_usage();
- }
- return mem;
- }
-
- size_t get_tombstone_capacity() {
- return m_buffers[0]->get_tombstone_capacity();
- }
-
- std::vector<void *> get_query_states(void *parms) {
- std::vector<void *> states;
-
- for (auto buf : m_buffers) {
- states.push_back(Q::get_buffer_query_state(buf, parms));
- }
-
- return states;
+ void copy_to_buffer(byte *buffer) {
+ memcpy(buffer, m_buffer, get_record_count() * sizeof(Wrapped<R>));
}
- std::vector<Buffer *> &get_buffers() {
- return m_buffers;
- }
+private:
+ const Wrapped<R>* m_buffer;
+ size_t m_head;
+ size_t m_tail;
+ psudb::BloomFilter<R> *m_tombstone_filter;
- size_t size() {
- return m_buffers.size();
+ size_t to_idx(size_t i) {
+ return (m_head + i) % m_buffer->get_capacity();
}
-
-private:
- std::vector<Buffer *> m_buffers;
- size_t m_cutoff;
};
}
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 58b5fb4..7bec219 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -6,12 +6,6 @@
*
* Distributed under the Modified BSD License.
*
- * FIXME: currently, the buffer itself is responsible for managing a
- * secondary buffer for storing sorted records used during buffer flushes. It
- * probably makes more sense to make the shard being flushed into responsible
- * for this instead. This would also facilitate simultaneous flushes of multiple
- * buffers more easily.
- *
*/
#pragma once
@@ -29,6 +23,7 @@
#include "psu-ds/Alias.h"
#include "psu-util/timer.h"
#include "framework/interface/Record.h"
+#include "framework/structure/BufferView.h"
using psudb::CACHELINE_SIZE;
@@ -36,18 +31,22 @@ namespace de {
template <RecordInterface R>
class MutableBuffer {
+ friend class BufferView<R>;
public:
- MutableBuffer(size_t capacity, size_t max_tombstone_cap)
- : m_cap(capacity), m_tombstone_cap(capacity), m_reccnt(0)
- , m_tombstonecnt(0), m_weight(0), m_max_weight(0), m_tail(0) {
- m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
- m_sorted_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
- m_tombstone_filter = nullptr;
- if (max_tombstone_cap > 0) {
- m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
+ MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
+ : m_lwm(low_watermark), m_hwm(high_watermark), m_cap(capacity), m_head(0), m_tail(0) {
+ /*
+ * default capacity is twice the high water mark, to account for the worst-case
+ * memory requirements.
+ */
+ if (m_cap == 0) {
+ m_cap = m_hwm * 2;
}
- m_refcnt.store(0);
+ m_data = (Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>));
+
+ // FIXME: need to figure out how to deal with tombstones at some point...
+ m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS);
}
~MutableBuffer() {
@@ -55,13 +54,10 @@ public:
if (m_data) free(m_data);
if (m_tombstone_filter) delete m_tombstone_filter;
- if (m_sorted_data) free(m_sorted_data);
}
template <typename R_ = R>
int append(const R &rec, bool tombstone=false) {
- if (tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
-
int32_t pos = 0;
if ((pos = try_advance_tail()) == -1) return 0;
@@ -78,26 +74,11 @@ public:
if (m_tombstone_filter) m_tombstone_filter->insert(rec);
}
- if constexpr (WeightedRecordInterface<R_>) {
- m_weight.fetch_add(rec.weight);
- double old = m_max_weight.load();
- while (old < rec.weight) {
- m_max_weight.compare_exchange_strong(old, rec.weight);
- old = m_max_weight.load();
- }
- } else {
- m_weight.fetch_add(1);
- }
-
- m_reccnt.fetch_add(1);
return 1;
}
bool truncate() {
m_tombstonecnt.store(0);
- m_reccnt.store(0);
- m_weight.store(0);
- m_max_weight.store(0);
m_tail.store(0);
if (m_tombstone_filter) m_tombstone_filter->clear();
@@ -105,7 +86,7 @@ public:
}
size_t get_record_count() {
- return m_reccnt;
+ return (m_tail - m_head) % m_cap;
}
size_t get_capacity() {
@@ -113,7 +94,7 @@ public:
}
bool is_full() {
- return m_reccnt == m_cap;
+ return (m_tail % m_cap) >= m_hwm;
}
size_t get_tombstone_count() {
@@ -121,13 +102,11 @@ public:
}
bool delete_record(const R& rec) {
- auto offset = 0;
- while (offset < m_reccnt.load()) {
- if (m_data[offset].rec == rec) {
- m_data[offset].set_delete();
+ for (size_t i=0; i<get_record_count(); i++) {
+ if (m_data[to_idx(i)].rec == rec) {
+ m_data[to_idx(i)].set_delete();
return true;
}
- offset++;
}
return false;
@@ -136,13 +115,12 @@ public:
bool check_tombstone(const R& rec) {
if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
- auto offset = 0;
- while (offset < m_reccnt.load()) {
- if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) {
+ for (size_t i=0; i<get_record_count(); i++) {
+ if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) {
return true;
}
- offset++;;
}
+
return false;
}
@@ -155,31 +133,8 @@ public:
}
size_t get_tombstone_capacity() {
- return m_tombstone_cap;
- }
-
- double get_total_weight() {
- return m_weight.load();
- }
-
- Wrapped<R> *get_data() {
- return m_data;
- }
-
- double get_max_weight() {
- return m_max_weight;
- }
-
- /*
- * This operation assumes that no other threads have write access
- * to the buffer. This will be the case in normal operation, at
- * present, but may change (in which case this approach will need
- * to be adjusted). Other threads having read access is perfectly
- * acceptable, however.
- */
- bool start_flush() {
- memcpy(m_sorted_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load());
- return true;
+ // FIXME: tombstone capacity needs to be figured out again
+ return m_cap;
}
/*
@@ -202,30 +157,33 @@ public:
private:
int64_t try_advance_tail() {
- int64_t new_tail = m_tail.fetch_add(1);
+ int64_t new_tail = m_tail.fetch_add(1) % m_cap;
- if (new_tail < m_cap) {
+ if (new_tail < m_hwm) {
return new_tail;
- }
+ }
m_tail.fetch_add(-1);
return -1;
}
+ size_t to_idx(size_t i) {
+ return (m_head + i) % m_cap;
+ }
+
size_t m_cap;
- size_t m_tombstone_cap;
+
+ size_t m_lwm;
+ size_t m_hwm;
+
+ alignas(64) std::atomic<size_t> m_tail;
+ alignas(64) std::atomic<size_t> m_head;
Wrapped<R>* m_data;
- Wrapped<R>* m_sorted_data;
psudb::BloomFilter<R>* m_tombstone_filter;
alignas(64) std::atomic<size_t> m_tombstonecnt;
- alignas(64) std::atomic<uint32_t> m_reccnt;
- alignas(64) std::atomic<int64_t> m_tail;
- alignas(64) std::atomic<double> m_weight;
- alignas(64) std::atomic<double> m_max_weight;
-
alignas(64) std::atomic<size_t> m_refcnt;
};