summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-11-13 11:44:09 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2023-11-13 11:44:09 -0500
commit90bb0614fc1d8f1a185a778e31aaf9027c01aeb8 (patch)
tree22a865978e398cef2fdcef763ab893c2da769a4a /include/framework
parent83486744600e8be338c75c2e3d2339452a392a9d (diff)
downloaddynamic-extension-90bb0614fc1d8f1a185a778e31aaf9027c01aeb8.tar.gz
Tombstone Compaction: re-enabled tombstone compaction
Currently, proactive buffer tombstone compaction is disabled by forcing the buffer tombstone capacity to match its record capacity. It isn't clear how to best handle proactive buffer compactions in an environment where new buffers are spawned anyway.
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h55
-rw-r--r--include/framework/scheduling/Task.h1
-rw-r--r--include/framework/structure/ExtensionStructure.h51
-rw-r--r--include/framework/structure/MutableBuffer.h2
4 files changed, 106 insertions, 3 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 9554c8c..9adc320 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -253,6 +253,32 @@ private:
size_t m_buffer_capacity;
size_t m_buffer_delete_capacity;
+ void enforce_delete_invariant(_Epoch *epoch) {
+ auto structure = epoch->get_structure();
+ auto compactions = structure->get_compaction_tasks();
+
+ while (compactions.size() > 0) {
+ /* otherwise, we need to schedule a merge to compact tombstones */
+ MergeArgs<R, S, Q, L> *args = new MergeArgs<R, S, Q, L>();
+ args->epoch = epoch;
+ // FIXME: all full buffers can be merged at this point--but that requires
+ // retooling the shard interface a bit to do efficiently.
+ args->merges = compactions;
+ args->extension = this;
+ args->compaction = true;
+
+ auto wait = args->result.get_future();
+
+ epoch->start_job();
+ m_sched.schedule_job(merge, 0, args);
+
+ /* wait for merge completion */
+ wait.get();
+
+ compactions = structure->get_compaction_tasks();
+ }
+ }
+
_Epoch *get_active_epoch() {
return m_epochs[m_current_epoch.load()];
}
@@ -272,6 +298,7 @@ private:
}
void advance_epoch() {
+
m_epoch_transition_lk.lock();
size_t new_epoch_num = m_newest_epoch.load();
@@ -282,6 +309,15 @@ private:
_Epoch *old_epoch = m_epochs[old_epoch_num];
/*
+ * Verify the tombstone invariant within the epoch's structure, this
+ * may require scheduling additional merges.
+ *
+ * FIXME: having this inside the lock is going to TANK
+ * insertion performance.
+ */
+ enforce_delete_invariant(new_epoch);
+
+ /*
* Update the new Epoch to contain the buffers from the old one
* that it doesn't currently have if using a multi-threaded
* scheduler (otherwise, there is only one buffer that is
@@ -445,12 +481,26 @@ private:
vers->merge_levels(args->merges[i].second, args->merges[i].first);
}
- vers->merge_buffer(buff);
+ /*
+ * if the merge is a compaction, don't push the buffer down,
+ * as there is no guarantee that the merges will free up
+ * sufficient space in L0
+ */
+ if (!args->compaction) {
+ vers->merge_buffer(buff);
+ }
args->epoch->end_job();
args->result.set_value(true);
- ((DynamicExtension *) args->extension)->advance_epoch();
+ /*
+ * Compactions occur on an epoch _before_ it becomes active,
+ * and as a result the active epoch should _not_ be advanced as
+ * part of a compaction merge
+ */
+ if (!args->compaction) {
+ ((DynamicExtension *) args->extension)->advance_epoch();
+ }
delete args;
}
@@ -511,6 +561,7 @@ private:
// retooling the shard interface a bit to do efficiently.
args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count());
args->extension = this;
+ args->compaction = false;
m_sched.schedule_job(merge, 0, args);
}
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index d211fb5..c10ed8b 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -21,6 +21,7 @@ struct MergeArgs {
Epoch<R, S, Q, L> *epoch;
std::vector<MergeTask> merges;
std::promise<bool> result;
+ bool compaction;
void *extension;
};
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 74cede6..a174805 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -201,6 +201,57 @@ public:
return m_levels;
}
+ std::vector<MergeTask> get_compaction_tasks() {
+ std::vector<MergeTask> tasks;
+
+ /* if the tombstone/delete invariant is satisfied, no need for compactions */
+ if (validate_tombstone_proportion()) {
+ return tasks;
+ }
+
+ /* locate the first level to violate the invariant */
+ level_index violation_idx = -1;
+ for (level_index i=0; i<m_levels.size(); i++) {
+ if (!validate_tombstone_proportion(i)) {
+ violation_idx = i;
+ break;
+ }
+ }
+
+ assert(violation_idx != -1);
+
+ level_index merge_base_level = find_mergable_level(violation_idx);
+ if (merge_base_level == -1) {
+ merge_base_level = grow();
+ }
+
+ for (level_index i=merge_base_level; i>0; i--) {
+ MergeTask task = {i-1, i};
+
+ /*
+ * The amount of storage required for the merge accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i-1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_merge_with(i, reccnt)) {
+ reccnt += m_levels[i]->get_record_count();
+ }
+ }
+ //task.m_size = 2* reccnt * sizeof(R);
+
+ tasks.push_back(task);
+ }
+
+ return tasks;
+ }
+
/*
*
*/
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 671824f..8b17091 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -32,7 +32,7 @@ template <RecordInterface R>
class MutableBuffer {
public:
MutableBuffer(size_t capacity, size_t max_tombstone_cap)
- : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
+ : m_cap(capacity), m_tombstone_cap(capacity), m_reccnt(0)
, m_tombstonecnt(0), m_weight(0), m_max_weight(0), m_tail(0) {
m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
m_merge_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));