summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/framework/ExtensionStructure.h1
-rw-r--r--include/framework/InternalLevel.h8
-rw-r--r--include/framework/Scheduler.h4
-rw-r--r--tests/dynamic_extension_tests.inc24
4 files changed, 33 insertions, 4 deletions
diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h
index 2fb9cf0..892e63b 100644
--- a/include/framework/ExtensionStructure.h
+++ b/include/framework/ExtensionStructure.h
@@ -312,6 +312,7 @@ public:
m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get());
} else {
m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get());
+ m_levels[base_level]->finalize();
}
m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index e67ae45..6cdac4e 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -106,6 +106,7 @@ public:
for (size_t i=0; i<m_shards.size(); i++) {
if (m_owns[i]) {
delete m_shards[i];
+ m_shards[i] = nullptr;
m_owns[i] = false;
}
}
@@ -113,6 +114,7 @@ public:
m_shards[0] = m_pending_shard;
m_owns[0] = true;
m_pending_shard = nullptr;
+ m_shard_cnt = 1;
}
}
@@ -241,13 +243,15 @@ private:
std::vector<bool> m_owns;
- InternalLevel *clone() {
- auto new_level = new InternalLevel(m_level_no, m_shards.size());
+ std::shared_ptr<InternalLevel> clone() {
+ auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size());
for (size_t i=0; i<m_shard_cnt; i++) {
new_level->m_shards[i] = m_shards[i];
new_level->m_owns[i] = true;
m_owns[i] = false;
}
+
+ return new_level;
}
};
diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h
index 534ce25..6055bef 100644
--- a/include/framework/Scheduler.h
+++ b/include/framework/Scheduler.h
@@ -68,7 +68,7 @@ public:
* Schedule the merge tasks (FIXME: currently this just
* executes them sequentially in a blocking fashion)
*/
- for (ssize_t i=merges.size()-1; i>=0; i--) {
+ for (ssize_t i=0; i<merges.size(); i++) {
merges[i].m_timestamp = m_timestamp.fetch_add(1);
m_merge_queue_lock.lock();
m_merge_queue.push(merges[i]);
@@ -120,6 +120,7 @@ private:
void run_merge(MergeTask task, Structure *version) {
version->merge_levels(task.m_target_level, task.m_source_level);
+
if (!version->validate_tombstone_proportion(task.m_target_level)) {
auto tasks = version->get_merge_tasks(task.m_target_level);
/*
@@ -188,7 +189,6 @@ private:
std::thread m_sched_thrd;
bool m_shutdown;
-
};
}
diff --git a/tests/dynamic_extension_tests.inc b/tests/dynamic_extension_tests.inc
index b9866c3..bee28f7 100644
--- a/tests/dynamic_extension_tests.inc
+++ b/tests/dynamic_extension_tests.inc
@@ -45,6 +45,25 @@ START_TEST(t_insert)
END_TEST
+START_TEST(t_debug_insert)
+{
+ auto ext_wirs = new DE(100, 2, 1);
+
+ uint64_t key = 0;
+ uint32_t val = 0;
+ for (size_t i=0; i<1000; i++) {
+ WRec r = {key, val, 1};
+ ck_assert_int_eq(ext_wirs->insert(r), 1);
+ ck_assert_int_eq(ext_wirs->get_record_count(), i+1);
+ key++;
+ val++;
+ }
+
+ delete ext_wirs;
+}
+END_TEST
+
+
START_TEST(t_insert_with_mem_merges)
{
auto ext_wirs = new DE(100, 2, 1);
@@ -329,8 +348,12 @@ START_TEST(t_static_structure)
}
size_t deletes = 0;
+ size_t t_reccnt = 0;
+ size_t k=0;
for (auto rec : records) {
+ k++;
ck_assert_int_eq(ext_wirs->insert(rec), 1);
+ t_reccnt++;
if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) {
std::vector<WRec> del_vec;
@@ -378,6 +401,7 @@ Suite *unit_testing()
TCase *insert = tcase_create("de::DynamicExtension<WIRS>::insert Testing");
tcase_add_test(insert, t_insert);
tcase_add_test(insert, t_insert_with_mem_merges);
+ tcase_add_test(insert, t_debug_insert);
suite_add_tcase(unit, insert);
TCase *sampling = tcase_create("de::DynamicExtension<WIRS>::range_sample Testing");