From 2117935e85412f3733ee0bcb1830c7fd0b129b29 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Mon, 15 Jan 2024 17:23:57 -0500 Subject: Concurrency testing and bug fixes --- include/framework/DynamicExtension.h | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index bddc950..cb21ae3 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -28,6 +28,7 @@ #include "framework/scheduling/Epoch.h" + namespace de { template get_buffer(); - shards.emplace_back(new S(std::move(bv))); + if (bv.get_record_count() > 0) { + shards.emplace_back(new S(std::move(bv))); + } } if (vers->get_levels().size() > 0) { for (int i=vers->get_levels().size() - 1; i>= 0; i--) { - if (vers->get_levels()[i]) { + if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) { shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); } } @@ -426,14 +429,20 @@ private: Structure *vers = args->epoch->get_structure(); - // could be flushed at once here. - auto buffer_view = args->epoch->get_flush_buffer(); - size_t new_head = buffer_view.get_tail(); for (ssize_t i=0; imerges.size(); i++) { vers->reconstruction(args->merges[i].second, args->merges[i].first); } + /* + * we'll grab the buffer AFTER doing the internal reconstruction, so we can + * flush as many records as possible in one go. The reconstruction was done so + * as to make room for the full buffer anyway, so there's no real benefit to doing + * this first. + */ + auto buffer_view = args->epoch->get_buffer(); + size_t new_head = buffer_view.get_tail(); + /* * if performing a compaction, don't flush the buffer, as * there is no guarantee that any necessary reconstructions @@ -528,7 +537,7 @@ private: ReconstructionArgs *args = new ReconstructionArgs(); args->epoch = epoch; - args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_low_watermark()); + args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_high_watermark()); args->extension = this; args->compaction = false; /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ -- cgit v1.2.3