diff options
| -rw-r--r-- | include/framework/ExtensionStructure.h | 1 | ||||
| -rw-r--r-- | include/framework/InternalLevel.h | 8 | ||||
| -rw-r--r-- | include/framework/Scheduler.h | 4 | ||||
| -rw-r--r-- | tests/dynamic_extension_tests.inc | 24 |
4 files changed, 33 insertions, 4 deletions
diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h index 2fb9cf0..892e63b 100644 --- a/include/framework/ExtensionStructure.h +++ b/include/framework/ExtensionStructure.h @@ -312,6 +312,7 @@ public: m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get()); } else { m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get()); + m_levels[base_level]->finalize(); } m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index e67ae45..6cdac4e 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -106,6 +106,7 @@ public: for (size_t i=0; i<m_shards.size(); i++) { if (m_owns[i]) { delete m_shards[i]; + m_shards[i] = nullptr; m_owns[i] = false; } } @@ -113,6 +114,7 @@ public: m_shards[0] = m_pending_shard; m_owns[0] = true; m_pending_shard = nullptr; + m_shard_cnt = 1; } } @@ -241,13 +243,15 @@ private: std::vector<bool> m_owns; - InternalLevel *clone() { - auto new_level = new InternalLevel(m_level_no, m_shards.size()); + std::shared_ptr<InternalLevel> clone() { + auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size()); for (size_t i=0; i<m_shard_cnt; i++) { new_level->m_shards[i] = m_shards[i]; new_level->m_owns[i] = true; m_owns[i] = false; } + + return new_level; } }; diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h index 534ce25..6055bef 100644 --- a/include/framework/Scheduler.h +++ b/include/framework/Scheduler.h @@ -68,7 +68,7 @@ public: * Schedule the merge tasks (FIXME: currently this just * executes them sequentially in a blocking fashion) */ - for (ssize_t i=merges.size()-1; i>=0; i--) { + for (ssize_t i=0; i<merges.size(); i++) { merges[i].m_timestamp = m_timestamp.fetch_add(1); m_merge_queue_lock.lock(); m_merge_queue.push(merges[i]); @@ -120,6 +120,7 @@ private: void run_merge(MergeTask task, Structure *version) { version->merge_levels(task.m_target_level, task.m_source_level); + if (!version->validate_tombstone_proportion(task.m_target_level)) { auto tasks = version->get_merge_tasks(task.m_target_level); /* @@ -188,7 +189,6 @@ private: std::thread m_sched_thrd; bool m_shutdown; - }; } diff --git a/tests/dynamic_extension_tests.inc b/tests/dynamic_extension_tests.inc index b9866c3..bee28f7 100644 --- a/tests/dynamic_extension_tests.inc +++ b/tests/dynamic_extension_tests.inc @@ -45,6 +45,25 @@ START_TEST(t_insert) END_TEST +START_TEST(t_debug_insert) +{ + auto ext_wirs = new DE(100, 2, 1); + + uint64_t key = 0; + uint32_t val = 0; + for (size_t i=0; i<1000; i++) { + WRec r = {key, val, 1}; + ck_assert_int_eq(ext_wirs->insert(r), 1); + ck_assert_int_eq(ext_wirs->get_record_count(), i+1); + key++; + val++; + } + + delete ext_wirs; +} +END_TEST + + START_TEST(t_insert_with_mem_merges) { auto ext_wirs = new DE(100, 2, 1); @@ -329,8 +348,12 @@ START_TEST(t_static_structure) } size_t deletes = 0; + size_t t_reccnt = 0; + size_t k=0; for (auto rec : records) { + k++; ck_assert_int_eq(ext_wirs->insert(rec), 1); + t_reccnt++; if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) { std::vector<WRec> del_vec; @@ -378,6 +401,7 @@ Suite *unit_testing() TCase *insert = tcase_create("de::DynamicExtension<WIRS>::insert Testing"); tcase_add_test(insert, t_insert); tcase_add_test(insert, t_insert_with_mem_merges); + tcase_add_test(insert, t_debug_insert); suite_add_tcase(unit, insert); TCase *sampling = tcase_create("de::DynamicExtension<WIRS>::range_sample Testing"); |