summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-12-13 12:39:54 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2023-12-13 12:39:54 -0500
commit3c127eda69295cb306739bdd3c5ddccff6026a8d (patch)
tree43632849c7684cab68c43a8eb2c0aeac7adffad7 /include/framework/DynamicExtension.h
parentd1f3535404ec2c200dcf2628b8c5c1f92b39e797 (diff)
downloaddynamic-extension-3c127eda69295cb306739bdd3c5ddccff6026a8d.tar.gz
Refactoring: corrected a number of names and added more comments
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h77
1 files changed, 39 insertions, 38 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 8edcc5f..fe43c52 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -165,8 +165,8 @@ public:
return m_buffer_capacity;
}
- Shard *create_static_structure(bool await_merge_completion=false) {
- if (await_merge_completion) {
+ Shard *create_static_structure(bool await_reconstruction_completion=false) {
+ if (await_reconstruction_completion) {
await_next_epoch();
}
@@ -179,7 +179,7 @@ public:
if (vers->get_levels().size() > 0) {
for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
if (vers->get_levels()[i]) {
- shards.emplace_back(vers->get_levels()[i]->get_merged_shard());
+ shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
}
}
}
@@ -261,10 +261,10 @@ private:
auto compactions = structure->get_compaction_tasks();
while (compactions.size() > 0) {
- /* otherwise, we need to schedule a merge to compact tombstones */
- MergeArgs<R, S, Q, L> *args = new MergeArgs<R, S, Q, L>();
+ /* otherwise, we need to schedule a compaction */
+ ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>();
args->epoch = epoch;
- // FIXME: all full buffers can be merged at this point--but that requires
+ // FIXME: all full buffers can be flushed at this point--but that requires
// retooling the shard interface a bit to do efficiently.
args->merges = compactions;
args->extension = this;
@@ -273,9 +273,9 @@ private:
auto wait = args->result.get_future();
epoch->start_job();
- m_sched.schedule_job(merge, 0, args);
+ m_sched.schedule_job(reconstruction, 0, args);
- /* wait for merge completion */
+ /* wait for reconstruction completion */
wait.get();
compactions = structure->get_compaction_tasks();
@@ -308,7 +308,7 @@ private:
/*
* Verify the tombstone invariant within the epoch's structure, this
- * may require scheduling additional merges.
+ * may require scheduling additional reconstructions.
*
* FIXME: having this inside the lock is going to TANK
* insertion performance.
@@ -325,8 +325,9 @@ private:
size_t old_buffer_cnt = new_epoch->clear_buffers();
/*
- * skip the first buffer, as this was the one that got merged,
- * and copy all the other buffer references into the new epoch
+ * skip the first buffer, as this was flushed into the epoch's
+ * structure already, and copy all the other buffer references
+ * into the new epoch
*/
for (size_t i=1; i<old_epoch->get_buffers().size(); i++) {
new_epoch->add_buffer(old_epoch->get_buffers()[i]);
@@ -352,7 +353,7 @@ private:
_Epoch *create_new_epoch() {
/*
* This epoch access is _not_ protected under the assumption that
- * only one merge will be able to trigger at a time. If that condition
+ * only one reconstruction will be able to trigger at a time. If that condition
* is violated, it is possible that this code will clone a retired
* epoch.
*/
@@ -368,7 +369,7 @@ private:
/*
* Add a new empty buffer. This is intended to be used
- * when a merge is triggered, to allow for inserts to be sustained in the new
+ * when a reconstruction is triggered, to allow for inserts to be sustained in the new
* buffer while a new epoch is being created in the background. Returns a
* pointer to the newly created buffer.
*/
@@ -429,13 +430,12 @@ private:
*/
do {
- m_epoch_retire_lk.lock();
if (epoch->retirable()) {
break;
}
- m_epoch_retire_lk.unlock();
} while (true);
+ m_epoch_retire_lk.lock();
/* remove epoch from the framework's map */
m_epochs.erase(epoch->get_epoch_number());
@@ -473,26 +473,26 @@ private:
}
}
- static void merge(void *arguments) {
- MergeArgs<R, S, Q, L> *args = (MergeArgs<R, S, Q, L> *) arguments;
+ static void reconstruction(void *arguments) {
+ ReconstructionArgs<R, S, Q, L> *args = (ReconstructionArgs<R, S, Q, L> *) arguments;
Structure *vers = args->epoch->get_structure();
// FIXME: with an improved shard interface, multiple full buffers
- // could be merged at once here.
+ // could be flushed at once here.
Buffer *buff = (Buffer *) args->epoch->get_buffers()[0];
for (ssize_t i=0; i<args->merges.size(); i++) {
- vers->merge_levels(args->merges[i].second, args->merges[i].first);
+ vers->reconstruction(args->merges[i].second, args->merges[i].first);
}
/*
- * if the merge is a compaction, don't push the buffer down,
- * as there is no guarantee that the merges will free up
- * sufficient space in L0
+ * if performing a compaction, don't push the buffer down,
+ * as there is no guarantee that any necessary reconstructions
+ * will free sufficient space in L0 to support a flush
*/
if (!args->compaction) {
- vers->merge_buffer(buff);
+ vers->flush_buffer(buff);
}
args->epoch->end_job();
@@ -501,7 +501,7 @@ private:
/*
* Compactions occur on an epoch _before_ it becomes active,
* and as a result the active epoch should _not_ be advanced as
- * part of a compaction merge
+ * part of a compaction
*/
if (!args->compaction) {
((DynamicExtension *) args->extension)->advance_epoch();
@@ -556,18 +556,19 @@ private:
delete args;
}
- void schedule_merge() {
+ void schedule_reconstruction() {
+ //fprintf(stderr, "%ld\t Reconstruction Scheduling", m_current_epoch);
auto epoch = create_new_epoch();
epoch->start_job();
- MergeArgs<R, S, Q, L> *args = new MergeArgs<R, S, Q, L>();
+ ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>();
args->epoch = epoch;
- // FIXME: all full buffers can be merged at this point--but that requires
+ // FIXME: all full buffers can be flushed at this point--but that requires
// retooling the shard interface a bit to do efficiently.
- args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count());
+ args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffers()[0]->get_record_count());
args->extension = this;
args->compaction = false;
- m_sched.schedule_job(merge, 0, args);
+ m_sched.schedule_job(reconstruction, 0, args);
}
std::future<std::vector<R>> schedule_query(void *query_parms) {
@@ -592,27 +593,27 @@ private:
assert(buffer);
/*
- * If the buffer is full and there is no current merge,
- * schedule a merge and add a new empty buffer. If there
- * is a current merge, then just add a new empty buffer
+ * If the buffer is full and there is no ongoing reconstruction,
+ * schedule a reconstruction and add a new empty buffer. If there
+ * is an ongoing reconstruction, then add a new empty buffer
* to the current epoch.
*/
if (buffer->is_full()) {
if constexpr (std::same_as<SCHED, SerialScheduler>) {
- /* single threaded: run merge and then empty buffer */
+ /* single threaded: run reconstruction and then empty buffer */
epoch->end_job();
- schedule_merge();
+ schedule_reconstruction();
buffer->truncate();
continue;
- } else if (epoch->prepare_merge()) {
+ } else if (epoch->prepare_reconstruction()) {
/*
* add an empty buffer to allow insert proceed and
- * schedule a merge on a background thread
+ * schedule a reconstruction on a background thread
*/
buffer = add_empty_buffer();
- schedule_merge();
+ schedule_reconstruction();
} else {
- /* background merge is ongoing, so just add empty buffer */
+ /* background reconstruction is ongoing, so just add empty buffer */
buffer = add_empty_buffer();
}
}