diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-09-20 14:03:23 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-09-20 14:03:23 -0400 |
| commit | 754372aeccb74815cbb16f32ceacb04b4c5aaba9 (patch) | |
| tree | 1e45694f56043a42bc342464b7096e3f3b8a58b6 | |
| parent | 6e30f576ca9d11d1901f4877315e97f84d15b1e1 (diff) | |
| download | dynamic-extension-754372aeccb74815cbb16f32ceacb04b4c5aaba9.tar.gz | |
Bugfixes for tiering
Fixed a few issues that manifested during the tiering tests,
1) When a version is copied, it now contains copies of the levels,
not just pointers (the levels themselves still hold pointers to
the shards, though).
2) Ensure that tasks are scheduled with the correct timestamp, they
were originally being scheduled backwards. The get_merge_tasks()
method already returns them in the correct order, so reversing
them again put it in the wrong order.
| -rw-r--r-- | include/framework/ExtensionStructure.h | 1 | ||||
| -rw-r--r-- | include/framework/InternalLevel.h | 8 | ||||
| -rw-r--r-- | include/framework/Scheduler.h | 4 | ||||
| -rw-r--r-- | tests/dynamic_extension_tests.inc | 24 |
4 files changed, 33 insertions, 4 deletions
diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h index 2fb9cf0..892e63b 100644 --- a/include/framework/ExtensionStructure.h +++ b/include/framework/ExtensionStructure.h @@ -312,6 +312,7 @@ public: m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get()); } else { m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get()); + m_levels[base_level]->finalize(); } m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index e67ae45..6cdac4e 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -106,6 +106,7 @@ public: for (size_t i=0; i<m_shards.size(); i++) { if (m_owns[i]) { delete m_shards[i]; + m_shards[i] = nullptr; m_owns[i] = false; } } @@ -113,6 +114,7 @@ public: m_shards[0] = m_pending_shard; m_owns[0] = true; m_pending_shard = nullptr; + m_shard_cnt = 1; } } @@ -241,13 +243,15 @@ private: std::vector<bool> m_owns; - InternalLevel *clone() { - auto new_level = new InternalLevel(m_level_no, m_shards.size()); + std::shared_ptr<InternalLevel> clone() { + auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size()); for (size_t i=0; i<m_shard_cnt; i++) { new_level->m_shards[i] = m_shards[i]; new_level->m_owns[i] = true; m_owns[i] = false; } + + return new_level; } }; diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h index 534ce25..6055bef 100644 --- a/include/framework/Scheduler.h +++ b/include/framework/Scheduler.h @@ -68,7 +68,7 @@ public: * Schedule the merge tasks (FIXME: currently this just * executes them sequentially in a blocking fashion) */ - for (ssize_t i=merges.size()-1; i>=0; i--) { + for (ssize_t i=0; i<merges.size(); i++) { merges[i].m_timestamp = m_timestamp.fetch_add(1); m_merge_queue_lock.lock(); m_merge_queue.push(merges[i]); @@ -120,6 +120,7 @@ private: void run_merge(MergeTask task, Structure *version) { version->merge_levels(task.m_target_level, task.m_source_level); + if (!version->validate_tombstone_proportion(task.m_target_level)) { auto tasks = version->get_merge_tasks(task.m_target_level); /* @@ -188,7 +189,6 @@ private: std::thread m_sched_thrd; bool m_shutdown; - }; } diff --git a/tests/dynamic_extension_tests.inc b/tests/dynamic_extension_tests.inc index b9866c3..bee28f7 100644 --- a/tests/dynamic_extension_tests.inc +++ b/tests/dynamic_extension_tests.inc @@ -45,6 +45,25 @@ START_TEST(t_insert) END_TEST +START_TEST(t_debug_insert) +{ + auto ext_wirs = new DE(100, 2, 1); + + uint64_t key = 0; + uint32_t val = 0; + for (size_t i=0; i<1000; i++) { + WRec r = {key, val, 1}; + ck_assert_int_eq(ext_wirs->insert(r), 1); + ck_assert_int_eq(ext_wirs->get_record_count(), i+1); + key++; + val++; + } + + delete ext_wirs; +} +END_TEST + + START_TEST(t_insert_with_mem_merges) { auto ext_wirs = new DE(100, 2, 1); @@ -329,8 +348,12 @@ START_TEST(t_static_structure) } size_t deletes = 0; + size_t t_reccnt = 0; + size_t k=0; for (auto rec : records) { + k++; ck_assert_int_eq(ext_wirs->insert(rec), 1); + t_reccnt++; if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) { std::vector<WRec> del_vec; @@ -378,6 +401,7 @@ Suite *unit_testing() TCase *insert = tcase_create("de::DynamicExtension<WIRS>::insert Testing"); tcase_add_test(insert, t_insert); tcase_add_test(insert, t_insert_with_mem_merges); + tcase_add_test(insert, t_debug_insert); suite_add_tcase(unit, insert); TCase *sampling = tcase_create("de::DynamicExtension<WIRS>::range_sample Testing"); |