| field | value | date |
|---|---|---|
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-11-13 11:44:09 -0500 |
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-11-13 11:44:09 -0500 |
| commit | 90bb0614fc1d8f1a185a778e31aaf9027c01aeb8 (patch) | |
| tree | 22a865978e398cef2fdcef763ab893c2da769a4a /include/framework/DynamicExtension.h | |
| parent | 83486744600e8be338c75c2e3d2339452a392a9d (diff) | |
| download | dynamic-extension-90bb0614fc1d8f1a185a778e31aaf9027c01aeb8.tar.gz | |
Tombstone Compaction: re-enabled tombstone compaction
Currently, proactive buffer tombstone compaction is disabled by forcing
the buffer's tombstone capacity to match its record capacity. It isn't
clear how best to handle proactive buffer compactions in an environment
where new buffers are spawned anyway.
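
For context, a toy illustration of why matching the two capacities disables proactive compaction. `ToyBuffer` and its fields are hypothetical stand-ins, not the framework's actual buffer class: the point is only that when the tombstone threshold equals the record capacity, the buffer fills (and is flushed) before the tombstone count can ever exceed the threshold.

```cpp
#include <cstddef>

/*
 * Minimal sketch (stand-in type, not the framework's buffer class): a
 * buffer whose tombstone threshold equals its record capacity can never
 * trigger a proactive tombstone compaction, because the buffer fills up
 * and is flushed before the tombstone count can exceed the threshold.
 */
struct ToyBuffer {
    size_t record_capacity;
    size_t tombstone_capacity;
    size_t record_count = 0;
    size_t tombstone_count = 0;

    bool tombstone_compaction_needed() const {
        return tombstone_count > tombstone_capacity;
    }
};

int main() {
    /* tombstone capacity == record capacity: compaction is effectively disabled */
    ToyBuffer buf{ .record_capacity = 1000, .tombstone_capacity = 1000 };
    buf.record_count = 1000;     /* buffer is full ...                      */
    buf.tombstone_count = 1000;  /* ... even if every record is a tombstone */
    return buf.tombstone_compaction_needed() ? 1 : 0;  /* never true */
}
```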
Diffstat (limited to 'include/framework/DynamicExtension.h')

| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | include/framework/DynamicExtension.h | 55 |

1 file changed, 53 insertions, 2 deletions
```diff
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 9554c8c..9adc320 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -253,6 +253,32 @@ private:
     size_t m_buffer_capacity;
     size_t m_buffer_delete_capacity;
 
+    void enforce_delete_invariant(_Epoch *epoch) {
+        auto structure = epoch->get_structure();
+        auto compactions = structure->get_compaction_tasks();
+
+        while (compactions.size() > 0) {
+            /* otherwise, we need to schedule a merge to compact tombstones */
+            MergeArgs<R, S, Q, L> *args = new MergeArgs<R, S, Q, L>();
+            args->epoch = epoch;
+            // FIXME: all full buffers can be merged at this point--but that requires
+            //        retooling the shard interface a bit to do efficiently.
+            args->merges = compactions;
+            args->extension = this;
+            args->compaction = true;
+
+            auto wait = args->result.get_future();
+
+            epoch->start_job();
+            m_sched.schedule_job(merge, 0, args);
+
+            /* wait for merge completion */
+            wait.get();
+
+            compactions = structure->get_compaction_tasks();
+        }
+    }
+
     _Epoch *get_active_epoch() {
         return m_epochs[m_current_epoch.load()];
     }
@@ -272,6 +298,7 @@ private:
     }
 
     void advance_epoch() {
+        m_epoch_transition_lk.lock();
 
         size_t new_epoch_num = m_newest_epoch.load();
@@ -282,6 +309,15 @@ private:
         _Epoch *old_epoch = m_epochs[old_epoch_num];
 
         /*
+         * Verify the tombstone invariant within the epoch's structure, this
+         * may require scheduling additional merges.
+         *
+         * FIXME: having this inside the lock is going to TANK
+         *        insertion performance.
+         */
+        enforce_delete_invariant(new_epoch);
+
+        /*
          * Update the new Epoch to contain the buffers from the old one
          * that it doesn't currently have if using a multi-threaded
          * scheduler (otherwise, there is only one buffer that is
@@ -445,12 +481,26 @@ private:
             vers->merge_levels(args->merges[i].second, args->merges[i].first);
         }
 
-        vers->merge_buffer(buff);
+        /*
+         * if the merge is a compaction, don't push the buffer down,
+         * as there is no guarantee that the merges will free up
+         * sufficient space in L0
+         */
+        if (!args->compaction) {
+            vers->merge_buffer(buff);
+        }
 
         args->epoch->end_job();
         args->result.set_value(true);
 
-        ((DynamicExtension *) args->extension)->advance_epoch();
+        /*
+         * Compactions occur on an epoch _before_ it becomes active,
+         * and as a result the active epoch should _not_ be advanced as
+         * part of a compaction merge
+         */
+        if (!args->compaction) {
+            ((DynamicExtension *) args->extension)->advance_epoch();
+        }
 
         delete args;
     }
@@ -511,6 +561,7 @@ private:
         //        retooling the shard interface a bit to do efficiently.
         args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count());
         args->extension = this;
+        args->compaction = false;
 
         m_sched.schedule_job(merge, 0, args);
     }
```
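
For reference, a minimal sketch of the schedule-and-block pattern that `enforce_delete_invariant()` relies on in the diff above. `JobArgs`, `merge_job`, and the thread-based "scheduler" here are simplified stand-ins for `MergeArgs`, `merge`, and `m_sched`, not the framework's actual types: the caller obtains a future from the job's promise, schedules the job, and blocks on the future before re-checking whether further compaction work remains.

```cpp
#include <future>
#include <thread>

/*
 * Minimal sketch (stand-in types, not the framework's scheduler) of the
 * schedule-and-block pattern: each compaction round is handed to a
 * background job, and the caller blocks on the job's promise until it
 * completes before checking for more compaction work.
 */
struct JobArgs {
    bool compaction = true;         /* mirrors MergeArgs::compaction */
    std::promise<bool> result;      /* mirrors MergeArgs::result */
};

static void merge_job(JobArgs *args) {
    /* ... perform the compaction merge here ... */
    args->result.set_value(true);   /* signal completion, as merge() does */
    delete args;
}

int main() {
    int pending_compactions = 2;    /* stand-in for get_compaction_tasks() */

    while (pending_compactions > 0) {
        auto *args = new JobArgs();
        auto wait = args->result.get_future();

        /* stand-in for m_sched.schedule_job(merge, 0, args) */
        std::thread worker(merge_job, args);

        wait.get();                 /* block until this compaction round finishes */
        worker.join();

        --pending_compactions;      /* stand-in for re-querying compaction tasks */
    }
    return 0;
}
```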