author     Douglas Rumbaugh <dbr4@psu.edu>  2024-01-10 17:39:28 -0500
committer  Douglas Rumbaugh <dbr4@psu.edu>  2024-01-10 17:39:28 -0500
commit     53879a0d69f5e578710b7125e9b41e516c2371d4 (patch)
tree       e036c77f11f07c0823eb7db9bce52269fdac6312
parent     3c2e6b3b456867d7155b158432b891b84e4e1dd6 (diff)
download   dynamic-extension-53879a0d69f5e578710b7125e9b41e516c2371d4.tar.gz
MutableBuffer+View: Implementation with unit tests
-rw-r--r--  include/framework/structure/BufferView.h     |  32
-rw-r--r--  include/framework/structure/MutableBuffer.h  | 133
-rw-r--r--  tests/mutable_buffer_tests.cpp               | 254
3 files changed, 229 insertions(+), 190 deletions(-)
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 8a5f50f..7e8af45 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -9,32 +9,34 @@
#pragma once
#include <cstdlib>
-#include <atomic>
-#include <condition_variable>
#include <cassert>
-#include <numeric>
-#include <algorithm>
-#include <type_traits>
+#include <functional>
#include "psu-util/alignment.h"
-#include "util/bf_config.h"
#include "psu-ds/BloomFilter.h"
-#include "psu-ds/Alias.h"
-#include "psu-util/timer.h"
#include "framework/interface/Record.h"
-#include "framework/interface/Query.h"
namespace de {
+typedef std::function<void(void*, size_t)> ReleaseFunction;
+
template <RecordInterface R>
class BufferView {
public:
BufferView() = default;
- BufferView(const Wrapped<R> *buffer, size_t head, size_t tail, psudb::BloomFilter<R> *filter)
- : m_buffer(buffer), m_head(head), m_tail(tail), m_tombstone_filter(filter) {}
+ BufferView(const Wrapped<R> *buffer, size_t head, size_t tail, psudb::BloomFilter<R> *filter,
+ void *parent_buffer, ReleaseFunction release)
+ : m_buffer(buffer)
+ , m_release(release)
+ , m_parent_buffer(parent_buffer)
+ , m_head(head)
+ , m_tail(tail)
+ , m_tombstone_filter(filter) {}
- ~BufferView() = default;
+ ~BufferView() {
+ m_release(m_parent_buffer, m_head);
+ }
bool check_tombstone(const R& rec) {
if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
@@ -62,12 +64,14 @@ public:
return m_buffer + to_idx(i);
}
- void copy_to_buffer(byte *buffer) {
- memcpy(buffer, m_buffer, get_record_count() * sizeof(Wrapped<R>));
+ void copy_to_buffer(psudb::byte *buffer) {
+ memcpy(buffer, (std::byte*) (m_buffer + m_head), get_record_count() * sizeof(Wrapped<R>));
}
private:
const Wrapped<R>* m_buffer;
+ void *m_parent_buffer;
+ ReleaseFunction m_release;
size_t m_head;
size_t m_tail;
psudb::BloomFilter<R> *m_tombstone_filter;
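
The change above turns BufferView into an RAII handle: the view captures an opaque pointer to its parent buffer together with a release callback, and its destructor invokes that callback to drop the head reference it pins. A minimal self-contained sketch of this pattern, using hypothetical Owner/View types rather than the framework's:

#include <cassert>
#include <cstddef>
#include <functional>

/* Hypothetical stand-ins for MutableBuffer and BufferView, illustrating
 * the destructor-driven release pattern introduced above. */
using ReleaseFunction = std::function<void(void*, size_t)>;

struct Owner {
    size_t head = 0;
    size_t head_refcnt = 0;
};

class View {
public:
    View(void *parent, size_t head, ReleaseFunction release)
        : m_parent(parent), m_head(head), m_release(release) {}

    /* the destructor, not an explicit call, returns the reference */
    ~View() { m_release(m_parent, m_head); }

private:
    void *m_parent;
    size_t m_head;
    ReleaseFunction m_release;
};

int main() {
    Owner o;
    o.head_refcnt = 1;                /* taking a view pins the head */
    {
        View v(&o, o.head, [](void *p, size_t /* head */) {
            static_cast<Owner*>(p)->head_refcnt -= 1;
        });
    }                                 /* v destroyed: reference dropped */
    assert(o.head_refcnt == 0);
    return 0;
}
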
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 7bec219..d57ad6e 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -11,17 +11,11 @@
#include <cstdlib>
#include <atomic>
-#include <condition_variable>
#include <cassert>
-#include <numeric>
-#include <algorithm>
-#include <type_traits>
#include "psu-util/alignment.h"
#include "util/bf_config.h"
#include "psu-ds/BloomFilter.h"
-#include "psu-ds/Alias.h"
-#include "psu-util/timer.h"
#include "framework/interface/Record.h"
#include "framework/structure/BufferView.h"
@@ -34,7 +28,8 @@ class MutableBuffer {
friend class BufferView<R>;
public:
MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
- : m_lwm(low_watermark), m_hwm(high_watermark), m_cap(capacity), m_head(0), m_tail(0) {
+ : m_lwm(low_watermark), m_hwm(high_watermark), m_cap(capacity), m_head(0), m_tail(0),
+ m_old_head(0), m_head_refcnt(0), m_old_head_refcnt(0) {
/*
* default capacity is twice the high water mark, to account for the worst-case
* memory requirements.
@@ -50,10 +45,8 @@ public:
}
~MutableBuffer() {
- assert(m_refcnt.load() == 0);
-
- if (m_data) free(m_data);
- if (m_tombstone_filter) delete m_tombstone_filter;
+ free(m_data);
+ delete m_tombstone_filter;
}
template <typename R_ = R>
@@ -94,7 +87,11 @@ public:
}
bool is_full() {
- return (m_tail % m_cap) >= m_hwm;
+ return get_record_count() >= m_hwm;
+ }
+
+ bool is_at_low_watermark() {
+ return (m_tail % m_cap) > m_lwm;
}
size_t get_tombstone_count() {
@@ -125,66 +122,132 @@ public:
}
size_t get_memory_usage() {
- return m_cap * sizeof(R);
+ return m_cap * sizeof(Wrapped<R>);
}
size_t get_aux_memory_usage() {
return m_tombstone_filter->get_memory_usage();
}
- size_t get_tombstone_capacity() {
- // FIXME: tombstone capacity needs figured out again
- return m_cap;
+ BufferView<R> get_buffer_view() {
+ m_head_refcnt.fetch_add(1);
+ return BufferView(m_data, m_head, m_tail.load(), m_tombstone_filter, (void*) this, release_head_reference);
}
/*
- * Concurrency-related operations
+ * Advance the buffer following a reconstruction. Move the current
+ * head and head_refcnt into old_head and old_head_refcnt, then
+ * assign new_head to head.
*/
- bool take_reference() {
- m_refcnt.fetch_add(1);
- return true;
+ void advance_head(size_t new_head) {
+ assert(new_head > m_head.load());
+ assert(new_head <= m_tail.load());
+ assert(m_old_head_refcnt == 0);
+
+ /*
+     * The order here is very important. We first store zero to the
+     * old head refcnt (it should be zero anyway). Then we move the
+     * current head to the old head; at this point, any new buffer
+     * views should increment the old head refcnt, so no new
+     * references to the current head will be taken. Then we add the
+     * current head refcnt to the old head refcnt, ensuring that no
+     * references get dropped. Only after this do we switch to the
+     * new head.
+ */
+ m_old_head_refcnt.store(0);
+ m_old_head.store(m_head.load());
+ m_old_head_refcnt.fetch_add(m_head_refcnt);
+
+ m_head_refcnt.store(0);
+ m_head.store(new_head);
}
- bool release_reference() {
- assert(m_refcnt > 0);
- m_refcnt.fetch_add(-1);
- return true;
+ void set_low_watermark(size_t lwm) {
+ assert(lwm < m_hwm);
+ m_lwm = lwm;
}
- size_t get_reference_count() {
- return m_refcnt.load();
+ size_t get_low_watermark() {
+ return m_lwm;
+ }
+
+ void set_high_watermark(size_t hwm) {
+ assert(hwm > m_lwm);
+ assert(hwm < m_cap);
+ m_hwm = hwm;
+ }
+
+ size_t get_high_watermark() {
+ return m_hwm;
+ }
+
+ size_t get_tail() {
+ return m_tail.load();
+ }
+
+ /*
+ * Note: this returns the available physical storage capacity,
+     * *not* how many more records can be inserted before the
+ * HWM is reached.
+ */
+ size_t get_available_capacity() {
+ return m_cap - (m_tail.load() - m_old_head.load());
}
private:
int64_t try_advance_tail() {
- int64_t new_tail = m_tail.fetch_add(1) % m_cap;
+ size_t old_value = m_tail.load();
+
+ /* if full, fail to advance the tail */
+ if (old_value >= m_hwm) {
+ return -1;
+ }
- if (new_tail < m_hwm) {
- return new_tail;
+ while (!m_tail.compare_exchange_strong(old_value, old_value+1)) {
+ /* if full, stop trying and fail to advance the tail */
+ if (m_tail.load() >= m_hwm) {
+ return -1;
+ }
}
- m_tail.fetch_add(-1);
- return -1;
+ return old_value;
}
size_t to_idx(size_t i) {
return (m_head + i) % m_cap;
}
- size_t m_cap;
+ static void release_head_reference(void *buff, size_t head) {
+ MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff;
+
+ if (head == buffer->m_head.load()) {
+ buffer->m_head_refcnt.fetch_sub(1);
+ } else if (head == buffer->m_old_head.load()) {
+ buffer->m_old_head_refcnt.fetch_sub(1);
+ /*
+ * if the old head refcnt drops to 0, free
+ * the records by setting old_head = head
+ */
+ if (buffer->m_old_head_refcnt.load() == 0) {
+ buffer->m_old_head.store(buffer->m_head);
+ }
+ }
+ }
size_t m_lwm;
size_t m_hwm;
+ size_t m_cap;
alignas(64) std::atomic<size_t> m_tail;
+
alignas(64) std::atomic<size_t> m_head;
+ alignas(64) std::atomic<size_t> m_head_refcnt;
+
+ alignas(64) std::atomic<size_t> m_old_head;
+ alignas(64) std::atomic<size_t> m_old_head_refcnt;
Wrapped<R>* m_data;
-
psudb::BloomFilter<R>* m_tombstone_filter;
-
alignas(64) std::atomic<size_t> m_tombstonecnt;
- alignas(64) std::atomic<size_t> m_refcnt;
};
}
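
The two reference counts above implement a small epoch scheme: views opened before a reconstruction keep the old head region alive, while new views pin the advanced head. A compact, self-contained sketch of the hand-off ordering in advance_head (the member names mirror the diff, but this is an illustration, not the framework, and a single advancing thread is assumed, as in the original):

#include <atomic>
#include <cassert>
#include <cstddef>

/* Sketch of the two-epoch hand-off performed by advance_head(). */
struct HeadEpochs {
    std::atomic<size_t> head{0};
    std::atomic<size_t> head_refcnt{0};
    std::atomic<size_t> old_head{0};
    std::atomic<size_t> old_head_refcnt{0};

    void advance(size_t new_head) {
        assert(old_head_refcnt.load() == 0);     /* prior epoch fully drained */

        old_head_refcnt.store(0);
        old_head.store(head.load());             /* retire the current head */
        old_head_refcnt.fetch_add(head_refcnt);  /* inherit its live references */

        head_refcnt.store(0);
        head.store(new_head);                    /* only now expose the new head */
    }
};

int main() {
    HeadEpochs e;
    e.head_refcnt.store(2);                 /* two views currently pin the head */
    e.advance(100);

    assert(e.old_head.load() == 0);
    assert(e.old_head_refcnt.load() == 2);  /* the views now pin the old head */
    assert(e.head.load() == 100 && e.head_refcnt.load() == 0);
    return 0;
}
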
diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp
index 8480f55..e714d3e 100644
--- a/tests/mutable_buffer_tests.cpp
+++ b/tests/mutable_buffer_tests.cpp
@@ -1,7 +1,7 @@
/*
* tests/mutable_buffer_tests.cpp
*
- * Unit tests for MutableBuffer
+ * Unit tests for MutableBuffer and BufferView
*
* Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
* Dong Xie <dongx@psu.edu>
@@ -9,7 +9,7 @@
* Distributed under the Modified BSD License.
*
*/
-#include <string>
+
#include <thread>
#include <vector>
#include <algorithm>
@@ -25,15 +25,26 @@ using namespace de;
START_TEST(t_create)
{
- auto buffer = new MutableBuffer<Rec>(100, 50);
+ size_t lwm = 50, hwm = 100;
+ size_t cap = 2 * hwm;
+
+ auto buffer = new MutableBuffer<Rec>(lwm, hwm);
ck_assert_ptr_nonnull(buffer);
- ck_assert_int_eq(buffer->get_capacity(), 100);
- ck_assert_int_eq(buffer->get_record_count(), 0);
+ ck_assert_int_eq(buffer->get_capacity(), cap);
+ ck_assert_int_eq(buffer->get_low_watermark(), lwm);
+ ck_assert_int_eq(buffer->get_high_watermark(), hwm);
+
ck_assert_int_eq(buffer->is_full(), false);
- ck_assert_ptr_nonnull(buffer->get_data());
+ ck_assert_int_eq(buffer->is_at_low_watermark(), false);
+ ck_assert_int_eq(buffer->get_record_count(), 0);
ck_assert_int_eq(buffer->get_tombstone_count(), 0);
- ck_assert_int_eq(buffer->get_tombstone_capacity(), 50);
+
+ {
+ auto view = buffer->get_buffer_view();
+ ck_assert_int_eq(view.get_tombstone_count(), 0);
+ ck_assert_int_eq(view.get_record_count(), 0);
+ }
delete buffer;
}
@@ -42,74 +53,47 @@ END_TEST
START_TEST(t_insert)
{
- auto buffer = new MutableBuffer<WRec>(100, 50);
+ auto buffer = new MutableBuffer<Rec>(50, 100);
- uint64_t key = 0;
- uint32_t val = 5;
+ Rec rec = {0, 5, 1};
- WRec rec = {0, 5, 1};
-
- for (size_t i=0; i<99; i++) {
+ /* insert records up to the low watermark */
+ size_t cnt = 0;
+ for (size_t i=0; i<50; i++) {
ck_assert_int_eq(buffer->append(rec), 1);
ck_assert_int_eq(buffer->check_tombstone(rec), 0);
+ ck_assert_int_eq(buffer->is_at_low_watermark(), false);
rec.key++;
rec.value++;
+ cnt++;
- ck_assert_int_eq(buffer->get_record_count(), i+1);
- ck_assert_int_eq(buffer->get_tombstone_count(), 0);
- ck_assert_int_eq(buffer->is_full(), 0);
+ ck_assert_int_eq(buffer->get_record_count(), cnt);
+ ck_assert_int_eq(buffer->get_buffer_view().get_record_count(), cnt);
+ ck_assert_int_eq(buffer->get_tail(), cnt);
}
- ck_assert_int_eq(buffer->append(rec), 1);
-
- rec.key++;
- rec.value++;
-
- ck_assert_int_eq(buffer->is_full(), 1);
- ck_assert_int_eq(buffer->append(rec), 0);
-
- delete buffer;
-
-}
-END_TEST
-
-
-START_TEST(t_insert_tombstones)
-{
- auto buffer = new MutableBuffer<Rec>(100, 50);
-
- size_t ts_cnt = 0;
-
- Rec rec = {0, 5};
-
- for (size_t i=0; i<99; i++) {
- bool ts = false;
- if (i % 2 == 0) {
- ts_cnt++;
- ts=true;
- }
-
- ck_assert_int_eq(buffer->append(rec, ts), 1);
- ck_assert_int_eq(buffer->check_tombstone(rec), ts);
+ /* insert records up to the high watermark */
+ for (size_t i=0; i<50; i++) {
+ ck_assert_int_eq(buffer->is_full(), 0);
+ ck_assert_int_eq(buffer->append(rec), 1);
+ ck_assert_int_eq(buffer->check_tombstone(rec), 0);
rec.key++;
rec.value++;
+ cnt++;
- ck_assert_int_eq(buffer->get_record_count(), i+1);
- ck_assert_int_eq(buffer->get_tombstone_count(), ts_cnt);
- ck_assert_int_eq(buffer->is_full(), 0);
- }
-
- // inserting one more tombstone should not be possible
- ck_assert_int_eq(buffer->append(rec, true), 0);
+ ck_assert_int_eq(buffer->get_record_count(), cnt);
+ ck_assert_int_eq(buffer->get_buffer_view().get_record_count(), cnt);
+ ck_assert_int_eq(buffer->get_tombstone_count(), 0);
+ ck_assert_int_eq(buffer->is_at_low_watermark(), true);
+ ck_assert_int_eq(buffer->get_tail(), cnt);
+ }
- ck_assert_int_eq(buffer->append(rec), 1);
-
+ /* further inserts should fail */
rec.key++;
rec.value++;
-
ck_assert_int_eq(buffer->is_full(), 1);
ck_assert_int_eq(buffer->append(rec), 0);
@@ -118,88 +102,51 @@ START_TEST(t_insert_tombstones)
END_TEST
-START_TEST(t_truncate)
+START_TEST(t_advance_head)
{
- auto buffer = new MutableBuffer<Rec>(100, 100);
+ auto buffer = new MutableBuffer<Rec>(50, 100);
- size_t ts_cnt = 0;
- Rec rec = {0, 5};
-
- for (size_t i=0; i<100; i++) {
- bool ts = false;
- if (i % 2 == 0) {
- ts_cnt++;
- ts=true;
- }
-
- ck_assert_int_eq(buffer->append(rec, ts), 1);
- ck_assert_int_eq(buffer->check_tombstone(rec), ts);
+ /* insert 75 records and get tail when LWM is exceeded */
+ size_t new_head = 0;
+ Rec rec = {1, 1};
+ size_t cnt = 0;
+ for (size_t i=0; i<75; i++) {
+ ck_assert_int_eq(buffer->append(rec), 1);
rec.key++;
rec.value++;
+ cnt++;
- ck_assert_int_eq(buffer->get_record_count(), i+1);
- ck_assert_int_eq(buffer->get_tombstone_count(), ts_cnt);
- }
-
- ck_assert_int_eq(buffer->is_full(), 1);
- ck_assert_int_eq(buffer->append(rec), 0);
-
- ck_assert_int_eq(buffer->truncate(), 1);
-
- ck_assert_int_eq(buffer->is_full(), 0);
- ck_assert_int_eq(buffer->get_record_count(), 0);
- ck_assert_int_eq(buffer->get_tombstone_count(), 0);
- ck_assert_int_eq(buffer->append(rec), 1);
-
- delete buffer;
-
-}
-END_TEST
-
-
-START_TEST(t_get_data)
-{
- size_t cnt = 100;
-
- auto buffer = new MutableBuffer<Rec>(cnt, cnt/2);
-
-
- std::vector<uint64_t> keys(cnt);
- for (size_t i=0; i<cnt-2; i++) {
- keys[i] = rand();
+ if (buffer->is_at_low_watermark() && new_head == 0) {
+ new_head = buffer->get_tail() - 1;
+ }
}
- // duplicate final two records for tombstone testing
- // purposes
- keys[cnt-2] = keys[cnt-3];
- keys[cnt-1] = keys[cnt-2];
-
- uint32_t val = 12345;
- for (size_t i=0; i<cnt-2; i++) {
- buffer->append(Rec {keys[i], val});
+ ck_assert_int_eq(buffer->get_available_capacity(), 200 - cnt);
+
+ Wrapped<Rec> *view_records = new Wrapped<Rec>[buffer->get_record_count()];
+ {
+ /* get a view of the pre-advanced state */
+ auto view = buffer->get_buffer_view();
+ ck_assert_int_eq(view.get_record_count(), cnt);
+ view.copy_to_buffer((psudb::byte *) view_records);
+
+ /* advance the head */
+ buffer->advance_head(new_head);
+ ck_assert_int_eq(buffer->get_record_count(), 25);
+ ck_assert_int_eq(buffer->get_buffer_view().get_record_count(), 25);
+ ck_assert_int_eq(view.get_record_count(), cnt);
+ ck_assert_int_eq(buffer->get_available_capacity(), 200 - cnt);
}
- Rec r1 = {keys[cnt-2], val};
- buffer->append(r1, true);
-
- Rec r2 = {keys[cnt-1], val};
- buffer->append(r2, true);
-
-
- auto *sorted_records = buffer->get_data();
- std::sort(keys.begin(), keys.end());
- std::sort(sorted_records, sorted_records + buffer->get_record_count(), std::less<Wrapped<Rec>>());
-
- for (size_t i=0; i<cnt; i++) {
- ck_assert_int_eq(sorted_records[i].rec.key, keys[i]);
- }
+ /* once the buffer view falls out of scope, the capacity of the buffer should increase */
+ ck_assert_int_eq(buffer->get_available_capacity(), 175);
delete buffer;
+ delete[] view_records;
}
END_TEST
-
void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t start, size_t stop, MutableBuffer<Rec> *buffer)
{
for (size_t i=start; i<stop; i++) {
@@ -208,18 +155,18 @@ void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t s
}
-#if DE_MT_TEST
+/*
START_TEST(t_multithreaded_insert)
{
size_t cnt = 10000;
- auto buffer = new MutableBuffer<Rec>(cnt, true, cnt/2);
+ auto buffer = new MutableBuffer<Rec>(cnt/2, cnt);
std::vector<Rec> records(cnt);
for (size_t i=0; i<cnt; i++) {
records[i] = Rec {(uint64_t) rand(), (uint32_t) rand()};
}
- // perform a t_multithreaded insertion
+ // perform a multithreaded insertion
size_t thread_cnt = 8;
size_t per_thread = cnt / thread_cnt;
std::vector<std::thread> workers(thread_cnt);
@@ -239,17 +186,49 @@ START_TEST(t_multithreaded_insert)
ck_assert_int_eq(buffer->is_full(), 1);
ck_assert_int_eq(buffer->get_record_count(), cnt);
+}
+END_TEST
+*/
- std::sort(records.begin(), records.end());
- auto *sorted_records = buffer->sorted_output();
- for (size_t i=0; i<cnt; i++) {
- ck_assert_int_eq(sorted_records[i].key, records[i].key);
+
+START_TEST(t_truncate)
+{
+ auto buffer = new MutableBuffer<Rec>(100, 100);
+
+ size_t ts_cnt = 0;
+ Rec rec = {0, 5};
+
+ for (size_t i=0; i<100; i++) {
+ bool ts = false;
+ if (i % 2 == 0) {
+ ts_cnt++;
+ ts=true;
+ }
+
+ ck_assert_int_eq(buffer->append(rec, ts), 1);
+ ck_assert_int_eq(buffer->check_tombstone(rec), ts);
+
+ rec.key++;
+ rec.value++;
+
+ ck_assert_int_eq(buffer->get_record_count(), i+1);
+ ck_assert_int_eq(buffer->get_tombstone_count(), ts_cnt);
}
+ ck_assert_int_eq(buffer->is_full(), 1);
+ ck_assert_int_eq(buffer->append(rec), 0);
+
+ ck_assert_int_eq(buffer->truncate(), 1);
+
+ ck_assert_int_eq(buffer->is_full(), 0);
+ ck_assert_int_eq(buffer->get_record_count(), 0);
+ ck_assert_int_eq(buffer->get_tombstone_count(), 0);
+ ck_assert_int_eq(buffer->append(rec), 1);
+
delete buffer;
+
}
END_TEST
-#endif
Suite *unit_testing()
@@ -263,10 +242,8 @@ Suite *unit_testing()
TCase *append = tcase_create("de::MutableBuffer::append Testing");
tcase_add_test(append, t_insert);
- tcase_add_test(append, t_insert_tombstones);
- #if DE_MT_TEST
- tcase_add_test(append, t_multithreaded_insert);
- #endif
+ tcase_add_test(append, t_advance_head);
+ //tcase_add_test(append, t_multithreaded_insert);
suite_add_tcase(unit, append);
@@ -277,11 +254,6 @@ Suite *unit_testing()
suite_add_tcase(unit, truncate);
- TCase *sorted_out = tcase_create("de::MutableBuffer::get_data");
- tcase_add_test(sorted_out, t_get_data);
-
- suite_add_tcase(unit, sorted_out);
-
return unit;
}
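
Finally, note the shape of the rewritten try_advance_tail in MutableBuffer.h above: instead of optimistically incrementing the tail and rolling back on overflow, it claims a slot with a bounded compare-and-swap loop, so the tail never moves past the high watermark even transiently. A self-contained sketch of that claim loop (hwm stands in for m_hwm; names are illustrative):

#include <atomic>
#include <cstdint>
#include <cstdio>

/* Sketch of the CAS-based slot claim used by try_advance_tail(): returns
 * the index of the claimed slot, or -1 once the high watermark is hit. */
int64_t claim_slot(std::atomic<size_t> &tail, size_t hwm) {
    size_t old_value = tail.load();

    /* if full, fail without touching the tail */
    if (old_value >= hwm) {
        return -1;
    }

    while (!tail.compare_exchange_strong(old_value, old_value + 1)) {
        /* the buffer filled up while we were retrying */
        if (tail.load() >= hwm) {
            return -1;
        }
    }

    return (int64_t) old_value;
}

int main() {
    std::atomic<size_t> tail{0};
    for (int i = 0; i < 4; i++) {
        /* prints 0, 1, 2, then -1 once hwm=3 is reached */
        printf("%lld\n", (long long) claim_slot(tail, 3));
    }
    return 0;
}
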