summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h55
-rw-r--r--include/framework/scheduling/Task.h1
-rw-r--r--include/framework/structure/ExtensionStructure.h51
-rw-r--r--include/framework/structure/MutableBuffer.h2
4 files changed, 106 insertions, 3 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 9554c8c..9adc320 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -253,6 +253,32 @@ private:
size_t m_buffer_capacity;
size_t m_buffer_delete_capacity;
+ void enforce_delete_invariant(_Epoch *epoch) {
+ auto structure = epoch->get_structure();
+ auto compactions = structure->get_compaction_tasks();
+
+ while (compactions.size() > 0) {
+ /* otherwise, we need to schedule a merge to compact tombstones */
+ MergeArgs<R, S, Q, L> *args = new MergeArgs<R, S, Q, L>();
+ args->epoch = epoch;
+ // FIXME: all full buffers can be merged at this point--but that requires
+ // retooling the shard interface a bit to do efficiently.
+ args->merges = compactions;
+ args->extension = this;
+ args->compaction = true;
+
+ auto wait = args->result.get_future();
+
+ epoch->start_job();
+ m_sched.schedule_job(merge, 0, args);
+
+ /* wait for merge completion */
+ wait.get();
+
+ compactions = structure->get_compaction_tasks();
+ }
+ }
+
_Epoch *get_active_epoch() {
return m_epochs[m_current_epoch.load()];
}
@@ -272,6 +298,7 @@ private:
}
void advance_epoch() {
+
m_epoch_transition_lk.lock();
size_t new_epoch_num = m_newest_epoch.load();
@@ -282,6 +309,15 @@ private:
_Epoch *old_epoch = m_epochs[old_epoch_num];
/*
+ * Verify the tombstone invariant within the epoch's structure, this
+ * may require scheduling additional merges.
+ *
+ * FIXME: having this inside the lock is going to TANK
+ * insertion performance.
+ */
+ enforce_delete_invariant(new_epoch);
+
+ /*
* Update the new Epoch to contain the buffers from the old one
* that it doesn't currently have if using a multi-threaded
* scheduler (otherwise, there is only one buffer that is
@@ -445,12 +481,26 @@ private:
vers->merge_levels(args->merges[i].second, args->merges[i].first);
}
- vers->merge_buffer(buff);
+ /*
+ * if the merge is a compaction, don't push the buffer down,
+ * as there is no guarantee that the merges will free up
+ * sufficient space in L0
+ */
+ if (!args->compaction) {
+ vers->merge_buffer(buff);
+ }
args->epoch->end_job();
args->result.set_value(true);
- ((DynamicExtension *) args->extension)->advance_epoch();
+ /*
+ * Compactions occur on an epoch _before_ it becomes active,
+ * and as a result the active epoch should _not_ be advanced as
+ * part of a compaction merge
+ */
+ if (!args->compaction) {
+ ((DynamicExtension *) args->extension)->advance_epoch();
+ }
delete args;
}
@@ -511,6 +561,7 @@ private:
// retooling the shard interface a bit to do efficiently.
args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count());
args->extension = this;
+ args->compaction = false;
m_sched.schedule_job(merge, 0, args);
}
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index d211fb5..c10ed8b 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -21,6 +21,7 @@ struct MergeArgs {
Epoch<R, S, Q, L> *epoch;
std::vector<MergeTask> merges;
std::promise<bool> result;
+ bool compaction;
void *extension;
};
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 74cede6..a174805 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -201,6 +201,57 @@ public:
return m_levels;
}
+ std::vector<MergeTask> get_compaction_tasks() {
+ std::vector<MergeTask> tasks;
+
+ /* if the tombstone/delete invariant is satisfied, no need for compactions */
+ if (validate_tombstone_proportion()) {
+ return tasks;
+ }
+
+ /* locate the first level to violate the invariant */
+ level_index violation_idx = -1;
+ for (level_index i=0; i<m_levels.size(); i++) {
+ if (!validate_tombstone_proportion(i)) {
+ violation_idx = i;
+ break;
+ }
+ }
+
+ assert(violation_idx != -1);
+
+ level_index merge_base_level = find_mergable_level(violation_idx);
+ if (merge_base_level == -1) {
+ merge_base_level = grow();
+ }
+
+ for (level_index i=merge_base_level; i>0; i--) {
+ MergeTask task = {i-1, i};
+
+ /*
+ * The amount of storage required for the merge accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i-1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_merge_with(i, reccnt)) {
+ reccnt += m_levels[i]->get_record_count();
+ }
+ }
+ //task.m_size = 2* reccnt * sizeof(R);
+
+ tasks.push_back(task);
+ }
+
+ return tasks;
+ }
+
/*
*
*/
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 671824f..8b17091 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -32,7 +32,7 @@ template <RecordInterface R>
class MutableBuffer {
public:
MutableBuffer(size_t capacity, size_t max_tombstone_cap)
- : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
+ : m_cap(capacity), m_tombstone_cap(capacity), m_reccnt(0)
, m_tombstonecnt(0), m_weight(0), m_max_weight(0), m_tail(0) {
m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
m_merge_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));