From 85afe4ef04f327862460570fb0aa4c30afcf7cc7 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 11 Feb 2025 17:32:10 -0500 Subject: Progress: began adding parallel merging and locking of levels --- include/framework/structure/ExtensionStructure.h | 19 +++++++++++++++---- include/framework/structure/MutableBuffer.h | 1 + 2 files changed, 16 insertions(+), 4 deletions(-) (limited to 'include/framework/structure') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 2bf7086..fa713af 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -159,7 +159,12 @@ public: return cnt; } - inline void perform_reconstruction(ReconstructionTask task, size_t version=0) { + /* + * Perform the reconstruction described by task. If the resulting + * reconstruction grows the structure (i.e., adds a level), returns + * true. Otherwise, returns false. + */ + inline bool perform_reconstruction(ReconstructionTask task, size_t version=0) { /* perform the reconstruction itself */ std::vector shards; for (ShardID shid : task.sources) { @@ -205,10 +210,12 @@ public: */ if (task.target < (level_index)m_levels.size()) { m_levels[task.target]->append(std::shared_ptr(new_shard), version); + return false; // fprintf(stderr, "append (no growth)\n"); } else { /* grow the structure if needed */ m_levels.push_back(std::make_shared>(task.target)); m_levels[task.target]->append(std::shared_ptr(new_shard), version); + return true; // fprintf(stderr, "grow and append\n"); } } @@ -283,9 +290,13 @@ public: assert(version_id > 0); for (size_t i=0; im_levels.size(); i++) { - for (size_t j=0; jm_levels[i]->get_shard_count(); j++) { - if (old_structure->m_levels[i]->get_shard_version(j) > version_id) { - m_levels[i]->append(old_structure->m_levels[i]->get_shard_ptr(j)); + if (m_levels.size() <= i) { + m_levels.push_back(old_structure->m_levels[i]); + } else { + for (size_t j=0; jm_levels[i]->get_shard_count(); j++) { + if (old_structure->m_levels[i]->get_shard_version(j) > version_id) { + m_levels[i]->append(old_structure->m_levels[i]->get_shard_ptr(j)); + } } } } diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 0eae73d..0197ecd 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -104,6 +104,7 @@ public: bool is_full() const { return get_record_count() >= m_hwm; } bool is_at_low_watermark() const { return get_record_count() >= m_lwm; } + bool is_at_high_watermark() const { return get_record_count() >= m_hwm; } size_t get_tombstone_count() const { return m_tscnt.load(); } -- cgit v1.2.3