From 77589d4cc82b766d2cf16294fab98a57f6579cb4 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 16 Jan 2025 13:18:37 -0500 Subject: Additional layout policies + more flexibility in buffer flushing --- include/framework/structure/ExtensionStructure.h | 95 ++++++++++++++++-------- include/framework/structure/InternalLevel.h | 19 +++++ 2 files changed, 85 insertions(+), 29 deletions(-) (limited to 'include/framework/structure') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 3bb8a0b..078c4a9 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -145,17 +145,32 @@ public: return cnt; } - inline void perform_reconstruction(ReconstructionTask task) { + size_t get_shard_count() const { + size_t cnt = 0; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + cnt += m_levels[i]->get_nonempty_shard_count(); + } + } + + return cnt; + } + + inline void perform_reconstruction(ReconstructionTask task, + BuffView *bv=nullptr) { /* perform the reconstruction itself */ std::vector shards; for (ShardID shid : task.sources) { assert(shid.level_idx < (level_index) m_levels.size()); assert(shid.shard_idx >= -1); - /* if unspecified, push all shards into the vector */ - if (shid.shard_idx == all_shards_idx) { - for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); - i++) { + if (shid == buffer_shid) { + assert(bv); + ShardType *buffer_shard = new ShardType(std::move(*bv)); + shards.push_back(buffer_shard); + } else if (shid.shard_idx == all_shards_idx) { + /* if unspecified, push all shards into the vector */ + for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) { if (m_levels[shid.level_idx]->get_shard(i)) { shards.push_back(m_levels[shid.level_idx]->get_shard(i)); } @@ -171,7 +186,9 @@ public: * Remove all of the shards processed by the operation */ for (ShardID shid : task.sources) { - if (shid.shard_idx == all_shards_idx) { + if (shid == buffer_shid) { + continue; + } else if (shid.shard_idx == all_shards_idx) { m_levels[shid.level_idx]->truncate(); } else { m_levels[shid.level_idx]->delete_shard(shid.shard_idx); @@ -199,29 +216,49 @@ public: * like that, we'll leave this as low priority. */ - /* insert the first level, if needed */ - if (m_levels.size() == 0) { - m_levels.push_back( - std::make_shared>(0)); - } - - ShardType *buffer_shard = new ShardType(std::move(buffer)); - if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) { - m_levels[0]->append(std::shared_ptr(buffer_shard)); - } else { - std::vector shards; - for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count(); - i++) { - if (m_levels[0]->get_shard(i)) { - shards.push_back(m_levels[0]->get_shard(i)); - } - - shards.push_back(buffer_shard); - ShardType *new_shard = new ShardType(shards); - m_levels[0]->truncate(); - m_levels[0]->append(std::shared_ptr(new_shard)); - } - } + // /* insert the first level, if needed */ + // if (m_levels.size() == 0) { + // m_levels.push_back( + // std::make_shared>(0)); + // } + + perform_reconstruction(task, &buffer); + + // ShardType *buffer_shard = new ShardType(std::move(buffer)); + // if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) { + // m_levels[0]->append(std::shared_ptr(buffer_shard)); + // } else if (task.type == ReconstructionType::Merge) { + // std::vector shards; + // for (size_t i=0; iget_shard(shid.shard_idx)); + // } + // } + + // shards.emplace_back(buffer_shard); + // ShardType *new_shard = new ShardType(shards); + // m_levels[0]->append(std::shared_ptr(new_shard)); + // for (size_t i=0; idelete_shard(shid.shard_idx); + // } + // } + // } else { + // std::vector shards; + // for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count(); + // i++) { + // if (m_levels[0]->get_shard(i)) { + // shards.push_back(m_levels[0]->get_shard(i)); + // } + + // shards.push_back(buffer_shard); + // ShardType *new_shard = new ShardType(shards); + // m_levels[0]->truncate(); + // m_levels[0]->append(std::shared_ptr(new_shard)); + // } + // } } bool take_reference() { diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index c9d1749..5bc891b 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -166,6 +166,17 @@ public: return (double)tscnt / (double)(tscnt + reccnt); } + size_t get_nonempty_shard_count() const { + size_t cnt = 0; + for (size_t i = 0; i < m_shards.size(); i++) { + if (m_shards[i] && m_shards[i]->get_record_count() > 0) { + cnt += 1; + } + } + + return cnt; + } + std::shared_ptr clone() const { auto new_level = std::make_shared(m_level_no); for (size_t i = 0; i < m_shards.size(); i++) { @@ -185,6 +196,14 @@ public: m_shards.emplace_back(shard); } + const ShardType *get_shard(ShardID shid) const { + if (shid < m_shards.size()) { + return m_shards[shid].get(); + } + + return nullptr; + } + private: ssize_t m_level_no; std::vector> m_shards; -- cgit v1.2.3