From fd0e99e618319974320ed3fb49535aec501be1fb Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 6 Feb 2025 15:56:33 -0500 Subject: Background compaction stuff --- include/framework/structure/ExtensionStructure.h | 72 ++++++++++++++++++---- include/framework/structure/InternalLevel.h | 76 ++++++++++++++++-------- 2 files changed, 114 insertions(+), 34 deletions(-) (limited to 'include/framework/structure') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 60fb6c7..2bf7086 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -27,8 +27,9 @@ class ExtensionStructure { typedef std::vector>> LevelVector; public: - ExtensionStructure() { - m_levels.emplace_back(std::make_shared>(0)); + ExtensionStructure(bool default_level=true) { + if (default_level) + m_levels.emplace_back(std::make_shared>(0)); } ~ExtensionStructure() = default; @@ -49,7 +50,7 @@ public: * need to be forwarded to the appropriate structures manually. */ ExtensionStructure *copy() const { - auto new_struct = new ExtensionStructure(); + auto new_struct = new ExtensionStructure(false); for (size_t i = 0; i < m_levels.size(); i++) { new_struct->m_levels.push_back(m_levels[i]->clone()); } @@ -158,13 +159,17 @@ public: return cnt; } - inline void perform_reconstruction(ReconstructionTask task) { + inline void perform_reconstruction(ReconstructionTask task, size_t version=0) { /* perform the reconstruction itself */ std::vector shards; for (ShardID shid : task.sources) { - assert(shid.level_idx < (level_index) m_levels.size()); + assert(shid.level_idx <= (level_index) m_levels.size()); assert(shid.shard_idx >= -1); + if (shid.level_idx == (level_index) m_levels.size()) { + continue; + } + /* if unspecified, push all shards into the vector */ if (shid.shard_idx == all_shards_idx) { for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); @@ -184,21 +189,27 @@ public: * Remove all of the shards processed by the operation */ for (ShardID shid : task.sources) { - if (shid.shard_idx == all_shards_idx) { + if (shid.level_idx == (level_index) m_levels.size()) { + continue; + } else if (shid.shard_idx == all_shards_idx) { m_levels[shid.level_idx]->truncate(); } else if (shid != buffer_shid) { m_levels[shid.level_idx]->delete_shard(shid.shard_idx); } } + // fprintf(stderr, "Target: %ld\tLevels:%ld\n", task.target, m_levels.size()); + /* * Append the new shard to the target level */ if (task.target < (level_index)m_levels.size()) { - m_levels[task.target]->append(std::shared_ptr(new_shard)); + m_levels[task.target]->append(std::shared_ptr(new_shard), version); + // fprintf(stderr, "append (no growth)\n"); } else { /* grow the structure if needed */ m_levels.push_back(std::make_shared>(task.target)); - m_levels[task.target]->append(std::shared_ptr(new_shard)); + m_levels[task.target]->append(std::shared_ptr(new_shard), version); + // fprintf(stderr, "grow and append\n"); } } @@ -219,8 +230,8 @@ public: return m_levels[0]->get_shard_count(); } - void append_l0(std::shared_ptr shard) { - m_levels[0]->append(shard); + void append_l0(std::shared_ptr shard, size_t version) { + m_levels[0]->append(shard, version); } LevelVector const &get_level_vector() const { return m_levels; } @@ -251,6 +262,47 @@ public: return ts_prop <= (long double) max_delete_prop; } + void print_structure() const { + for (size_t i=0; iget_shard_count(); j++) { + fprintf(stdout, "(%ld: %ld) ", j, m_levels[i]->get_shard(j)->get_record_count()); + } + } else { + fprintf(stdout, "[Empty]"); + } + + fprintf(stdout, "\n"); + } + } + + + void merge_structure(const ExtensionStructure* old_structure, size_t version_id = 0) { + assert(version_id > 0); + + for (size_t i=0; im_levels.size(); i++) { + for (size_t j=0; jm_levels[i]->get_shard_count(); j++) { + if (old_structure->m_levels[i]->get_shard_version(j) > version_id) { + m_levels[i]->append(old_structure->m_levels[i]->get_shard_ptr(j)); + } + } + } + } + + void update_shard_version(size_t version) { + assert(version != 0); + + for (size_t i=0; iget_shard_count(); j++) { + if (m_levels[i]->get_shard_version(j) == 0) { + m_levels[i]->set_shard_version(j, version); + } + } + } + } + private: LevelVector m_levels; }; diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index 5bc891b..37b2b40 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -15,6 +15,7 @@ */ #pragma once +#include #include #include @@ -28,6 +29,7 @@ template QueryType> class InternalLevel { typedef typename ShardType::RECORD RecordType; typedef BufferView BuffView; + typedef std::pair, size_t> shard_ptr; public: InternalLevel(ssize_t level_no) : m_level_no(level_no) {} @@ -47,8 +49,8 @@ public: std::vector shards; for (auto shard : m_shards) { - if (shard) - shards.emplace_back(shard.get()); + if (shard.first) + shards.emplace_back(shard.first.get()); } return new ShardType(shards); @@ -59,10 +61,10 @@ public: std::vector &local_queries, typename QueryType::Parameters *query_parms) const { for (size_t i = 0; i < m_shards.size(); i++) { - if (m_shards[i]) { + if (m_shards[i].first) { auto local_query = - QueryType::local_preproc(m_shards[i].get(), query_parms); - shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].get()}); + QueryType::local_preproc(m_shards[i].first.get(), query_parms); + shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].first.get()}); local_queries.emplace_back(local_query); } } @@ -74,7 +76,7 @@ public: for (int i = m_shards.size() - 1; i >= (ssize_t)shard_stop; i--) { if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec, true); + auto res = m_shards[i].first->point_lookup(rec, true); if (res && res->is_tombstone()) { return true; } @@ -88,8 +90,8 @@ public: return false; for (size_t i = 0; i < m_shards.size(); ++i) { - if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec); + if (m_shards[i].first) { + auto res = m_shards[i].first->point_lookup(rec); if (res) { res->set_delete(); return true; @@ -105,7 +107,15 @@ public: return nullptr; } - return m_shards[idx].get(); + return m_shards[idx].first.get(); + } + + const size_t get_shard_version(size_t idx) const { + if (idx >= m_shards.size()) { + return 0; + } + + return m_shards[idx].second; } size_t get_shard_count() const { return m_shards.size(); } @@ -113,8 +123,8 @@ public: size_t get_record_count() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { - if (m_shards[i]) { - cnt += m_shards[i]->get_record_count(); + if (m_shards[i].first) { + cnt += m_shards[i].first->get_record_count(); } } @@ -124,8 +134,8 @@ public: size_t get_tombstone_count() const { size_t res = 0; for (size_t i = 0; i < m_shards.size(); ++i) { - if (m_shards[i]) { - res += m_shards[i]->get_tombstone_count(); + if (m_shards[i].first) { + res += m_shards[i].first->get_tombstone_count(); } } return res; @@ -134,8 +144,8 @@ public: size_t get_aux_memory_usage() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { - if (m_shards[i]) { - cnt += m_shards[i]->get_aux_memory_usage(); + if (m_shards[i].first) { + cnt += m_shards[i].first->get_aux_memory_usage(); } } @@ -146,7 +156,7 @@ public: size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { - cnt += m_shards[i]->get_memory_usage(); + cnt += m_shards[i].first->get_memory_usage(); } } @@ -158,8 +168,8 @@ public: size_t reccnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { - tscnt += m_shards[i]->get_tombstone_count(); - reccnt += m_shards[i]->get_record_count(); + tscnt += m_shards[i].first->get_tombstone_count(); + reccnt += m_shards[i].first->get_record_count(); } } @@ -169,7 +179,7 @@ public: size_t get_nonempty_shard_count() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { - if (m_shards[i] && m_shards[i]->get_record_count() > 0) { + if (m_shards[i].first && m_shards[i].first->get_record_count() > 0) { cnt += 1; } } @@ -180,7 +190,7 @@ public: std::shared_ptr clone() const { auto new_level = std::make_shared(m_level_no); for (size_t i = 0; i < m_shards.size(); i++) { - new_level->append(m_shards[i]); + new_level->append(m_shards[i].first, m_shards[i].second); } return new_level; @@ -192,21 +202,39 @@ public: m_shards.erase(m_shards.begin() + shard); } - void append(std::shared_ptr shard) { - m_shards.emplace_back(shard); + void append(std::shared_ptr shard, size_t version=0) { + m_shards.push_back({shard, version}); + } + + void append(shard_ptr shard) { + m_shards.push_back(shard); } const ShardType *get_shard(ShardID shid) const { if (shid < m_shards.size()) { - return m_shards[shid].get(); + return m_shards[shid].first.get(); } return nullptr; } + const shard_ptr get_shard_ptr(size_t shid) const { + if (shid < m_shards.size()) { + return m_shards[shid]; + } + + return {nullptr, 0}; + } + + void set_shard_version(size_t idx, size_t version) { + if (idx < m_shards.size()) { + m_shards[idx].second = version; + } + } + private: ssize_t m_level_no; - std::vector> m_shards; + std::vector m_shards; }; } // namespace de -- cgit v1.2.3