summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
authorDouglas B. Rumbaugh <doug@douglasrumbaugh.com>2024-01-15 17:23:57 -0500
committerDouglas B. Rumbaugh <doug@douglasrumbaugh.com>2024-01-15 17:23:57 -0500
commit2117935e85412f3733ee0bcb1830c7fd0b129b29 (patch)
tree303d4482263f25a45b157fef82217084db20be8d /include/framework
parent0a9e79416df03a9e0a3d2cf171cf90028a644d6d (diff)
downloaddynamic-extension-2117935e85412f3733ee0bcb1830c7fd0b129b29.tar.gz
Concurrency testing and bug fixes
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h21
-rw-r--r--include/framework/structure/BufferView.h10
2 files changed, 24 insertions, 7 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index bddc950..cb21ae3 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -28,6 +28,7 @@
#include "framework/scheduling/Epoch.h"
+
namespace de {
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING,
@@ -176,12 +177,14 @@ public:
*/
{
auto bv = epoch->get_buffer();
- shards.emplace_back(new S(std::move(bv)));
+ if (bv.get_record_count() > 0) {
+ shards.emplace_back(new S(std::move(bv)));
+ }
}
if (vers->get_levels().size() > 0) {
for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
- if (vers->get_levels()[i]) {
+ if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) {
shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
}
}
@@ -426,15 +429,21 @@ private:
Structure *vers = args->epoch->get_structure();
- // could be flushed at once here.
- auto buffer_view = args->epoch->get_flush_buffer();
- size_t new_head = buffer_view.get_tail();
for (ssize_t i=0; i<args->merges.size(); i++) {
vers->reconstruction(args->merges[i].second, args->merges[i].first);
}
/*
+ * we'll grab the buffer AFTER doing the internal reconstruction, so we can
+ * flush as many records as possible in one go. The reconstruction was done so
+ * as to make room for the full buffer anyway, so there's no real benefit to doing
+ * this first.
+ */
+ auto buffer_view = args->epoch->get_buffer();
+ size_t new_head = buffer_view.get_tail();
+
+ /*
* if performing a compaction, don't flush the buffer, as
* there is no guarantee that any necessary reconstructions
* will free sufficient space in L0 to support a flush
@@ -528,7 +537,7 @@ private:
ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>();
args->epoch = epoch;
- args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_low_watermark());
+ args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_high_watermark());
args->extension = this;
args->compaction = false;
/* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index c751786..099b7a2 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -105,7 +105,15 @@ public:
}
void copy_to_buffer(psudb::byte *buffer) {
- memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped<R>));
+ /* check if the region to be copied circles back to start. If so, do it in two steps */
+ if ((m_head % m_cap) + get_record_count() > m_cap) {
+ size_t split_idx = m_cap - (m_head % m_cap);
+
+ memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), split_idx* sizeof(Wrapped<R>));
+ memcpy(buffer + split_idx, (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped<R>));
+ } else {
+ memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped<R>));
+ }
}
size_t get_tail() {