summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-01-10 18:01:30 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-01-10 18:01:30 -0500
commiteb19677340be6f0befe9da2199e5832af51eea0d (patch)
treef922615b68b04edb61e0cecfad3c51e61a8a8fdb
parent53879a0d69f5e578710b7125e9b41e516c2371d4 (diff)
downloaddynamic-extension-eb19677340be6f0befe9da2199e5832af51eea0d.tar.gz
MutableBuffer: multithreaded insert test + bugfixes
-rw-r--r--include/framework/structure/MutableBuffer.h13
-rw-r--r--tests/mutable_buffer_tests.cpp32
2 files changed, 29 insertions, 16 deletions
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index d57ad6e..a065154 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -49,7 +49,6 @@ public:
delete m_tombstone_filter;
}
- template <typename R_ = R>
int append(const R &rec, bool tombstone=false) {
int32_t pos = 0;
if ((pos = try_advance_tail()) == -1) return 0;
@@ -91,7 +90,7 @@ public:
}
bool is_at_low_watermark() {
- return (m_tail % m_cap) > m_lwm;
+ return get_record_count() >= m_lwm;
}
size_t get_tombstone_count() {
@@ -139,10 +138,14 @@ public:
* head and head_refcnt into old_head and old_head_refcnt, then
* assign new_head to old_head.
*/
- void advance_head(size_t new_head) {
+ bool advance_head(size_t new_head) {
assert(new_head > m_head.load());
assert(new_head <= m_tail.load());
- assert(m_old_head_refcnt == 0);
+
+ /* refuse to advance head while there is an old with one references */
+ if (m_old_head_refcnt > 0) {
+ return false;
+ }
/*
* the order here is very important. We first store zero to the
@@ -159,6 +162,8 @@ public:
m_head_refcnt.store(0);
m_head.store(new_head);
+
+ return true;
}
void set_low_watermark(size_t lwm) {
diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp
index e714d3e..0520097 100644
--- a/tests/mutable_buffer_tests.cpp
+++ b/tests/mutable_buffer_tests.cpp
@@ -12,15 +12,12 @@
#include <thread>
#include <vector>
-#include <algorithm>
#include "testing.h"
#include "framework/structure/MutableBuffer.h"
#include <check.h>
-#define DE_MT_TEST 0
-
using namespace de;
START_TEST(t_create)
@@ -60,9 +57,9 @@ START_TEST(t_insert)
/* insert records up to the low watermark */
size_t cnt = 0;
for (size_t i=0; i<50; i++) {
+ ck_assert_int_eq(buffer->is_at_low_watermark(), false);
ck_assert_int_eq(buffer->append(rec), 1);
ck_assert_int_eq(buffer->check_tombstone(rec), 0);
- ck_assert_int_eq(buffer->is_at_low_watermark(), false);
rec.key++;
rec.value++;
@@ -73,6 +70,8 @@ START_TEST(t_insert)
ck_assert_int_eq(buffer->get_tail(), cnt);
}
+ ck_assert_int_eq(buffer->is_at_low_watermark(), true);
+
/* insert records up to the high watermark */
for (size_t i=0; i<50; i++) {
ck_assert_int_eq(buffer->is_full(), 0);
@@ -118,7 +117,7 @@ START_TEST(t_advance_head)
cnt++;
if (buffer->is_at_low_watermark() && new_head == 0) {
- new_head = buffer->get_tail() - 1;
+ new_head = buffer->get_tail();
}
}
@@ -132,30 +131,38 @@ START_TEST(t_advance_head)
view.copy_to_buffer((psudb::byte *) view_records);
/* advance the head */
- buffer->advance_head(new_head);
+ ck_assert_int_eq(buffer->advance_head(new_head), 1);
ck_assert_int_eq(buffer->get_record_count(), 25);
ck_assert_int_eq(buffer->get_buffer_view().get_record_count(), 25);
ck_assert_int_eq(view.get_record_count(), cnt);
ck_assert_int_eq(buffer->get_available_capacity(), 200 - cnt);
+
+ /* refuse to advance head again while there remain references to the old one */
+ ck_assert_int_eq(buffer->advance_head(buffer->get_tail() -1), 0);
}
/* once the buffer view falls out of scope, the capacity of the buffer should increase */
ck_assert_int_eq(buffer->get_available_capacity(), 175);
+ /* now the head should be able to be advanced */
+ ck_assert_int_eq(buffer->advance_head(buffer->get_tail()), 1);
+
+ /* and the buffer should be empty */
+ ck_assert_int_eq(buffer->get_record_count(), 0);
+
delete buffer;
delete[] view_records;
}
END_TEST
-void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t start, size_t stop, MutableBuffer<Rec> *buffer)
+void insert_records(std::vector<Rec> *values, size_t start, size_t stop, MutableBuffer<Rec> *buffer)
{
for (size_t i=start; i<stop; i++) {
- buffer->append({(*values)[i].first, (*values)[i].second});
+ buffer->append((*values)[i]);
}
}
-/*
START_TEST(t_multithreaded_insert)
{
size_t cnt = 10000;
@@ -166,7 +173,7 @@ START_TEST(t_multithreaded_insert)
records[i] = Rec {(uint64_t) rand(), (uint32_t) rand()};
}
- // perform a multithreaded insertion
+ /* perform a multithreaded insertion */
size_t thread_cnt = 8;
size_t per_thread = cnt / thread_cnt;
std::vector<std::thread> workers(thread_cnt);
@@ -186,9 +193,10 @@ START_TEST(t_multithreaded_insert)
ck_assert_int_eq(buffer->is_full(), 1);
ck_assert_int_eq(buffer->get_record_count(), cnt);
+
+ delete buffer;
}
END_TEST
-*/
START_TEST(t_truncate)
@@ -243,7 +251,7 @@ Suite *unit_testing()
TCase *append = tcase_create("de::MutableBuffer::append Testing");
tcase_add_test(append, t_insert);
tcase_add_test(append, t_advance_head);
- //tcase_add_test(append, t_multithreaded_insert);
+ tcase_add_test(append, t_multithreaded_insert);
suite_add_tcase(unit, append);