summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-09-20 14:03:23 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-09-20 14:03:23 -0400
commit754372aeccb74815cbb16f32ceacb04b4c5aaba9 (patch)
tree1e45694f56043a42bc342464b7096e3f3b8a58b6
parent6e30f576ca9d11d1901f4877315e97f84d15b1e1 (diff)
downloaddynamic-extension-754372aeccb74815cbb16f32ceacb04b4c5aaba9.tar.gz
Bugfixes for tiering
Fixed a few issues that manifested during the tiering tests, 1) When a version is copied, it now contains copies of the levels, not just pointers (the levels themselves still hold pointers to the shards, though). 2) Ensure that tasks are scheduled with the correct timestamp, they were originally being scheduled backwards. The get_merge_tasks() method already returns them in the correct order, so reversing them again put it in the wrong order.
-rw-r--r--include/framework/ExtensionStructure.h1
-rw-r--r--include/framework/InternalLevel.h8
-rw-r--r--include/framework/Scheduler.h4
-rw-r--r--tests/dynamic_extension_tests.inc24
4 files changed, 33 insertions, 4 deletions
diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h
index 2fb9cf0..892e63b 100644
--- a/include/framework/ExtensionStructure.h
+++ b/include/framework/ExtensionStructure.h
@@ -312,6 +312,7 @@ public:
m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get());
} else {
m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get());
+ m_levels[base_level]->finalize();
}
m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index e67ae45..6cdac4e 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -106,6 +106,7 @@ public:
for (size_t i=0; i<m_shards.size(); i++) {
if (m_owns[i]) {
delete m_shards[i];
+ m_shards[i] = nullptr;
m_owns[i] = false;
}
}
@@ -113,6 +114,7 @@ public:
m_shards[0] = m_pending_shard;
m_owns[0] = true;
m_pending_shard = nullptr;
+ m_shard_cnt = 1;
}
}
@@ -241,13 +243,15 @@ private:
std::vector<bool> m_owns;
- InternalLevel *clone() {
- auto new_level = new InternalLevel(m_level_no, m_shards.size());
+ std::shared_ptr<InternalLevel> clone() {
+ auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size());
for (size_t i=0; i<m_shard_cnt; i++) {
new_level->m_shards[i] = m_shards[i];
new_level->m_owns[i] = true;
m_owns[i] = false;
}
+
+ return new_level;
}
};
diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h
index 534ce25..6055bef 100644
--- a/include/framework/Scheduler.h
+++ b/include/framework/Scheduler.h
@@ -68,7 +68,7 @@ public:
* Schedule the merge tasks (FIXME: currently this just
* executes them sequentially in a blocking fashion)
*/
- for (ssize_t i=merges.size()-1; i>=0; i--) {
+ for (ssize_t i=0; i<merges.size(); i++) {
merges[i].m_timestamp = m_timestamp.fetch_add(1);
m_merge_queue_lock.lock();
m_merge_queue.push(merges[i]);
@@ -120,6 +120,7 @@ private:
void run_merge(MergeTask task, Structure *version) {
version->merge_levels(task.m_target_level, task.m_source_level);
+
if (!version->validate_tombstone_proportion(task.m_target_level)) {
auto tasks = version->get_merge_tasks(task.m_target_level);
/*
@@ -188,7 +189,6 @@ private:
std::thread m_sched_thrd;
bool m_shutdown;
-
};
}
diff --git a/tests/dynamic_extension_tests.inc b/tests/dynamic_extension_tests.inc
index b9866c3..bee28f7 100644
--- a/tests/dynamic_extension_tests.inc
+++ b/tests/dynamic_extension_tests.inc
@@ -45,6 +45,25 @@ START_TEST(t_insert)
END_TEST
+START_TEST(t_debug_insert)
+{
+ auto ext_wirs = new DE(100, 2, 1);
+
+ uint64_t key = 0;
+ uint32_t val = 0;
+ for (size_t i=0; i<1000; i++) {
+ WRec r = {key, val, 1};
+ ck_assert_int_eq(ext_wirs->insert(r), 1);
+ ck_assert_int_eq(ext_wirs->get_record_count(), i+1);
+ key++;
+ val++;
+ }
+
+ delete ext_wirs;
+}
+END_TEST
+
+
START_TEST(t_insert_with_mem_merges)
{
auto ext_wirs = new DE(100, 2, 1);
@@ -329,8 +348,12 @@ START_TEST(t_static_structure)
}
size_t deletes = 0;
+ size_t t_reccnt = 0;
+ size_t k=0;
for (auto rec : records) {
+ k++;
ck_assert_int_eq(ext_wirs->insert(rec), 1);
+ t_reccnt++;
if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) {
std::vector<WRec> del_vec;
@@ -378,6 +401,7 @@ Suite *unit_testing()
TCase *insert = tcase_create("de::DynamicExtension<WIRS>::insert Testing");
tcase_add_test(insert, t_insert);
tcase_add_test(insert, t_insert_with_mem_merges);
+ tcase_add_test(insert, t_debug_insert);
suite_add_tcase(unit, insert);
TCase *sampling = tcase_create("de::DynamicExtension<WIRS>::range_sample Testing");