From cf178ae74a76b780b655a447531d2114f9f81d98 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Mon, 15 Jan 2024 14:01:36 -0500 Subject: Various single-threaded bug fixes --- include/framework/DynamicExtension.h | 12 +++++------- include/framework/scheduling/Epoch.h | 14 ++++++++++++++ include/framework/structure/BufferView.h | 18 ++++++++++++++---- include/framework/structure/MutableBuffer.h | 26 ++++++++++++++++++++++---- 4 files changed, 55 insertions(+), 15 deletions(-) diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index c97b390..bddc950 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -90,8 +90,8 @@ public: * ordering than simply accessing the buffer directly, but is * not *strictly* necessary. */ - auto view = m_buffer->get_buffer_view(); if constexpr (D == DeletePolicy::TAGGING) { + auto view = m_buffer->get_buffer_view(); static_assert(std::same_as, "Tagging is only supported in single-threaded operation"); if (get_active_epoch()->get_structure()->tagged_delete(rec)) { return 1; @@ -426,9 +426,8 @@ private: Structure *vers = args->epoch->get_structure(); - // FIXME: with an improved shard interface, multiple full buffer_viewers // could be flushed at once here. - auto buffer_view = args->epoch->get_buffer(); + auto buffer_view = args->epoch->get_flush_buffer(); size_t new_head = buffer_view.get_tail(); for (ssize_t i=0; imerges.size(); i++) { @@ -464,6 +463,8 @@ private: if (!args->compaction) { ((DynamicExtension *) args->extension)->advance_epoch(); } + + ((DynamicExtension *) args->extension)->m_reconstruction_scheduled = false; delete args; } @@ -525,12 +526,9 @@ private: */ epoch->start_job(); - // FIXME: all full buffers can be flushed at this point--but that requires - // retooling the shard interface a bit to do efficiently. - // ReconstructionArgs *args = new ReconstructionArgs(); args->epoch = epoch; - args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffer().get_record_count()); + args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_low_watermark()); args->extension = this; args->compaction = false; /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index ca85fe2..b005ff6 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -60,6 +60,16 @@ public: } } + + /* + * Epochs are *not* copyable or movable. Only one can exist, and all users of + * it work with pointers + */ + Epoch(const Epoch&) = delete; + Epoch(Epoch&&) = delete; + Epoch &operator=(const Epoch&) = delete; + Epoch &operator=(Epoch&&) = delete; + void start_job() { m_active_jobs.fetch_add(1); } @@ -90,6 +100,10 @@ public: return m_buffer->get_buffer_view(); } + BufView get_flush_buffer() { + return m_buffer->get_flush_buffer_view(); + } + /* * Returns a new Epoch object that is a copy of this one. The new object will also contain diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index ba5e693..c751786 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -11,6 +11,7 @@ #include #include #include +#include #include "psu-util/alignment.h" #include "psu-ds/BloomFilter.h" @@ -39,7 +40,8 @@ public: , m_tail(std::exchange(other.m_tail, 0)) , m_cap(std::exchange(other.m_cap, 0)) , m_approx_ts_cnt(std::exchange(other.m_approx_ts_cnt, 0)) - , m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr)) {} + , m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr)) + , m_active(std::exchange(other.m_active, false)) {} BufferView &operator=(BufferView &&other) = delete; @@ -52,10 +54,13 @@ public: , m_tail(tail) , m_cap(cap) , m_approx_ts_cnt(tombstone_cnt) - , m_tombstone_filter(filter) {} + , m_tombstone_filter(filter) + , m_active(true) {} ~BufferView() { - m_release(); + if (m_active) { + m_release(); + } } bool check_tombstone(const R& rec) { @@ -100,13 +105,17 @@ public: } void copy_to_buffer(psudb::byte *buffer) { - memcpy(buffer, (std::byte*) (m_data + m_head), get_record_count() * sizeof(Wrapped)); + memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped)); } size_t get_tail() { return m_tail; } + size_t get_head() { + return m_head; + } + private: Wrapped* m_data; ReleaseFunction m_release; @@ -115,6 +124,7 @@ private: size_t m_cap; size_t m_approx_ts_cnt; psudb::BloomFilter *m_tombstone_filter; + bool m_active; size_t to_idx(size_t i) { return (m_head + i) % m_cap; diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 3a06f0d..5b655fc 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -44,7 +44,7 @@ public: , m_tail(0) , m_head(0) , m_head_refcnt(0) - , m_old_head(0) + , m_old_head(high_watermark) , m_old_head_refcnt(0) , m_data((Wrapped *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped))) , m_tombstone_filter(new psudb::BloomFilter(BF_FPR, m_hwm, BF_HASH_FUNCS)) @@ -62,14 +62,18 @@ public: } int append(const R &rec, bool tombstone=false) { - int32_t pos = 0; - if ((pos = try_advance_tail()) == -1) return 0; + int32_t tail = 0; + if ((tail = try_advance_tail()) == -1) { + return 0; + } Wrapped wrec; wrec.rec = rec; wrec.header = 0; if (tombstone) wrec.set_tombstone(); + size_t pos = tail % m_cap; + m_data[pos] = wrec; m_data[pos].header |= (pos << 2); @@ -131,6 +135,13 @@ public: return BufferView(m_data, m_cap, m_head.load(), m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); } + BufferView get_flush_buffer_view() { + m_head_refcnt.fetch_add(1); + auto f = std::bind(release_head_reference, (void *) this, m_head.load()); + return BufferView(m_data, m_cap, m_head.load(), m_head.load() + m_lwm, m_tscnt.load(), m_tombstone_filter, f); + + } + /* * Advance the buffer following a reconstruction. Move current * head and head_refcnt into old_head and old_head_refcnt, then @@ -142,6 +153,7 @@ public: /* refuse to advance head while there is an old with one references */ if (m_old_head_refcnt > 0) { + fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts"); return false; } @@ -195,6 +207,10 @@ public: * Note: this returns the available physical storage capacity, * *not* now many more records can be inserted before the * HWM is reached. + * + * FIXME: this logic is incorrect for the buffer prior to the + * first call to advance_head, and will under-report the available + * space. */ size_t get_available_capacity() { return m_cap - (m_tail.load() - m_old_head.load()); @@ -205,7 +221,7 @@ private: size_t old_value = m_tail.load(); /* if full, fail to advance the tail */ - if (old_value >= m_hwm) { + if (old_value - m_head.load() >= m_hwm) { return -1; } @@ -236,6 +252,7 @@ private: * also match. */ if (head == buffer->m_old_head.load()) { + assert(buffer->m_old_head_refcnt > 0); buffer->m_old_head_refcnt.fetch_sub(1); /* * if the old head refcnt drops to 0, free @@ -251,6 +268,7 @@ private: buffer->m_old_head.store(buffer->m_head); } } else if (head == buffer->m_head.load()) { + assert(buffer->m_head_refcnt > 0); buffer->m_head_refcnt.fetch_sub(1); } } -- cgit v1.2.3