summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas B. Rumbaugh <doug@douglasrumbaugh.com>2024-01-15 17:23:57 -0500
committerDouglas B. Rumbaugh <doug@douglasrumbaugh.com>2024-01-15 17:23:57 -0500
commit2117935e85412f3733ee0bcb1830c7fd0b129b29 (patch)
tree303d4482263f25a45b157fef82217084db20be8d /include/framework/DynamicExtension.h
parent0a9e79416df03a9e0a3d2cf171cf90028a644d6d (diff)
downloaddynamic-extension-2117935e85412f3733ee0bcb1830c7fd0b129b29.tar.gz
Concurrency testing and bug fixes
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h21
1 files changed, 15 insertions, 6 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index bddc950..cb21ae3 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -28,6 +28,7 @@
#include "framework/scheduling/Epoch.h"
+
namespace de {
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING,
@@ -176,12 +177,14 @@ public:
*/
{
auto bv = epoch->get_buffer();
- shards.emplace_back(new S(std::move(bv)));
+ if (bv.get_record_count() > 0) {
+ shards.emplace_back(new S(std::move(bv)));
+ }
}
if (vers->get_levels().size() > 0) {
for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
- if (vers->get_levels()[i]) {
+ if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) {
shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
}
}
@@ -426,15 +429,21 @@ private:
Structure *vers = args->epoch->get_structure();
- // could be flushed at once here.
- auto buffer_view = args->epoch->get_flush_buffer();
- size_t new_head = buffer_view.get_tail();
for (ssize_t i=0; i<args->merges.size(); i++) {
vers->reconstruction(args->merges[i].second, args->merges[i].first);
}
/*
+ * we'll grab the buffer AFTER doing the internal reconstruction, so we can
+ * flush as many records as possible in one go. The reconstruction was done so
+ * as to make room for the full buffer anyway, so there's no real benefit to doing
+ * this first.
+ */
+ auto buffer_view = args->epoch->get_buffer();
+ size_t new_head = buffer_view.get_tail();
+
+ /*
* if performing a compaction, don't flush the buffer, as
* there is no guarantee that any necessary reconstructions
* will free sufficient space in L0 to support a flush
@@ -528,7 +537,7 @@ private:
ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>();
args->epoch = epoch;
- args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_low_watermark());
+ args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_high_watermark());
args->extension = this;
args->compaction = false;
/* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */