From 90bb0614fc1d8f1a185a778e31aaf9027c01aeb8 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 13 Nov 2023 11:44:09 -0500 Subject: Tombstone Compaction: re-enabled tombstone compaction Currently, proactive buffer tombstone compaction is disabled by forcing the buffer tombstone capacity to match its record capacity. It isn't clear how to best handle proactive buffer compactions in an environment where new buffers are spawned anyway. --- include/framework/DynamicExtension.h | 55 ++++++++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 9554c8c..9adc320 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -253,6 +253,32 @@ private: size_t m_buffer_capacity; size_t m_buffer_delete_capacity; + void enforce_delete_invariant(_Epoch *epoch) { + auto structure = epoch->get_structure(); + auto compactions = structure->get_compaction_tasks(); + + while (compactions.size() > 0) { + /* otherwise, we need to schedule a merge to compact tombstones */ + MergeArgs *args = new MergeArgs(); + args->epoch = epoch; + // FIXME: all full buffers can be merged at this point--but that requires + // retooling the shard interface a bit to do efficiently. + args->merges = compactions; + args->extension = this; + args->compaction = true; + + auto wait = args->result.get_future(); + + epoch->start_job(); + m_sched.schedule_job(merge, 0, args); + + /* wait for merge completion */ + wait.get(); + + compactions = structure->get_compaction_tasks(); + } + } + _Epoch *get_active_epoch() { return m_epochs[m_current_epoch.load()]; } @@ -272,6 +298,7 @@ private: } void advance_epoch() { + m_epoch_transition_lk.lock(); size_t new_epoch_num = m_newest_epoch.load(); @@ -281,6 +308,15 @@ private: _Epoch *new_epoch = m_epochs[new_epoch_num]; _Epoch *old_epoch = m_epochs[old_epoch_num]; + /* + * Verify the tombstone invariant within the epoch's structure, this + * may require scheduling additional merges. + * + * FIXME: having this inside the lock is going to TANK + * insertion performance. + */ + enforce_delete_invariant(new_epoch); + /* * Update the new Epoch to contain the buffers from the old one * that it doesn't currently have if using a multi-threaded @@ -445,12 +481,26 @@ private: vers->merge_levels(args->merges[i].second, args->merges[i].first); } - vers->merge_buffer(buff); + /* + * if the merge is a compaction, don't push the buffer down, + * as there is no guarantee that the merges will free up + * sufficient space in L0 + */ + if (!args->compaction) { + vers->merge_buffer(buff); + } args->epoch->end_job(); args->result.set_value(true); - ((DynamicExtension *) args->extension)->advance_epoch(); + /* + * Compactions occur on an epoch _before_ it becomes active, + * and as a result the active epoch should _not_ be advanced as + * part of a compaction merge + */ + if (!args->compaction) { + ((DynamicExtension *) args->extension)->advance_epoch(); + } delete args; } @@ -511,6 +561,7 @@ private: // retooling the shard interface a bit to do efficiently. args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count()); args->extension = this; + args->compaction = false; m_sched.schedule_job(merge, 0, args); } -- cgit v1.2.3