author    Douglas B. Rumbaugh <doug@douglasrumbaugh.com>    2024-01-15 14:01:36 -0500
committer Douglas B. Rumbaugh <doug@douglasrumbaugh.com>    2024-01-15 14:01:36 -0500
commit    cf178ae74a76b780b655a447531d2114f9f81d98 (patch)
tree      c992a3209a650bb90540dd4449e1d8111f216458 /include
parent    aac0bb661af8fae38d3ce08d6078cb4d9dfcb575 (diff)
download  dynamic-extension-cf178ae74a76b780b655a447531d2114f9f81d98.tar.gz
Various single-threaded bug fixes
Diffstat (limited to 'include')
-rw-r--r--  include/framework/DynamicExtension.h         12
-rw-r--r--  include/framework/scheduling/Epoch.h          14
-rw-r--r--  include/framework/structure/BufferView.h      18
-rw-r--r--  include/framework/structure/MutableBuffer.h   26
4 files changed, 55 insertions(+), 15 deletions(-)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index c97b390..bddc950 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -90,8 +90,8 @@ public:
* ordering than simply accessing the buffer directly, but is
* not *strictly* necessary.
*/
- auto view = m_buffer->get_buffer_view();
if constexpr (D == DeletePolicy::TAGGING) {
+ auto view = m_buffer->get_buffer_view();
static_assert(std::same_as<SCHED, SerialScheduler>, "Tagging is only supported in single-threaded operation");
if (get_active_epoch()->get_structure()->tagged_delete(rec)) {
return 1;
@@ -426,9 +426,8 @@ private:
Structure *vers = args->epoch->get_structure();
- // FIXME: with an improved shard interface, multiple full buffer_viewers
// could be flushed at once here.
- auto buffer_view = args->epoch->get_buffer();
+ auto buffer_view = args->epoch->get_flush_buffer();
size_t new_head = buffer_view.get_tail();
for (ssize_t i=0; i<args->merges.size(); i++) {
@@ -464,6 +463,8 @@ private:
if (!args->compaction) {
((DynamicExtension *) args->extension)->advance_epoch();
}
+
+ ((DynamicExtension *) args->extension)->m_reconstruction_scheduled = false;
delete args;
}
@@ -525,12 +526,9 @@ private:
*/
epoch->start_job();
- // FIXME: all full buffers can be flushed at this point--but that requires
- // retooling the shard interface a bit to do efficiently.
- //
ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>();
args->epoch = epoch;
- args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffer().get_record_count());
+ args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_low_watermark());
args->extension = this;
args->compaction = false;
/* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */
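The hunk above also clears an m_reconstruction_scheduled flag at the end of the reconstruction callback, which implies a single-flight guard: a flush may only have one reconstruction in flight, and the finishing job re-arms the guard. The snippet below is a minimal sketch of that pattern, assuming the flag is an atomic boolean set when a job is enqueued; the Scheduler class and its method names are hypothetical stand-ins, not part of this framework.

#include <atomic>
#include <cstdio>

class Scheduler {
public:
    /* Returns true if this call actually scheduled work; false if a job is already pending. */
    bool schedule_once() {
        bool expected = false;
        /* only one caller wins the false-to-true transition until the flag is cleared again */
        if (!m_scheduled.compare_exchange_strong(expected, true)) {
            return false;
        }
        std::printf("reconstruction scheduled\n");
        return true;
    }

    /* Called by the job itself once it finishes, mirroring the reset at the end of the callback. */
    void job_finished() {
        m_scheduled.store(false);
    }

private:
    std::atomic<bool> m_scheduled{false};
};

int main() {
    Scheduler s;
    s.schedule_once();   /* schedules */
    s.schedule_once();   /* rejected: a job is already pending */
    s.job_finished();
    s.schedule_once();   /* schedules again */
}

Because compare_exchange_strong lets only one caller observe the false-to-true transition, duplicate scheduling requests are rejected until job_finished() resets the flag.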
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index ca85fe2..b005ff6 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -60,6 +60,16 @@ public:
}
}
+
+ /*
+ * Epochs are *not* copyable or movable. Only one can exist, and all users of
+ * it work with pointers
+ */
+ Epoch(const Epoch&) = delete;
+ Epoch(Epoch&&) = delete;
+ Epoch &operator=(const Epoch&) = delete;
+ Epoch &operator=(Epoch&&) = delete;
+
void start_job() {
m_active_jobs.fetch_add(1);
}
@@ -90,6 +100,10 @@ public:
return m_buffer->get_buffer_view();
}
+ BufView get_flush_buffer() {
+ return m_buffer->get_flush_buffer_view();
+ }
+
/*
* Returns a new Epoch object that is a copy of this one. The new object will also contain
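The comment added above states that Epochs are neither copyable nor movable, with all users sharing a single instance by pointer. The sketch below shows that deleted-copy/deleted-move idiom in isolation; the Epoch name is reused purely for illustration and omits the job and buffer state the real class manages.

#include <memory>

class Epoch {
public:
    Epoch() = default;

    /* Copying or moving would duplicate or invalidate the shared bookkeeping,
     * so both are disallowed and callers share a single instance by pointer. */
    Epoch(const Epoch&) = delete;
    Epoch(Epoch&&) = delete;
    Epoch &operator=(const Epoch&) = delete;
    Epoch &operator=(Epoch&&) = delete;
};

int main() {
    auto epoch = std::make_unique<Epoch>();
    Epoch *user = epoch.get();   /* all users hold pointers to the same object */
    (void) user;
    /* Epoch copy = *epoch;      // would not compile: the copy constructor is deleted */
}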
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index ba5e693..c751786 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -11,6 +11,7 @@
#include <cstdlib>
#include <cassert>
#include <functional>
+#include <utility>
#include "psu-util/alignment.h"
#include "psu-ds/BloomFilter.h"
@@ -39,7 +40,8 @@ public:
, m_tail(std::exchange(other.m_tail, 0))
, m_cap(std::exchange(other.m_cap, 0))
, m_approx_ts_cnt(std::exchange(other.m_approx_ts_cnt, 0))
- , m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr)) {}
+ , m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr))
+ , m_active(std::exchange(other.m_active, false)) {}
BufferView &operator=(BufferView &&other) = delete;
@@ -52,10 +54,13 @@ public:
, m_tail(tail)
, m_cap(cap)
, m_approx_ts_cnt(tombstone_cnt)
- , m_tombstone_filter(filter) {}
+ , m_tombstone_filter(filter)
+ , m_active(true) {}
~BufferView() {
- m_release();
+ if (m_active) {
+ m_release();
+ }
}
bool check_tombstone(const R& rec) {
@@ -100,13 +105,17 @@ public:
}
void copy_to_buffer(psudb::byte *buffer) {
- memcpy(buffer, (std::byte*) (m_data + m_head), get_record_count() * sizeof(Wrapped<R>));
+ memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped<R>));
}
size_t get_tail() {
return m_tail;
}
+ size_t get_head() {
+ return m_head;
+ }
+
private:
Wrapped<R>* m_data;
ReleaseFunction m_release;
@@ -115,6 +124,7 @@ private:
size_t m_cap;
size_t m_approx_ts_cnt;
psudb::BloomFilter<R> *m_tombstone_filter;
+ bool m_active;
size_t to_idx(size_t i) {
return (m_head + i) % m_cap;
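The m_active flag added above guards the destructor so that a moved-from BufferView does not invoke its release function a second time. The following is a minimal, self-contained sketch of that pattern over simplified types; View and its ReleaseFunction alias are stand-ins for the real BufferView and are not the framework's API.

#include <functional>
#include <utility>
#include <cstdio>

class View {
public:
    using ReleaseFunction = std::function<void()>;

    explicit View(ReleaseFunction release)
        : m_release(std::move(release))
        , m_active(true) {}

    /* The moved-from object is deactivated, so only the new owner releases. */
    View(View &&other)
        : m_release(std::move(other.m_release))
        , m_active(std::exchange(other.m_active, false)) {}

    View &operator=(View &&) = delete;

    ~View() {
        if (m_active) {
            m_release();
        }
    }

private:
    ReleaseFunction m_release;
    bool m_active;
};

int main() {
    View a([]() { std::printf("reference released\n"); });
    View b(std::move(a));   /* a is now inactive; only b will release */
    /* prints "reference released" exactly once, when b is destroyed */
}

Because std::exchange deactivates the source during the move, exactly one of the two objects runs the release callback, which is what keeps the buffer's head reference count balanced.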
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 3a06f0d..5b655fc 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -44,7 +44,7 @@ public:
, m_tail(0)
, m_head(0)
, m_head_refcnt(0)
- , m_old_head(0)
+ , m_old_head(high_watermark)
, m_old_head_refcnt(0)
, m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
, m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
@@ -62,14 +62,18 @@ public:
}
int append(const R &rec, bool tombstone=false) {
- int32_t pos = 0;
- if ((pos = try_advance_tail()) == -1) return 0;
+ int32_t tail = 0;
+ if ((tail = try_advance_tail()) == -1) {
+ return 0;
+ }
Wrapped<R> wrec;
wrec.rec = rec;
wrec.header = 0;
if (tombstone) wrec.set_tombstone();
+ size_t pos = tail % m_cap;
+
m_data[pos] = wrec;
m_data[pos].header |= (pos << 2);
@@ -131,6 +135,13 @@ public:
return BufferView<R>(m_data, m_cap, m_head.load(), m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
}
+ BufferView<R> get_flush_buffer_view() {
+ m_head_refcnt.fetch_add(1);
+ auto f = std::bind(release_head_reference, (void *) this, m_head.load());
+ return BufferView<R>(m_data, m_cap, m_head.load(), m_head.load() + m_lwm, m_tscnt.load(), m_tombstone_filter, f);
+
+ }
+
/*
* Advance the buffer following a reconstruction. Move current
* head and head_refcnt into old_head and old_head_refcnt, then
@@ -142,6 +153,7 @@ public:
/* refuse to advance the head while an old head still has active references */
if (m_old_head_refcnt > 0) {
+ fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n");
return false;
}
@@ -195,6 +207,10 @@ public:
* Note: this returns the available physical storage capacity,
* *not* how many more records can be inserted before the
* HWM is reached.
+ *
+ * FIXME: this logic is incorrect for the buffer prior to the
+ * first call to advance_head, and will under-report the available
+ * space.
*/
size_t get_available_capacity() {
return m_cap - (m_tail.load() - m_old_head.load());
@@ -205,7 +221,7 @@ private:
size_t old_value = m_tail.load();
/* if full, fail to advance the tail */
- if (old_value >= m_hwm) {
+ if (old_value - m_head.load() >= m_hwm) {
return -1;
}
@@ -236,6 +252,7 @@ private:
* also match.
*/
if (head == buffer->m_old_head.load()) {
+ assert(buffer->m_old_head_refcnt > 0);
buffer->m_old_head_refcnt.fetch_sub(1);
/*
* if the old head refcnt drops to 0, free
@@ -251,6 +268,7 @@ private:
buffer->m_old_head.store(buffer->m_head);
}
} else if (head == buffer->m_head.load()) {
+ assert(buffer->m_head_refcnt > 0);
buffer->m_head_refcnt.fetch_sub(1);
}
}
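The MutableBuffer changes above treat m_head and m_tail as monotonically increasing logical positions: fullness is measured as the distance between tail and head rather than the raw tail value, and the physical slot is obtained by reducing the logical position modulo the capacity. The sketch below illustrates that indexing scheme with a simplified, single-threaded ring buffer; the RingBuffer class and its watermark-free fullness check are assumptions for illustration, not the actual MutableBuffer.

#include <cstddef>
#include <cstdio>
#include <vector>

class RingBuffer {
public:
    explicit RingBuffer(size_t cap) : m_cap(cap), m_head(0), m_tail(0), m_data(cap) {}

    bool append(int rec) {
        /* full when the logical distance between tail and head reaches the capacity */
        if (m_tail - m_head >= m_cap) {
            return false;
        }
        m_data[m_tail % m_cap] = rec;   /* wrap only when touching physical storage */
        m_tail++;
        return true;
    }

    /* Advance the head past records that have been flushed elsewhere. */
    void advance_head(size_t new_head) {
        if (new_head >= m_head && new_head <= m_tail) {
            m_head = new_head;
        }
    }

    size_t record_count() const { return m_tail - m_head; }

private:
    size_t m_cap;
    size_t m_head;   /* logical position, monotonically increasing */
    size_t m_tail;   /* logical position, monotonically increasing */
    std::vector<int> m_data;
};

int main() {
    RingBuffer buf(4);
    for (int i = 0; i < 4; i++) buf.append(i);
    std::printf("full: %d\n", !buf.append(99));   /* tail - head == cap, so the append fails */
    buf.advance_head(2);                          /* two records flushed */
    std::printf("appended: %d\n", buf.append(4)); /* space is available again */
    std::printf("count: %zu\n", buf.record_count());
}

Keeping head and tail monotonic means the record count is always tail - head, even after the positions wrap the physical array, which is the same property the new try_advance_tail check relies on.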