summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt5
-rw-r--r--include/framework/DynamicExtension.h21
-rw-r--r--include/framework/structure/BufferView.h10
-rw-r--r--tests/de_level_concurrent.cpp58
-rw-r--r--tests/de_level_tomb.cpp2
-rw-r--r--tests/include/concurrent_extension.h379
6 files changed, 467 insertions, 8 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 17a0d84..6f7cf90 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -75,6 +75,11 @@ if (tests)
target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread atomic)
target_link_options(de_level_tomb PUBLIC -mcx16)
target_include_directories(de_level_tomb PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external)
+
+ add_executable(de_level_concurrent ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_concurrent.cpp)
+ target_link_libraries(de_level_concurrent PUBLIC gsl check subunit pthread atomic)
+ target_link_options(de_level_concurrent PUBLIC -mcx16)
+ target_include_directories(de_level_concurrent PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external)
add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp)
target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread atomic)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index bddc950..cb21ae3 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -28,6 +28,7 @@
#include "framework/scheduling/Epoch.h"
+
namespace de {
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING,
@@ -176,12 +177,14 @@ public:
*/
{
auto bv = epoch->get_buffer();
- shards.emplace_back(new S(std::move(bv)));
+ if (bv.get_record_count() > 0) {
+ shards.emplace_back(new S(std::move(bv)));
+ }
}
if (vers->get_levels().size() > 0) {
for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
- if (vers->get_levels()[i]) {
+ if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) {
shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
}
}
@@ -426,15 +429,21 @@ private:
Structure *vers = args->epoch->get_structure();
- // could be flushed at once here.
- auto buffer_view = args->epoch->get_flush_buffer();
- size_t new_head = buffer_view.get_tail();
for (ssize_t i=0; i<args->merges.size(); i++) {
vers->reconstruction(args->merges[i].second, args->merges[i].first);
}
/*
+ * we'll grab the buffer AFTER doing the internal reconstruction, so we can
+ * flush as many records as possible in one go. The reconstruction was done so
+ * as to make room for the full buffer anyway, so there's no real benefit to doing
+ * this first.
+ */
+ auto buffer_view = args->epoch->get_buffer();
+ size_t new_head = buffer_view.get_tail();
+
+ /*
* if performing a compaction, don't flush the buffer, as
* there is no guarantee that any necessary reconstructions
* will free sufficient space in L0 to support a flush
@@ -528,7 +537,7 @@ private:
ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>();
args->epoch = epoch;
- args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_low_watermark());
+ args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_high_watermark());
args->extension = this;
args->compaction = false;
/* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index c751786..099b7a2 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -105,7 +105,15 @@ public:
}
void copy_to_buffer(psudb::byte *buffer) {
- memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped<R>));
+ /* copy the view's records into buffer; if the region to be copied circles back to start, do it in two steps */
+ if ((m_head % m_cap) + get_record_count() > m_cap) {
+ size_t split_idx = m_cap - (m_head % m_cap);
+ /* buffer is a byte pointer while split_idx counts records: scale the destination offset by the record size */
+ memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), split_idx* sizeof(Wrapped<R>));
+ memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped<R>));
+ } else {
+ memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped<R>));
+ }
}
size_t get_tail() {
diff --git a/tests/de_level_concurrent.cpp b/tests/de_level_concurrent.cpp
new file mode 100644
index 0000000..b52fdd9
--- /dev/null
+++ b/tests/de_level_concurrent.cpp
@@ -0,0 +1,58 @@
+/*
+ * tests/de_level_concurrent.cpp
+ *
+ * Unit tests for Dynamic Extension Framework
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#include <set>
+#include <random>
+#include <algorithm>
+
+#include "include/testing.h"
+#include "framework/DynamicExtension.h"
+#include "shard/ISAMTree.h"
+#include "query/rangequery.h"
+#include "shard/TrieSpline.h"
+
+#include <check.h>
+using namespace de;
+
+typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<ISAMTree<Rec>, Rec>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
+
+#include "include/concurrent_extension.h"
+
+
+Suite *unit_testing()
+{
+ Suite *unit = suite_create("DynamicExtension: Tombstone Leveling Testing");
+ inject_dynamic_extension_tests(unit);
+
+ return unit;
+}
+
+
+int shard_unit_tests()
+{
+ int failed = 0;
+ Suite *unit = unit_testing();
+ SRunner *unit_shardner = srunner_create(unit);
+
+ srunner_run_all(unit_shardner, CK_NORMAL);
+ failed = srunner_ntests_failed(unit_shardner);
+ srunner_free(unit_shardner);
+
+ return failed;
+}
+
+
+int main()
+{
+ int unit_failed = shard_unit_tests();
+
+ return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}
diff --git a/tests/de_level_tomb.cpp b/tests/de_level_tomb.cpp
index 91ee26d..44a0759 100644
--- a/tests/de_level_tomb.cpp
+++ b/tests/de_level_tomb.cpp
@@ -22,7 +22,7 @@
#include <check.h>
using namespace de;
-typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<ISAMTree<Rec>, Rec>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
+typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<ISAMTree<Rec>, Rec>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
#include "include/dynamic_extension.h"
diff --git a/tests/include/concurrent_extension.h b/tests/include/concurrent_extension.h
new file mode 100644
index 0000000..86f8e12
--- /dev/null
+++ b/tests/include/concurrent_extension.h
@@ -0,0 +1,379 @@
+/*
+ * tests/include/concurrent_extension.h
+ *
+ * Standardized unit tests for DynamicExtension objects
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * WARNING: This file must be included in the main unit test set
+ * after the definition of an appropriate Shard, Query, and Rec
+ * type. In particular, Rec needs to implement the key-value
+ * pair interface. For other types of record, you'll need to
+ * use a different set of unit tests.
+ */
+#pragma once
+
+/*
+ * Uncomment these lines to remove errors in this file
+ * temporarily for development purposes. They should be removed prior
+ * to building, to ensure no duplicate definitions. These includes/defines
+ * should be included in the source file that includes this one, above the
+ * include statement.
+ */
+#include "testing.h"
+#include "framework/DynamicExtension.h"
+#include "framework/scheduling/FIFOScheduler.h"
+#include "shard/ISAMTree.h"
+#include "query/rangequery.h"
+#include <check.h>
+using namespace de;
+typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<ISAMTree<Rec>, Rec>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
+
+
+START_TEST(t_create)
+{
+ auto test_de = new DE(100, 1000, 2);
+
+ ck_assert_ptr_nonnull(test_de);
+ ck_assert_int_eq(test_de->get_record_count(), 0);
+ ck_assert_int_eq(test_de->get_height(), 0);
+
+ delete test_de;
+}
+END_TEST
+
+
+START_TEST(t_insert)
+{
+ auto test_de = new DE(100, 1000, 2);
+
+ uint64_t key = 0;
+ uint32_t val = 0;
+ for (size_t i=0; i<100; i++) {
+ Rec r = {key, val};
+ ck_assert_int_eq(test_de->insert(r), 1);
+ key++;
+ val++;
+ }
+
+ ck_assert_int_eq(test_de->get_height(), 0);
+ ck_assert_int_eq(test_de->get_record_count(), 100);
+
+ delete test_de;
+}
+END_TEST
+
+
+START_TEST(t_debug_insert)
+{
+ auto test_de = new DE(100, 1000, 2);
+
+ uint64_t key = 0;
+ uint32_t val = 0;
+ for (size_t i=0; i<1000; i++) {
+ Rec r = {key, val};
+ ck_assert_int_eq(test_de->insert(r), 1);
+ //ck_assert_int_eq(test_de->get_record_count(), i+1);
+ key++;
+ val++;
+ }
+
+ delete test_de;
+}
+END_TEST
+
+
+START_TEST(t_insert_with_mem_merges)
+{
+ auto test_de = new DE(100, 1000, 2);
+
+ uint64_t key = 0;
+ uint32_t val = 0;
+
+ Rec r = {key, val};
+ for (size_t i=0; i<1000; i++) {
+ ck_assert_int_eq(test_de->insert(r), 1);
+ r.key++;
+ r.value++;
+ }
+
+ ck_assert_int_eq(test_de->get_record_count(), 1000);
+
+ test_de->await_next_epoch();
+
+ ck_assert_int_eq(test_de->get_record_count(), 1000);
+
+ /*
+ * verify that we can fill past the high water mark, potentially
+ * stalling to allow merges to finish as needed.
+ */
+ size_t cnt = 0;
+ do {
+ if (test_de->insert(r)) {
+ r.key++;
+ r.value++;
+ cnt++;
+ } else {
+ sleep(1);
+ }
+ } while (cnt < 10000);
+
+ test_de->await_next_epoch();
+
+ ck_assert_int_eq(test_de->get_record_count(), 11000);
+
+ delete test_de;
+}
+END_TEST
+
+
+START_TEST(t_range_query)
+{
+ auto test_de = new DE(100, 1000, 2);
+ size_t n = 10000;
+
+ std::vector<uint64_t> keys;
+ for (size_t i=0; i<n; i++) {
+ keys.push_back(rand() % 25000);
+ }
+
+ std::random_device rd;
+ std::mt19937 gen{rd()};
+ std::shuffle(keys.begin(), keys.end(), gen);
+
+ size_t i=0;
+ while ( i < keys.size()) {
+ Rec r = {keys[i], (uint32_t) i};
+ if (test_de->insert(r)) {
+ i++;
+ } else {
+ sleep(1);
+ }
+ }
+
+ test_de->await_next_epoch();
+
+ std::sort(keys.begin(), keys.end());
+
+ auto idx = rand() % (keys.size() - 250);
+
+ uint64_t lower_key = keys[idx];
+ uint64_t upper_key = keys[idx + 250];
+
+ rq::Parms<Rec> p;
+ p.lower_bound = lower_key;
+ p.upper_bound = upper_key;
+
+ auto result = test_de->query(&p);
+ auto r = result.get();
+ std::sort(r.begin(), r.end());
+ ck_assert_int_eq(r.size(), 251);
+
+ for (size_t i=0; i<r.size(); i++) {
+ ck_assert_int_eq(r[i].key, keys[idx + i]);
+ }
+
+ delete test_de;
+}
+END_TEST
+
+
+START_TEST(t_tombstone_merging_01)
+{
+ size_t reccnt = 100000;
+ auto test_de = new DE(100, 1000, 2);
+
+ auto rng = gsl_rng_alloc(gsl_rng_mt19937);
+
+ std::set<std::pair<uint64_t, uint32_t>> records;
+ std::set<std::pair<uint64_t, uint32_t>> to_delete;
+ std::set<std::pair<uint64_t, uint32_t>> deleted;
+
+ while (records.size() < reccnt) {
+ uint64_t key = rand();
+ uint32_t val = rand();
+
+ if (records.find({key, val}) != records.end()) continue;
+
+ records.insert({key, val});
+ }
+
+ size_t deletes = 0;
+ size_t cnt=0;
+ for (auto rec : records) {
+ Rec r = {rec.first, rec.second};
+ while (!test_de->insert(r)) {
+ sleep(1);
+ }
+
+ if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) {
+ std::vector<std::pair<uint64_t, uint32_t>> del_vec;
+ std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
+
+ for (size_t i=0; i<del_vec.size(); i++) {
+ Rec dr = {del_vec[i].first, del_vec[i].second};
+ while (!test_de->erase(dr)) {
+ sleep(1);
+ }
+ deletes++;
+ to_delete.erase(del_vec[i]);
+ deleted.insert(del_vec[i]);
+ }
+ }
+
+ if (gsl_rng_uniform(rng) < 0.25 && deleted.find(rec) == deleted.end()) {
+ to_delete.insert(rec);
+ }
+ }
+
+ test_de->await_next_epoch();
+
+ ck_assert(test_de->validate_tombstone_proportion());
+
+ gsl_rng_free(rng);
+ delete test_de;
+}
+END_TEST
+
+DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
+ auto rng = gsl_rng_alloc(gsl_rng_mt19937);
+
+ auto test_de = new DE(1000, 10000, 2);
+
+ std::set<Rec> records;
+ std::set<Rec> to_delete;
+ std::set<Rec> deleted;
+
+ while (records.size() < reccnt) {
+ uint64_t key = rand();
+ uint32_t val = rand();
+
+ if (records.find({key, val}) != records.end()) continue;
+
+ records.insert({key, val});
+ }
+
+ size_t deletes = 0;
+ for (auto rec : records) {
+ ck_assert_int_eq(test_de->insert(rec), 1);
+
+ if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) {
+ std::vector<Rec> del_vec;
+ std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
+
+ for (size_t i=0; i<del_vec.size(); i++) {
+ test_de->erase(del_vec[i]);
+ deletes++;
+ to_delete.erase(del_vec[i]);
+ deleted.insert(del_vec[i]);
+ }
+ }
+
+ if (gsl_rng_uniform(rng) < 0.25 && deleted.find(rec) == deleted.end()) {
+ to_delete.insert(rec);
+ }
+ }
+
+ gsl_rng_free(rng);
+
+ return test_de;
+}
+
+/* Load unique random records with interleaved deletes, then check the size and key order of the static structure. */
+START_TEST(t_static_structure)
+{
+ auto rng = gsl_rng_alloc(gsl_rng_mt19937);
+
+ size_t reccnt = 100000;
+ auto test_de = new DE(100, 1000, 2);
+
+ std::set<Rec> records;
+ std::set<Rec> to_delete;
+ std::set<Rec> deleted;
+
+ while (records.size() < reccnt) {
+ uint64_t key = rand();
+ uint32_t val = rand();
+
+ if (records.find({key, val}) != records.end()) continue;
+
+ records.insert({key, val});
+ }
+
+ size_t deletes = 0;
+ for (auto rec : records) {
+ /* a falsy return from insert()/erase() is treated as a temporary rejection: sleep and retry */
+ while (!test_de->insert(rec)) {
+ sleep(1);
+ }
+
+ if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) {
+ std::vector<Rec> del_vec;
+ std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
+
+ for (size_t i=0; i<del_vec.size(); i++) {
+ /* std::sample may return fewer than 3 records, so erase the i-th sample rather than a fixed slot */
+ while (!test_de->erase(del_vec[i])) {
+ sleep(1);
+ }
+
+ deletes++;
+ to_delete.erase(del_vec[i]);
+ deleted.insert(del_vec[i]);
+ }
+ }
+
+ if (gsl_rng_uniform(rng) < 0.25 && deleted.find(rec) == deleted.end()) {
+ to_delete.insert(rec);
+ }
+ }
+
+ auto flat = test_de->create_static_structure();
+ ck_assert_int_eq(flat->get_record_count(), reccnt - deletes);
+
+ /* keys in the static structure must be in non-decreasing order */
+ uint64_t prev_key = 0;
+ for (size_t i=0; i<flat->get_record_count(); i++) {
+ auto k = flat->get_record_at(i)->rec.key;
+ ck_assert_int_ge(k, prev_key);
+ prev_key = k;
+ }
+
+ gsl_rng_free(rng);
+ delete flat;
+ delete test_de;
+}
+END_TEST
+
+
+static void inject_dynamic_extension_tests(Suite *suite) {
+ TCase *create = tcase_create("de::DynamicExtension::constructor Testing");
+ tcase_add_test(create, t_create);
+ suite_add_tcase(suite, create);
+
+ TCase *insert = tcase_create("de::DynamicExtension::insert Testing");
+ tcase_add_test(insert, t_insert);
+ tcase_add_test(insert, t_insert_with_mem_merges);
+ tcase_add_test(insert, t_debug_insert);
+ tcase_set_timeout(insert, 500);
+ suite_add_tcase(suite, insert);
+
+ /*
+ TCase *query = tcase_create("de::DynamicExtension::range_query Testing");
+ tcase_add_test(query, t_range_query);
+ suite_add_tcase(suite, query);
+
+
+ TCase *ts = tcase_create("de::DynamicExtension::tombstone_compaction Testing");
+ tcase_add_test(ts, t_tombstone_merging_01);
+ tcase_set_timeout(ts, 500);
+ suite_add_tcase(suite, ts);
+
+ TCase *flat = tcase_create("de::DynamicExtension::create_static_structure Testing");
+ tcase_add_test(flat, t_static_structure);
+ tcase_set_timeout(flat, 500);
+ suite_add_tcase(suite, flat);
+ */
+}