summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h55
1 files changed, 53 insertions, 2 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 9554c8c..9adc320 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -253,6 +253,32 @@ private:
size_t m_buffer_capacity;
size_t m_buffer_delete_capacity;
+ void enforce_delete_invariant(_Epoch *epoch) {
+ auto structure = epoch->get_structure();
+ auto compactions = structure->get_compaction_tasks();
+
+ while (compactions.size() > 0) {
+ /* otherwise, we need to schedule a merge to compact tombstones */
+ MergeArgs<R, S, Q, L> *args = new MergeArgs<R, S, Q, L>();
+ args->epoch = epoch;
+ // FIXME: all full buffers can be merged at this point--but that requires
+ // retooling the shard interface a bit to do efficiently.
+ args->merges = compactions;
+ args->extension = this;
+ args->compaction = true;
+
+ auto wait = args->result.get_future();
+
+ epoch->start_job();
+ m_sched.schedule_job(merge, 0, args);
+
+ /* wait for merge completion */
+ wait.get();
+
+ compactions = structure->get_compaction_tasks();
+ }
+ }
+
_Epoch *get_active_epoch() {
return m_epochs[m_current_epoch.load()];
}
@@ -272,6 +298,7 @@ private:
}
void advance_epoch() {
+
m_epoch_transition_lk.lock();
size_t new_epoch_num = m_newest_epoch.load();
@@ -282,6 +309,15 @@ private:
_Epoch *old_epoch = m_epochs[old_epoch_num];
/*
+ * Verify the tombstone invariant within the epoch's structure, this
+ * may require scheduling additional merges.
+ *
+ * FIXME: having this inside the lock is going to TANK
+ * insertion performance.
+ */
+ enforce_delete_invariant(new_epoch);
+
+ /*
* Update the new Epoch to contain the buffers from the old one
* that it doesn't currently have if using a multi-threaded
* scheduler (otherwise, there is only one buffer that is
@@ -445,12 +481,26 @@ private:
vers->merge_levels(args->merges[i].second, args->merges[i].first);
}
- vers->merge_buffer(buff);
+ /*
+ * if the merge is a compaction, don't push the buffer down,
+ * as there is no guarantee that the merges will free up
+ * sufficient space in L0
+ */
+ if (!args->compaction) {
+ vers->merge_buffer(buff);
+ }
args->epoch->end_job();
args->result.set_value(true);
- ((DynamicExtension *) args->extension)->advance_epoch();
+ /*
+ * Compactions occur on an epoch _before_ it becomes active,
+ * and as a result the active epoch should _not_ be advanced as
+ * part of a compaction merge
+ */
+ if (!args->compaction) {
+ ((DynamicExtension *) args->extension)->advance_epoch();
+ }
delete args;
}
@@ -511,6 +561,7 @@ private:
// retooling the shard interface a bit to do efficiently.
args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count());
args->extension = this;
+ args->compaction = false;
m_sched.schedule_job(merge, 0, args);
}