diff options
Diffstat (limited to 'include/framework')
| -rw-r--r-- | include/framework/structure/BufferView.h | 98 | ||||
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 116 |
2 files changed, 62 insertions, 152 deletions
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index 651e430..8a5f50f 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -22,107 +22,59 @@ #include "psu-ds/Alias.h" #include "psu-util/timer.h" #include "framework/interface/Record.h" -#include "framework/structure/MutableBuffer.h" #include "framework/interface/Query.h" namespace de { -template <RecordInterface R, QueryInterface Q> +template <RecordInterface R> class BufferView { - typedef MutableBuffer<R> Buffer; public: BufferView() = default; - BufferView(std::vector<Buffer*> buffers) - : m_buffers(buffers) - , m_cutoff(buffers[buffers.size()-1]->get_record_count()) - {} + BufferView(const Wrapped<R> *buffer, size_t head, size_t tail, psudb::BloomFilter<R> *filter) + : m_buffer(buffer), m_head(head), m_tail(tail), m_tombstone_filter(filter) {} ~BufferView() = default; - bool delete_record(const R& rec) { - auto res = false; - for (auto buf : m_buffers) { - res = buf->delete_record(rec); - if (res) return true; - } - return false; - } - bool check_tombstone(const R& rec) { - auto res = false; - for (auto buf : m_buffers) { - res = buf->check_tombstone(rec); - if (res) return true; + if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false; + + for (size_t i=0; i<get_record_count(); i++) { + if (m_buffer[to_idx(i)].rec == rec && m_buffer[to_idx(i)].is_tombstone()) { + return true; + } } + return false; } size_t get_record_count() { - size_t reccnt = 0; - for (auto buf : m_buffers) { - reccnt += buf->get_record_count(); - } - return reccnt; + return m_tail - m_head; } - size_t get_capacity() { - return m_buffers[0]->get_capacity(); - } - - bool is_full() { - return m_buffers[m_buffers.size() - 1]->is_full(); - } - size_t get_tombstone_count() { - size_t tscnt = 0; - for (auto buf : m_buffers) { - tscnt += buf->get_tombstone_count(); - } - return tscnt; + // FIXME: tombstone count + return 0; } - size_t get_memory_usage() { - size_t mem = 0; - for (auto buf : m_buffers) { - mem += buf->get_memory_usage(); - } - return mem; + Wrapped<R> *get(size_t i) { + assert(i < get_record_count()); + return m_buffer + to_idx(i); } - size_t get_aux_memory_usage() { - size_t mem = 0; - for (auto buf : m_buffers) { - mem += buf->get_aux_memory_usage(); - } - return mem; - } - - size_t get_tombstone_capacity() { - return m_buffers[0]->get_tombstone_capacity(); - } - - std::vector<void *> get_query_states(void *parms) { - std::vector<void *> states; - - for (auto buf : m_buffers) { - states.push_back(Q::get_buffer_query_state(buf, parms)); - } - - return states; + void copy_to_buffer(byte *buffer) { + memcpy(buffer, m_buffer, get_record_count() * sizeof(Wrapped<R>)); } - std::vector<Buffer *> &get_buffers() { - return m_buffers; - } +private: + const Wrapped<R>* m_buffer; + size_t m_head; + size_t m_tail; + psudb::BloomFilter<R> *m_tombstone_filter; - size_t size() { - return m_buffers.size(); + size_t to_idx(size_t i) { + return (m_head + i) % m_buffer->get_capacity(); } - -private: - std::vector<Buffer *> m_buffers; - size_t m_cutoff; }; } diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 58b5fb4..7bec219 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -6,12 +6,6 @@ * * Distributed under the Modified BSD License. * - * FIXME: currently, the buffer itself is responsible for managing a - * secondary buffer for storing sorted records used during buffer flushes. It - * probably makes more sense to make the shard being flushed into responsible - * for this instead. This would also facilitate simultaneous flushes of multiple - * buffers more easily. - * */ #pragma once @@ -29,6 +23,7 @@ #include "psu-ds/Alias.h" #include "psu-util/timer.h" #include "framework/interface/Record.h" +#include "framework/structure/BufferView.h" using psudb::CACHELINE_SIZE; @@ -36,18 +31,22 @@ namespace de { template <RecordInterface R> class MutableBuffer { + friend class BufferView<R>; public: - MutableBuffer(size_t capacity, size_t max_tombstone_cap) - : m_cap(capacity), m_tombstone_cap(capacity), m_reccnt(0) - , m_tombstonecnt(0), m_weight(0), m_max_weight(0), m_tail(0) { - m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>)); - m_sorted_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>)); - m_tombstone_filter = nullptr; - if (max_tombstone_cap > 0) { - m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS); + MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0) + : m_lwm(low_watermark), m_hwm(high_watermark), m_cap(capacity), m_head(0), m_tail(0) { + /* + * default capacity is twice the high water mark, to account for the worst-case + * memory requirements. + */ + if (m_cap == 0) { + m_cap = m_hwm * 2; } - m_refcnt.store(0); + m_data = (Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)); + + // FIXME: need to figure out how to detail with tombstones at some point... + m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS); } ~MutableBuffer() { @@ -55,13 +54,10 @@ public: if (m_data) free(m_data); if (m_tombstone_filter) delete m_tombstone_filter; - if (m_sorted_data) free(m_sorted_data); } template <typename R_ = R> int append(const R &rec, bool tombstone=false) { - if (tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0; - int32_t pos = 0; if ((pos = try_advance_tail()) == -1) return 0; @@ -78,26 +74,11 @@ public: if (m_tombstone_filter) m_tombstone_filter->insert(rec); } - if constexpr (WeightedRecordInterface<R_>) { - m_weight.fetch_add(rec.weight); - double old = m_max_weight.load(); - while (old < rec.weight) { - m_max_weight.compare_exchange_strong(old, rec.weight); - old = m_max_weight.load(); - } - } else { - m_weight.fetch_add(1); - } - - m_reccnt.fetch_add(1); return 1; } bool truncate() { m_tombstonecnt.store(0); - m_reccnt.store(0); - m_weight.store(0); - m_max_weight.store(0); m_tail.store(0); if (m_tombstone_filter) m_tombstone_filter->clear(); @@ -105,7 +86,7 @@ public: } size_t get_record_count() { - return m_reccnt; + return (m_tail - m_head) % m_cap; } size_t get_capacity() { @@ -113,7 +94,7 @@ public: } bool is_full() { - return m_reccnt == m_cap; + return (m_tail % m_cap) >= m_hwm; } size_t get_tombstone_count() { @@ -121,13 +102,11 @@ public: } bool delete_record(const R& rec) { - auto offset = 0; - while (offset < m_reccnt.load()) { - if (m_data[offset].rec == rec) { - m_data[offset].set_delete(); + for (size_t i=0; i<get_record_count(); i++) { + if (m_data[to_idx(i)].rec == rec) { + m_data[to_idx(i)].set_delete(); return true; } - offset++; } return false; @@ -136,13 +115,12 @@ public: bool check_tombstone(const R& rec) { if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false; - auto offset = 0; - while (offset < m_reccnt.load()) { - if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) { + for (size_t i=0; i<get_record_count(); i++) { + if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) { return true; } - offset++;; } + return false; } @@ -155,31 +133,8 @@ public: } size_t get_tombstone_capacity() { - return m_tombstone_cap; - } - - double get_total_weight() { - return m_weight.load(); - } - - Wrapped<R> *get_data() { - return m_data; - } - - double get_max_weight() { - return m_max_weight; - } - - /* - * This operation assumes that no other threads have write access - * to the buffer. This will be the case in normal operation, at - * present, but may change (in which case this approach will need - * to be adjusted). Other threads having read access is perfectly - * acceptable, however. - */ - bool start_flush() { - memcpy(m_sorted_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load()); - return true; + // FIXME: tombstone capacity needs figured out again + return m_cap; } /* @@ -202,30 +157,33 @@ public: private: int64_t try_advance_tail() { - int64_t new_tail = m_tail.fetch_add(1); + int64_t new_tail = m_tail.fetch_add(1) % m_cap; - if (new_tail < m_cap) { + if (new_tail < m_hwm) { return new_tail; - } + } m_tail.fetch_add(-1); return -1; } + size_t to_idx(size_t i) { + return (m_head + i) % m_cap; + } + size_t m_cap; - size_t m_tombstone_cap; + + size_t m_lwm; + size_t m_hwm; + + alignas(64) std::atomic<size_t> m_tail; + alignas(64) std::atomic<size_t> m_head; Wrapped<R>* m_data; - Wrapped<R>* m_sorted_data; psudb::BloomFilter<R>* m_tombstone_filter; alignas(64) std::atomic<size_t> m_tombstonecnt; - alignas(64) std::atomic<uint32_t> m_reccnt; - alignas(64) std::atomic<int64_t> m_tail; - alignas(64) std::atomic<double> m_weight; - alignas(64) std::atomic<double> m_max_weight; - alignas(64) std::atomic<size_t> m_refcnt; }; |