diff options
| author | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-01-15 17:23:57 -0500 |
|---|---|---|
| committer | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-01-15 17:23:57 -0500 |
| commit | 2117935e85412f3733ee0bcb1830c7fd0b129b29 (patch) | |
| tree | 303d4482263f25a45b157fef82217084db20be8d /include/framework | |
| parent | 0a9e79416df03a9e0a3d2cf171cf90028a644d6d (diff) | |
| download | dynamic-extension-2117935e85412f3733ee0bcb1830c7fd0b129b29.tar.gz | |
Concurrency testing and bug fixes
Diffstat (limited to 'include/framework')
| -rw-r--r-- | include/framework/DynamicExtension.h | 21 | ||||
| -rw-r--r-- | include/framework/structure/BufferView.h | 10 |
2 files changed, 24 insertions, 7 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index bddc950..cb21ae3 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -28,6 +28,7 @@ #include "framework/scheduling/Epoch.h" + namespace de { template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, @@ -176,12 +177,14 @@ public: */ { auto bv = epoch->get_buffer(); - shards.emplace_back(new S(std::move(bv))); + if (bv.get_record_count() > 0) { + shards.emplace_back(new S(std::move(bv))); + } } if (vers->get_levels().size() > 0) { for (int i=vers->get_levels().size() - 1; i>= 0; i--) { - if (vers->get_levels()[i]) { + if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) { shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); } } @@ -426,15 +429,21 @@ private: Structure *vers = args->epoch->get_structure(); - // could be flushed at once here. - auto buffer_view = args->epoch->get_flush_buffer(); - size_t new_head = buffer_view.get_tail(); for (ssize_t i=0; i<args->merges.size(); i++) { vers->reconstruction(args->merges[i].second, args->merges[i].first); } /* + * we'll grab the buffer AFTER doing the internal reconstruction, so we can + * flush as many records as possible in one go. The reconstruction was done so + * as to make room for the full buffer anyway, so there's no real benefit to doing + * this first. + */ + auto buffer_view = args->epoch->get_buffer(); + size_t new_head = buffer_view.get_tail(); + + /* * if performing a compaction, don't flush the buffer, as * there is no guarantee that any necessary reconstructions * will free sufficient space in L0 to support a flush @@ -528,7 +537,7 @@ private: ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>(); args->epoch = epoch; - args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_low_watermark()); + args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_high_watermark()); args->extension = this; args->compaction = false; /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index c751786..099b7a2 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -105,7 +105,15 @@ public: } void copy_to_buffer(psudb::byte *buffer) { - memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped<R>)); + /* check if the region to be copied circles back to start. If so, do it in two steps */ + if ((m_head % m_cap) + get_record_count() > m_cap) { + size_t split_idx = m_cap - (m_head % m_cap); + + memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), split_idx* sizeof(Wrapped<R>)); + memcpy(buffer + split_idx, (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped<R>)); + } else { + memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped<R>)); + } } size_t get_tail() { |