diff options
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 91 |
1 files changed, 30 insertions, 61 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index cb21ae3..7590de2 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -49,7 +49,7 @@ public: , m_buffer(new Buffer(buffer_lwm, buffer_hwm)) { auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); - auto epoch = new _Epoch(0, vers, m_buffer); + auto epoch = new _Epoch(0, vers, m_buffer, 0); m_versions.insert(vers); m_epochs.insert({0, epoch}); @@ -169,6 +169,15 @@ public: auto vers = epoch->get_structure(); std::vector<Shard *> shards; + + if (vers->get_levels().size() > 0) { + for (int i=vers->get_levels().size() - 1; i>= 0; i--) { + if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) { + shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); + } + } + } + /* * construct a shard from the buffer view. We'll hold the view * for as short a time as possible: once the records are exfiltrated @@ -182,24 +191,7 @@ public: } } - if (vers->get_levels().size() > 0) { - for (int i=vers->get_levels().size() - 1; i>= 0; i--) { - if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) { - shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); - } - } - } - - Shard *shards_array[shards.size()]; - - size_t j = 0; - for (size_t i=0; i<shards.size(); i++) { - if (shards[i]) { - shards_array[j++] = shards[i]; - } - } - - Shard *flattened = new S(shards_array, j); + Shard *flattened = new S(shards); for (auto shard : shards) { delete shard; @@ -302,7 +294,7 @@ private: return m_epochs[cur_epoch]; } - void advance_epoch() { + void advance_epoch(size_t buffer_head) { m_epoch_transition_lk.lock(); @@ -322,26 +314,14 @@ private: */ enforce_delete_invariant(new_epoch); - #if 0 - /* - * Update the new Epoch to contain the buffers from the old one - * that it doesn't currently have if using a multi-threaded - * scheduler (otherwise, there is only one buffer that is - * reused, so it shouldn't be removed) - */ - if constexpr (!std::same_as<SCHED, SerialScheduler>) { - size_t old_buffer_cnt = new_epoch->clear_buffers(); - - /* - * skip the first buffer, as this was flushed into the epoch's - * structure already, and copy all the other buffer references - * into the new epoch - */ - for (size_t i=1; i<old_epoch->get_buffers().size(); i++) { - new_epoch->add_buffer(old_epoch->get_buffers()[i]); - } - } - #endif + // FIXME: this may currently fail because there isn't any + // query preemption yet. At this point, we'd need to either + // 1) wait for all queries on the old_head to finish + // 2) kill all queries on the old_head + // 3) somehow migrate all queries on the old_head to the new + // version + auto res = new_epoch->advance_buffer_head(buffer_head); + assert(res); m_current_epoch.fetch_add(1); old_epoch->set_inactive(); @@ -425,40 +405,29 @@ private: } static void reconstruction(void *arguments) { - ReconstructionArgs<R, S, Q, L> *args = (ReconstructionArgs<R, S, Q, L> *) arguments; - + auto args = (ReconstructionArgs<R, S, Q, L> *) arguments; Structure *vers = args->epoch->get_structure(); - for (ssize_t i=0; i<args->merges.size(); i++) { vers->reconstruction(args->merges[i].second, args->merges[i].first); } - /* - * we'll grab the buffer AFTER doing the internal reconstruction, so we can - * flush as many records as possible in one go. The reconstruction was done so - * as to make room for the full buffer anyway, so there's no real benefit to doing - * this first. + /* + * we'll grab the buffer AFTER doing the internal reconstruction, so we + * can flush as many records as possible in one go. The reconstruction + * was done so as to make room for the full buffer anyway, so there's + * no real benefit to doing this first. */ auto buffer_view = args->epoch->get_buffer(); size_t new_head = buffer_view.get_tail(); - /* - * if performing a compaction, don't flush the buffer, as - * there is no guarantee that any necessary reconstructions + /* + * if performing a compaction, don't flush the buffer, as + * there is no guarantee that any necessary reconstructions * will free sufficient space in L0 to support a flush */ if (!args->compaction) { vers->flush_buffer(std::move(buffer_view)); - - // FIXME: this may currently fail because there isn't any - // query preemption yet. At this point, we'd need to either - // 1) wait for all queries on the old_head to finish - // 2) kill all queries on the old_head - // 3) somehow migrate all queries on the old_head to the new - // version - auto res = args->epoch->advance_buffer_head(new_head); - assert(res); } args->epoch->end_job(); @@ -470,7 +439,7 @@ private: * part of a compaction */ if (!args->compaction) { - ((DynamicExtension *) args->extension)->advance_epoch(); + ((DynamicExtension *) args->extension)->advance_epoch(new_head); } ((DynamicExtension *) args->extension)->m_reconstruction_scheduled = false; |