From 90bb0614fc1d8f1a185a778e31aaf9027c01aeb8 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 13 Nov 2023 11:44:09 -0500 Subject: Tombstone Compaction: re-enabled tombstone compaction Currently, proactive buffer tombstone compaction is disabled by forcing the buffer tombstone capacity to match its record capacity. It isn't clear how to best handle proactive buffer compactions in an environment where new buffers are spawned anyway. --- include/framework/DynamicExtension.h | 55 +++++++++++++++++++++++- include/framework/scheduling/Task.h | 1 + include/framework/structure/ExtensionStructure.h | 51 ++++++++++++++++++++++ include/framework/structure/MutableBuffer.h | 2 +- tests/dynamic_extension_tests.inc | 2 - 5 files changed, 106 insertions(+), 5 deletions(-) diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 9554c8c..9adc320 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -253,6 +253,32 @@ private: size_t m_buffer_capacity; size_t m_buffer_delete_capacity; + void enforce_delete_invariant(_Epoch *epoch) { + auto structure = epoch->get_structure(); + auto compactions = structure->get_compaction_tasks(); + + while (compactions.size() > 0) { + /* otherwise, we need to schedule a merge to compact tombstones */ + MergeArgs *args = new MergeArgs(); + args->epoch = epoch; + // FIXME: all full buffers can be merged at this point--but that requires + // retooling the shard interface a bit to do efficiently. + args->merges = compactions; + args->extension = this; + args->compaction = true; + + auto wait = args->result.get_future(); + + epoch->start_job(); + m_sched.schedule_job(merge, 0, args); + + /* wait for merge completion */ + wait.get(); + + compactions = structure->get_compaction_tasks(); + } + } + _Epoch *get_active_epoch() { return m_epochs[m_current_epoch.load()]; } @@ -272,6 +298,7 @@ private: } void advance_epoch() { + m_epoch_transition_lk.lock(); size_t new_epoch_num = m_newest_epoch.load(); @@ -281,6 +308,15 @@ private: _Epoch *new_epoch = m_epochs[new_epoch_num]; _Epoch *old_epoch = m_epochs[old_epoch_num]; + /* + * Verify the tombstone invariant within the epoch's structure, this + * may require scheduling additional merges. + * + * FIXME: having this inside the lock is going to TANK + * insertion performance. + */ + enforce_delete_invariant(new_epoch); + /* * Update the new Epoch to contain the buffers from the old one * that it doesn't currently have if using a multi-threaded @@ -445,12 +481,26 @@ private: vers->merge_levels(args->merges[i].second, args->merges[i].first); } - vers->merge_buffer(buff); + /* + * if the merge is a compaction, don't push the buffer down, + * as there is no guarantee that the merges will free up + * sufficient space in L0 + */ + if (!args->compaction) { + vers->merge_buffer(buff); + } args->epoch->end_job(); args->result.set_value(true); - ((DynamicExtension *) args->extension)->advance_epoch(); + /* + * Compactions occur on an epoch _before_ it becomes active, + * and as a result the active epoch should _not_ be advanced as + * part of a compaction merge + */ + if (!args->compaction) { + ((DynamicExtension *) args->extension)->advance_epoch(); + } delete args; } @@ -511,6 +561,7 @@ private: // retooling the shard interface a bit to do efficiently. args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count()); args->extension = this; + args->compaction = false; m_sched.schedule_job(merge, 0, args); } diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index d211fb5..c10ed8b 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -21,6 +21,7 @@ struct MergeArgs { Epoch *epoch; std::vector merges; std::promise result; + bool compaction; void *extension; }; diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 74cede6..a174805 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -201,6 +201,57 @@ public: return m_levels; } + std::vector get_compaction_tasks() { + std::vector tasks; + + /* if the tombstone/delete invariant is satisfied, no need for compactions */ + if (validate_tombstone_proportion()) { + return tasks; + } + + /* locate the first level to violate the invariant */ + level_index violation_idx = -1; + for (level_index i=0; i0; i--) { + MergeTask task = {i-1, i}; + + /* + * The amount of storage required for the merge accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i-1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_merge_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } + //task.m_size = 2* reccnt * sizeof(R); + + tasks.push_back(task); + } + + return tasks; + } + /* * */ diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 671824f..8b17091 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -32,7 +32,7 @@ template class MutableBuffer { public: MutableBuffer(size_t capacity, size_t max_tombstone_cap) - : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0) + : m_cap(capacity), m_tombstone_cap(capacity), m_reccnt(0) , m_tombstonecnt(0), m_weight(0), m_max_weight(0), m_tail(0) { m_data = (Wrapped*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped)); m_merge_data = (Wrapped*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped)); diff --git a/tests/dynamic_extension_tests.inc b/tests/dynamic_extension_tests.inc index dac7d51..be82132 100644 --- a/tests/dynamic_extension_tests.inc +++ b/tests/dynamic_extension_tests.inc @@ -245,8 +245,6 @@ START_TEST(t_tombstone_merging_01) if (gsl_rng_uniform(rng) < 0.25 && deleted.find(rec) == deleted.end()) { to_delete.insert(rec); } - - ck_assert(test_de->validate_tombstone_proportion()); } test_de->await_next_epoch(); -- cgit v1.2.3