summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-02-05 15:18:33 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-02-05 15:18:33 -0500
commit0ff3cedf5df9c27bccd3053ce6339e317f87ff76 (patch)
tree0ffdeb674cc31d8b285547e86b6939b0899f2113
parentdb4806d9dd9757273a14e6c3ea92e5a087239145 (diff)
downloaddynamic-extension-0ff3cedf5df9c27bccd3053ce6339e317f87ff76.tar.gz
BufferView: Adjusted BV to avoid repeated modulus operations
-rw-r--r--include/framework/structure/BufferView.h50
-rw-r--r--include/framework/structure/MutableBuffer.h13
-rw-r--r--tests/mutable_buffer_tests.cpp142
3 files changed, 190 insertions, 15 deletions
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 30fffed..edf6707 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -38,6 +38,8 @@ public:
, m_release(std::move(other.m_release))
, m_head(std::exchange(other.m_head, 0))
, m_tail(std::exchange(other.m_tail, 0))
+ , m_start(std::exchange(other.m_start, 0))
+ , m_stop(std::exchange(other.m_stop, 0))
, m_cap(std::exchange(other.m_cap, 0))
, m_approx_ts_cnt(std::exchange(other.m_approx_ts_cnt, 0))
, m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr))
@@ -52,6 +54,8 @@ public:
, m_release(release)
, m_head(head)
, m_tail(tail)
+ , m_start(m_head % cap)
+ , m_stop(m_tail % cap)
, m_cap(cap)
, m_approx_ts_cnt(tombstone_cnt)
, m_tombstone_filter(filter)
@@ -76,11 +80,29 @@ public:
}
bool delete_record(const R& rec) {
- for (size_t i=0; i<get_record_count(); i++) {
- if (m_data[to_idx(i)].rec == rec) {
- m_data[to_idx(i)].set_delete();
- return true;
+ if (m_start < m_stop) {
+ for (size_t i=m_start; i<m_stop; i++) {
+ if (m_data[i].rec == rec) {
+ m_data[i].set_delete();
+ return true;
+ }
}
+ } else {
+ for (size_t i=m_start; i<m_cap; i++) {
+ if (m_data[i].rec == rec) {
+ m_data[i].set_delete();
+ return true;
+ }
+ }
+
+ for (size_t i=0; i<m_stop; i++) {
+ if (m_data[i].rec == rec) {
+ m_data[i].set_delete();
+ return true;
+ }
+
+ }
+
}
return false;
@@ -101,18 +123,19 @@ public:
Wrapped<R> *get(size_t i) {
assert(i < get_record_count());
+ m_total += (m_data + to_idx(i))->rec.key;
return m_data + to_idx(i);
}
void copy_to_buffer(psudb::byte *buffer) {
/* check if the region to be copied circles back to start. If so, do it in two steps */
- if ((m_head % m_cap) + get_record_count() > m_cap) {
- size_t split_idx = m_cap - (m_head % m_cap);
+ if (m_start > m_stop) {
+ size_t split_idx = m_cap - m_start;
- memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), split_idx* sizeof(Wrapped<R>));
- memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped<R>));
+ memcpy(buffer, (std::byte*) (m_data + m_start), split_idx* sizeof(Wrapped<R>));
+ memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte*) m_data, m_stop * sizeof(Wrapped<R>));
} else {
- memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped<R>));
+ memcpy(buffer, (std::byte*) (m_data + m_start), get_record_count() * sizeof(Wrapped<R>));
}
}
@@ -129,13 +152,20 @@ private:
ReleaseFunction m_release;
size_t m_head;
size_t m_tail;
+ size_t m_start;
+ size_t m_stop;
size_t m_cap;
size_t m_approx_ts_cnt;
psudb::BloomFilter<R> *m_tombstone_filter;
bool m_active;
+ size_t m_total;
+
size_t to_idx(size_t i) {
- return (m_head + i) % m_cap;
+ size_t idx = (m_start + i >= m_cap) ? i = (m_cap - m_start)
+ : m_start + i;
+ assert(idx < m_cap);
+ return idx;
}
};
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 94a9c41..415c95a 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -230,13 +230,16 @@ public:
/*
* Note: this returns the available physical storage capacity,
* *not* now many more records can be inserted before the
- * HWM is reached.
- *
- * FIXME: this logic is incorrect for the buffer prior to the
- * first call to advance_head, and will under-report the available
- * space.
+ * HWM is reached. It considers the old_head to be "free"
+ * when it has no remaining references. This should be true,
+ * but a buggy framework implementation may violate the
+ * assumption.
*/
size_t get_available_capacity() {
+ if (m_old_head.load().refcnt == 0) {
+ return m_cap - (m_tail.load() - m_head.load().head_idx);
+ }
+
return m_cap - (m_tail.load() - m_old_head.load().head_idx);
}
diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp
index 4064412..31c16dc 100644
--- a/tests/mutable_buffer_tests.cpp
+++ b/tests/mutable_buffer_tests.cpp
@@ -238,6 +238,143 @@ START_TEST(t_truncate)
}
END_TEST
+START_TEST(t_bview_get)
+{
+ auto buffer = new MutableBuffer<Rec>(50, 100);
+
+ /* insert 75 records and get tail when LWM is exceeded */
+ size_t new_head = 0;
+ Rec rec = {1, 1};
+ size_t cnt = 0;
+ for (size_t i=0; i<75; i++) {
+ ck_assert_int_eq(buffer->append(rec), 1);
+
+ rec.key++;
+ rec.value++;
+ cnt++;
+
+ if (buffer->is_at_low_watermark() && new_head == 0) {
+ new_head = buffer->get_tail();
+ }
+ }
+
+ ck_assert_int_eq(buffer->get_available_capacity(), 200 - cnt);
+
+ {
+ /* get a view of the pre-advanced state */
+ auto view = buffer->get_buffer_view();
+ auto reccnt = view.get_record_count();
+
+ /* scan the records in the view */
+ for (size_t i=0; i<reccnt; i++) {
+ ck_assert_int_eq(view.get(i)->rec.key, i+1);
+ }
+
+ /* advance the head */
+ buffer->advance_head(new_head);
+
+ /* scan the records in the view again -- should be unchanged */
+ for (size_t i=0; i<reccnt; i++) {
+ ck_assert_int_eq(view.get(i)->rec.key, i+1);
+ }
+ }
+
+ {
+ /* get a new view (should have fewer records) */
+ auto view = buffer->get_buffer_view();
+ auto reccnt = view.get_record_count();
+
+ /* verify the scan again */
+ for (size_t i=0; i<reccnt; i++) {
+ ck_assert_int_eq(view.get(i)->rec.key, i + 51);
+ }
+ }
+
+ /* insert more records (to trigger a wrap-around) */
+ for (size_t i=0; i<75; i++) {
+ ck_assert_int_eq(buffer->append(rec), 1);
+
+ rec.key++;
+ rec.value++;
+ cnt++;
+ }
+
+ {
+ /* get a new view (should have fewer records) */
+ auto view = buffer->get_buffer_view();
+ auto reccnt = view.get_record_count();
+
+ /* verify the scan again */
+ for (size_t i=0; i<reccnt; i++) {
+ ck_assert_int_eq(view.get(i)->rec.key, i + 51);
+ }
+ }
+
+ delete buffer;
+}
+END_TEST
+
+
+START_TEST(t_bview_delete)
+{
+
+ auto buffer = new MutableBuffer<Rec>(50, 100);
+
+ /* insert 75 records and get tail when LWM is exceeded */
+ size_t new_head = 0;
+ Rec rec = {1, 1};
+ size_t cnt = 0;
+ for (size_t i=0; i<75; i++) {
+ ck_assert_int_eq(buffer->append(rec), 1);
+
+ rec.key++;
+ rec.value++;
+ cnt++;
+
+ if (buffer->is_at_low_watermark() && new_head == 0) {
+ new_head = buffer->get_tail();
+ }
+ }
+
+ buffer->advance_head(new_head);
+
+ for (size_t i=0; i<75; i++) {
+ ck_assert_int_eq(buffer->append(rec), 1);
+
+ rec.key++;
+ rec.value++;
+ cnt++;
+ }
+
+ Rec dr1 = {67, 67};
+ Rec dr2 = {89, 89};
+ Rec dr3 = {103, 103};
+
+ Rec fdr1 = {5, 5};
+ Rec fdr2 = {300, 300};
+ {
+ /* get a new view (should have fewer records) */
+ auto view = buffer->get_buffer_view();
+ ck_assert_int_eq(view.delete_record(dr1), 1);
+ ck_assert_int_eq(view.delete_record(dr2), 1);
+ ck_assert_int_eq(view.delete_record(dr3), 1);
+ ck_assert_int_eq(view.delete_record(fdr1), 0);
+ ck_assert_int_eq(view.delete_record(fdr2), 0);
+
+ for (size_t i=0; i<view.get_record_count(); i++) {
+ if (view.get(i)->rec == dr1 || view.get(i)->rec == dr2
+ || view.get(i)->rec == dr3) {
+ ck_assert_int_eq(view.get(i)->is_deleted(), 1);
+ } else {
+ ck_assert_int_eq(view.get(i)->is_deleted(), 0);
+ }
+ }
+ }
+
+ delete buffer;
+}
+END_TEST
+
Suite *unit_testing()
{
@@ -255,6 +392,11 @@ Suite *unit_testing()
suite_add_tcase(unit, append);
+ TCase *view = tcase_create("de::BufferView Testing");
+ tcase_add_test(view, t_bview_get);
+ tcase_add_test(view, t_bview_delete);
+
+ suite_add_tcase(unit, view);
TCase *truncate = tcase_create("de::MutableBuffer::truncate Testing");
tcase_add_test(truncate, t_truncate);