| field | value | date |
|---|---|---|
| author | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-01-09 14:47:48 -0500 |
| committer | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-01-09 14:49:27 -0500 |
| commit | 3c2e6b3b456867d7155b158432b891b84e4e1dd6 (patch) | |
| tree | cc3720e28a0015c1516b32c5b3b174feed8c1657 | |
| parent | fa0952cba5e59e8d664654ad19ae96c580bc0bf4 (diff) | |
| download | dynamic-extension-3c2e6b3b456867d7155b158432b891b84e4e1dd6.tar.gz | |
Initial update of buffer to new specifications
This introduces a few minor issues, however. Global tracking of secondary
information, such as the weights for WIRS/WSS or the exact number of
tombstones, will need to be approached differently than it has been
historically.
I've also removed most of the tombstone-capacity-related code. We had
decided not to bother enforcing this at the buffer level anyway, and it
would greatly increase the complexity of predicting when the next
compaction will occur.
On the whole, this new approach seems like it'll simplify a lot. This
commit actually removes significantly more code than it adds.
One minor issue: the current implementation will have problems with the
circular array indexes once more than 2^64 records have been inserted.
This doesn't seem like a realistic problem at the moment.
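
To make that concrete, here is a minimal sketch of the circular-index arithmetic this commit adopts (the standalone struct is an illustration only; the member names mirror `m_head`/`m_tail`/`m_cap` from the diff below). The logical head and tail counters grow monotonically and are reduced modulo the capacity only when touching the backing array, so the scheme breaks only once the 64-bit tail counter itself overflows:

```cpp
#include <atomic>
#include <cstddef>

// Illustration only: logical positions grow without bound; the
// physical slot is derived by reducing modulo the capacity.
struct CircularIndex {
    std::atomic<size_t> m_head{0};  // logical index of oldest record
    std::atomic<size_t> m_tail{0};  // logical index one past newest
    size_t m_cap;

    explicit CircularIndex(size_t cap) : m_cap(cap) {}

    size_t record_count() {
        // Correct while fewer than 2^64 records have ever been
        // inserted; after that m_tail wraps and the subtraction lies.
        return m_tail.load() - m_head.load();
    }

    size_t to_idx(size_t i) {
        // Map the i-th live record onto the physical array.
        return (m_head.load() + i) % m_cap;
    }
};
```

At 64 bits this is a distant limit: even sustaining a billion inserts per second, wrapping the counter would take roughly 580 years.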
-rw-r--r--  include/framework/structure/BufferView.h    |  98
-rw-r--r--  include/framework/structure/MutableBuffer.h | 116

2 files changed, 62 insertions, 152 deletions
```diff
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 651e430..8a5f50f 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -22,107 +22,59 @@
 #include "psu-ds/Alias.h"
 #include "psu-util/timer.h"
 #include "framework/interface/Record.h"
-#include "framework/structure/MutableBuffer.h"
 #include "framework/interface/Query.h"
 
 namespace de {
 
-template <RecordInterface R, QueryInterface Q>
+template <RecordInterface R>
 class BufferView {
-    typedef MutableBuffer<R> Buffer;
 public:
     BufferView() = default;
 
-    BufferView(std::vector<Buffer*> buffers)
-        : m_buffers(buffers)
-        , m_cutoff(buffers[buffers.size()-1]->get_record_count())
-    {}
+    BufferView(const Wrapped<R> *buffer, size_t head, size_t tail, psudb::BloomFilter<R> *filter)
+        : m_buffer(buffer), m_head(head), m_tail(tail), m_tombstone_filter(filter) {}
 
     ~BufferView() = default;
 
-    bool delete_record(const R& rec) {
-        auto res = false;
-        for (auto buf : m_buffers) {
-            res = buf->delete_record(rec);
-            if (res) return true;
-        }
-
-        return false;
-    }
-
     bool check_tombstone(const R& rec) {
-        auto res = false;
-        for (auto buf : m_buffers) {
-            res = buf->check_tombstone(rec);
-            if (res) return true;
+        if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
+
+        for (size_t i=0; i<get_record_count(); i++) {
+            if (m_buffer[to_idx(i)].rec == rec && m_buffer[to_idx(i)].is_tombstone()) {
+                return true;
+            }
         }
+
         return false;
     }
 
     size_t get_record_count() {
-        size_t reccnt = 0;
-        for (auto buf : m_buffers) {
-            reccnt += buf->get_record_count();
-        }
-        return reccnt;
+        return m_tail - m_head;
     }
 
-    size_t get_capacity() {
-        return m_buffers[0]->get_capacity();
-    }
-
-    bool is_full() {
-        return m_buffers[m_buffers.size() - 1]->is_full();
-    }
-
     size_t get_tombstone_count() {
-        size_t tscnt = 0;
-        for (auto buf : m_buffers) {
-            tscnt += buf->get_tombstone_count();
-        }
-        return tscnt;
+        // FIXME: tombstone count
+        return 0;
     }
 
-    size_t get_memory_usage() {
-        size_t mem = 0;
-        for (auto buf : m_buffers) {
-            mem += buf->get_memory_usage();
-        }
-        return mem;
+    Wrapped<R> *get(size_t i) {
+        assert(i < get_record_count());
+        return m_buffer + to_idx(i);
     }
 
-    size_t get_aux_memory_usage() {
-        size_t mem = 0;
-        for (auto buf : m_buffers) {
-            mem += buf->get_aux_memory_usage();
-        }
-        return mem;
-    }
-
-    size_t get_tombstone_capacity() {
-        return m_buffers[0]->get_tombstone_capacity();
-    }
-
-    std::vector<void *> get_query_states(void *parms) {
-        std::vector<void *> states;
-
-        for (auto buf : m_buffers) {
-            states.push_back(Q::get_buffer_query_state(buf, parms));
-        }
-
-        return states;
+    void copy_to_buffer(byte *buffer) {
+        memcpy(buffer, m_buffer, get_record_count() * sizeof(Wrapped<R>));
     }
 
-    std::vector<Buffer *> &get_buffers() {
-        return m_buffers;
-    }
+private:
+    const Wrapped<R>* m_buffer;
+    size_t m_head;
+    size_t m_tail;
+    psudb::BloomFilter<R> *m_tombstone_filter;
 
-    size_t size() {
-        return m_buffers.size();
+    size_t to_idx(size_t i) {
+        return (m_head + i) % m_buffer->get_capacity();
     }
-
-private:
-    std::vector<Buffer *> m_buffers;
-    size_t m_cutoff;
 };
 
 }
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 58b5fb4..7bec219 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -6,12 +6,6 @@
  *
  * Distributed under the Modified BSD License.
  *
- * FIXME: currently, the buffer itself is responsible for managing a
- * secondary buffer for storing sorted records used during buffer flushes. It
- * probably makes more sense to make the shard being flushed into responsible
- * for this instead. This would also facilitate simultaneous flushes of multiple
- * buffers more easily.
- *
  */
 
 #pragma once
@@ -29,6 +23,7 @@
 #include "psu-ds/Alias.h"
 #include "psu-util/timer.h"
 #include "framework/interface/Record.h"
+#include "framework/structure/BufferView.h"
 
 using psudb::CACHELINE_SIZE;
 
@@ -36,18 +31,22 @@ namespace de {
 
 template <RecordInterface R>
 class MutableBuffer {
+    friend class BufferView<R>;
 public:
-    MutableBuffer(size_t capacity, size_t max_tombstone_cap)
-        : m_cap(capacity), m_tombstone_cap(capacity), m_reccnt(0)
-        , m_tombstonecnt(0), m_weight(0), m_max_weight(0), m_tail(0) {
-        m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
-        m_sorted_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
-        m_tombstone_filter = nullptr;
-        if (max_tombstone_cap > 0) {
-            m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
+    MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
+        : m_lwm(low_watermark), m_hwm(high_watermark), m_cap(capacity), m_head(0), m_tail(0) {
+        /*
+         * default capacity is twice the high water mark, to account for the worst-case
+         * memory requirements.
+         */
+        if (m_cap == 0) {
+            m_cap = m_hwm * 2;
         }
 
-        m_refcnt.store(0);
+        m_data = (Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>));
+
+        // FIXME: need to figure out how to detail with tombstones at some point...
+        m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS);
     }
 
     ~MutableBuffer() {
@@ -55,13 +54,10 @@ public:
         if (m_data) free(m_data);
         if (m_tombstone_filter) delete m_tombstone_filter;
-        if (m_sorted_data) free(m_sorted_data);
     }
 
     template <typename R_ = R>
     int append(const R &rec, bool tombstone=false) {
-        if (tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
-
         int32_t pos = 0;
         if ((pos = try_advance_tail()) == -1) return 0;
@@ -78,26 +74,11 @@ public:
             if (m_tombstone_filter) m_tombstone_filter->insert(rec);
         }
 
-        if constexpr (WeightedRecordInterface<R_>) {
-            m_weight.fetch_add(rec.weight);
-            double old = m_max_weight.load();
-            while (old < rec.weight) {
-                m_max_weight.compare_exchange_strong(old, rec.weight);
-                old = m_max_weight.load();
-            }
-        } else {
-            m_weight.fetch_add(1);
-        }
-
-        m_reccnt.fetch_add(1);
         return 1;
     }
 
     bool truncate() {
         m_tombstonecnt.store(0);
-        m_reccnt.store(0);
-        m_weight.store(0);
-        m_max_weight.store(0);
         m_tail.store(0);
 
         if (m_tombstone_filter) m_tombstone_filter->clear();
@@ -105,7 +86,7 @@ public:
     }
 
     size_t get_record_count() {
-        return m_reccnt;
+        return (m_tail - m_head) % m_cap;
     }
 
     size_t get_capacity() {
@@ -113,7 +94,7 @@ public:
     }
 
     bool is_full() {
-        return m_reccnt == m_cap;
+        return (m_tail % m_cap) >= m_hwm;
     }
 
     size_t get_tombstone_count() {
@@ -121,13 +102,11 @@ public:
     }
 
     bool delete_record(const R& rec) {
-        auto offset = 0;
-        while (offset < m_reccnt.load()) {
-            if (m_data[offset].rec == rec) {
-                m_data[offset].set_delete();
+        for (size_t i=0; i<get_record_count(); i++) {
+            if (m_data[to_idx(i)].rec == rec) {
+                m_data[to_idx(i)].set_delete();
                 return true;
             }
-            offset++;
         }
 
         return false;
@@ -136,13 +115,12 @@ public:
     bool check_tombstone(const R& rec) {
         if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
 
-        auto offset = 0;
-        while (offset < m_reccnt.load()) {
-            if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) {
+        for (size_t i=0; i<get_record_count(); i++) {
+            if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) {
                 return true;
             }
-            offset++;;
         }
+
         return false;
     }
@@ -155,31 +133,8 @@ public:
     }
 
     size_t get_tombstone_capacity() {
-        return m_tombstone_cap;
-    }
-
-    double get_total_weight() {
-        return m_weight.load();
-    }
-
-    Wrapped<R> *get_data() {
-        return m_data;
-    }
-
-    double get_max_weight() {
-        return m_max_weight;
-    }
-
-    /*
-     * This operation assumes that no other threads have write access
-     * to the buffer. This will be the case in normal operation, at
-     * present, but may change (in which case this approach will need
-     * to be adjusted). Other threads having read access is perfectly
-     * acceptable, however.
-     */
-    bool start_flush() {
-        memcpy(m_sorted_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load());
-        return true;
+        // FIXME: tombstone capacity needs figured out again
+        return m_cap;
     }
 
     /*
@@ -202,30 +157,33 @@ public:
 
 private:
     int64_t try_advance_tail() {
-        int64_t new_tail = m_tail.fetch_add(1);
+        int64_t new_tail = m_tail.fetch_add(1) % m_cap;
 
-        if (new_tail < m_cap) {
+        if (new_tail < m_hwm) {
             return new_tail;
-        }
+        }
 
         m_tail.fetch_add(-1);
         return -1;
     }
 
+    size_t to_idx(size_t i) {
+        return (m_head + i) % m_cap;
+    }
+
     size_t m_cap;
-    size_t m_tombstone_cap;
+
+    size_t m_lwm;
+    size_t m_hwm;
+
+    alignas(64) std::atomic<size_t> m_tail;
+    alignas(64) std::atomic<size_t> m_head;
 
     Wrapped<R>* m_data;
-    Wrapped<R>* m_sorted_data;
     psudb::BloomFilter<R>* m_tombstone_filter;
 
     alignas(64) std::atomic<size_t> m_tombstonecnt;
-    alignas(64) std::atomic<uint32_t> m_reccnt;
-    alignas(64) std::atomic<int64_t> m_tail;
-    alignas(64) std::atomic<double> m_weight;
-    alignas(64) std::atomic<double> m_max_weight;
-    alignas(64) std::atomic<size_t> m_refcnt;
 };
```
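
For orientation, here is a minimal caller-side sketch of how the new `BufferView` is meant to be used. The `snapshot_view` helper is hypothetical and not part of this commit; only the `BufferView` constructor and accessors match the diff above:

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical helper, NOT part of this commit: snapshots the buffer's
// [head, tail) window into the new BufferView introduced above.
template <de::RecordInterface R>
de::BufferView<R> snapshot_view(const de::Wrapped<R> *data,
                                const std::atomic<size_t> &head,
                                const std::atomic<size_t> &tail,
                                psudb::BloomFilter<R> *filter) {
    size_t h = head.load();  // pin the window bounds once; the view
    size_t t = tail.load();  // then exposes exactly t - h records
    return de::BufferView<R>(data, h, t, filter);
}

// Usage: iterate by logical index; the view performs the modular
// mapping onto the physical circular array internally.
//
//   for (size_t i = 0; i < view.get_record_count(); i++) {
//       de::Wrapped<R> *rec = view.get(i);
//       /* ... */
//   }
```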