diff options
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 77 |
1 files changed, 39 insertions, 38 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 8edcc5f..fe43c52 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -165,8 +165,8 @@ public: return m_buffer_capacity; } - Shard *create_static_structure(bool await_merge_completion=false) { - if (await_merge_completion) { + Shard *create_static_structure(bool await_reconstruction_completion=false) { + if (await_reconstruction_completion) { await_next_epoch(); } @@ -179,7 +179,7 @@ public: if (vers->get_levels().size() > 0) { for (int i=vers->get_levels().size() - 1; i>= 0; i--) { if (vers->get_levels()[i]) { - shards.emplace_back(vers->get_levels()[i]->get_merged_shard()); + shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); } } } @@ -261,10 +261,10 @@ private: auto compactions = structure->get_compaction_tasks(); while (compactions.size() > 0) { - /* otherwise, we need to schedule a merge to compact tombstones */ - MergeArgs<R, S, Q, L> *args = new MergeArgs<R, S, Q, L>(); + /* otherwise, we need to schedule a compaction */ + ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>(); args->epoch = epoch; - // FIXME: all full buffers can be merged at this point--but that requires + // FIXME: all full buffers can be flushed at this point--but that requires // retooling the shard interface a bit to do efficiently. args->merges = compactions; args->extension = this; @@ -273,9 +273,9 @@ private: auto wait = args->result.get_future(); epoch->start_job(); - m_sched.schedule_job(merge, 0, args); + m_sched.schedule_job(reconstruction, 0, args); - /* wait for merge completion */ + /* wait for reconstruction completion */ wait.get(); compactions = structure->get_compaction_tasks(); @@ -308,7 +308,7 @@ private: /* * Verify the tombstone invariant within the epoch's structure, this - * may require scheduling additional merges. + * may require scheduling additional reconstructions. * * FIXME: having this inside the lock is going to TANK * insertion performance. @@ -325,8 +325,9 @@ private: size_t old_buffer_cnt = new_epoch->clear_buffers(); /* - * skip the first buffer, as this was the one that got merged, - * and copy all the other buffer references into the new epoch + * skip the first buffer, as this was flushed into the epoch's + * structure already, and copy all the other buffer references + * into the new epoch */ for (size_t i=1; i<old_epoch->get_buffers().size(); i++) { new_epoch->add_buffer(old_epoch->get_buffers()[i]); @@ -352,7 +353,7 @@ private: _Epoch *create_new_epoch() { /* * This epoch access is _not_ protected under the assumption that - * only one merge will be able to trigger at a time. If that condition + * only one reconstruction will be able to trigger at a time. If that condition * is violated, it is possible that this code will clone a retired * epoch. */ @@ -368,7 +369,7 @@ private: /* * Add a new empty buffer. This is intended to be used - * when a merge is triggered, to allow for inserts to be sustained in the new + * when a reconstruction is triggered, to allow for inserts to be sustained in the new * buffer while a new epoch is being created in the background. Returns a * pointer to the newly created buffer. */ @@ -429,13 +430,12 @@ private: */ do { - m_epoch_retire_lk.lock(); if (epoch->retirable()) { break; } - m_epoch_retire_lk.unlock(); } while (true); + m_epoch_retire_lk.lock(); /* remove epoch from the framework's map */ m_epochs.erase(epoch->get_epoch_number()); @@ -473,26 +473,26 @@ private: } } - static void merge(void *arguments) { - MergeArgs<R, S, Q, L> *args = (MergeArgs<R, S, Q, L> *) arguments; + static void reconstruction(void *arguments) { + ReconstructionArgs<R, S, Q, L> *args = (ReconstructionArgs<R, S, Q, L> *) arguments; Structure *vers = args->epoch->get_structure(); // FIXME: with an improved shard interface, multiple full buffers - // could be merged at once here. + // could be flushed at once here. Buffer *buff = (Buffer *) args->epoch->get_buffers()[0]; for (ssize_t i=0; i<args->merges.size(); i++) { - vers->merge_levels(args->merges[i].second, args->merges[i].first); + vers->reconstruction(args->merges[i].second, args->merges[i].first); } /* - * if the merge is a compaction, don't push the buffer down, - * as there is no guarantee that the merges will free up - * sufficient space in L0 + * if performing a compaction, don't push the buffer down, + * as there is no guarantee that any necessary reconstructions + * will free sufficient space in L0 to support a flush */ if (!args->compaction) { - vers->merge_buffer(buff); + vers->flush_buffer(buff); } args->epoch->end_job(); @@ -501,7 +501,7 @@ private: /* * Compactions occur on an epoch _before_ it becomes active, * and as a result the active epoch should _not_ be advanced as - * part of a compaction merge + * part of a compaction */ if (!args->compaction) { ((DynamicExtension *) args->extension)->advance_epoch(); @@ -556,18 +556,19 @@ private: delete args; } - void schedule_merge() { + void schedule_reconstruction() { + //fprintf(stderr, "%ld\t Reconstruction Scheduling", m_current_epoch); auto epoch = create_new_epoch(); epoch->start_job(); - MergeArgs<R, S, Q, L> *args = new MergeArgs<R, S, Q, L>(); + ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>(); args->epoch = epoch; - // FIXME: all full buffers can be merged at this point--but that requires + // FIXME: all full buffers can be flushed at this point--but that requires // retooling the shard interface a bit to do efficiently. - args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count()); + args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffers()[0]->get_record_count()); args->extension = this; args->compaction = false; - m_sched.schedule_job(merge, 0, args); + m_sched.schedule_job(reconstruction, 0, args); } std::future<std::vector<R>> schedule_query(void *query_parms) { @@ -592,27 +593,27 @@ private: assert(buffer); /* - * If the buffer is full and there is no current merge, - * schedule a merge and add a new empty buffer. If there - * is a current merge, then just add a new empty buffer + * If the buffer is full and there is no ongoing reconstruction, + * schedule a reconstruction and add a new empty buffer. If there + * is an ongoing reconstruction, then add a new empty buffer * to the current epoch. */ if (buffer->is_full()) { if constexpr (std::same_as<SCHED, SerialScheduler>) { - /* single threaded: run merge and then empty buffer */ + /* single threaded: run reconstruction and then empty buffer */ epoch->end_job(); - schedule_merge(); + schedule_reconstruction(); buffer->truncate(); continue; - } else if (epoch->prepare_merge()) { + } else if (epoch->prepare_reconstruction()) { /* * add an empty buffer to allow insert proceed and - * schedule a merge on a background thread + * schedule a reconstruction on a background thread */ buffer = add_empty_buffer(); - schedule_merge(); + schedule_reconstruction(); } else { - /* background merge is ongoing, so just add empty buffer */ + /* background reconstruction is ongoing, so just add empty buffer */ buffer = add_empty_buffer(); } } |