From ba65c8976f54d4da2467074235a12f5be0bd5ebc Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Sun, 22 Dec 2024 15:12:13 -0500 Subject: Continued development --- include/framework/structure/ExtensionStructure.h | 63 ++++++++++++++++++------ include/framework/structure/InternalLevel.h | 28 +++++------ include/framework/structure/MutableBuffer.h | 22 ++++----- 3 files changed, 74 insertions(+), 39 deletions(-) (limited to 'include/framework/structure') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 9b7ae87..3bb8a0b 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -147,14 +147,14 @@ public: inline void perform_reconstruction(ReconstructionTask task) { /* perform the reconstruction itself */ - std::vector shards; + std::vector shards; for (ShardID shid : task.sources) { - assert(shid.level_idx < m_levels.size()); + assert(shid.level_idx < (level_index) m_levels.size()); assert(shid.shard_idx >= -1); /* if unspecified, push all shards into the vector */ if (shid.shard_idx == all_shards_idx) { - for (size_t i = 0; i < m_levels[shid.level_idx].get_shard_count(); + for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) { if (m_levels[shid.level_idx]->get_shard(i)) { shards.push_back(m_levels[shid.level_idx]->get_shard(i)); @@ -165,7 +165,7 @@ public: } } - auto new_shard = Shard(shards); + auto new_shard = new ShardType(shards); /* * Remove all of the shards processed by the operation @@ -181,10 +181,11 @@ public: /* * Append the new shard to the target level */ - if (task.target < m_levels.size()) { - m_levels[task.target]->append_shard(new_shard); - } else { - m_levels.push_back(); + if (task.target < (level_index)m_levels.size()) { + m_levels[task.target]->append(std::shared_ptr(new_shard)); + } else { /* grow the structure if needed */ + m_levels.push_back(std::make_shared>(task.target)); + m_levels[task.target]->append(std::shared_ptr(new_shard)); } } @@ -197,12 +198,20 @@ public: * the buffer itself. Given that we're unlikely to actually use policies * like that, we'll leave this as low priority. */ - ShardType *buffer_shard = new ShardType(buffer); - if (task.type == ReconstructionType::Append) { - m_levels[0]->append(std::shared_ptr(buffer_shard)); + + /* insert the first level, if needed */ + if (m_levels.size() == 0) { + m_levels.push_back( + std::make_shared>(0)); + } + + ShardType *buffer_shard = new ShardType(std::move(buffer)); + if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) { + m_levels[0]->append(std::shared_ptr(buffer_shard)); } else { - std::vector shards; - for (size_t i = 0; i < m_levels[0].size(); i++) { + std::vector shards; + for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count(); + i++) { if (m_levels[0]->get_shard(i)) { shards.push_back(m_levels[0]->get_shard(i)); } @@ -210,7 +219,7 @@ public: shards.push_back(buffer_shard); ShardType *new_shard = new ShardType(shards); m_levels[0]->truncate(); - m_levels[0]->append(std::shared_ptr(new_shard)); + m_levels[0]->append(std::shared_ptr(new_shard)); } } } @@ -243,6 +252,32 @@ public: LevelVector const &get_level_vector() const { return m_levels; } + + /* + * Validate that no level in the structure exceeds its maximum tombstone + * capacity. This is used to trigger preemptive compactions at the end of + * the reconstruction process. + */ + bool validate_tombstone_proportion(double max_delete_prop) const { + long double ts_prop; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + ts_prop = (long double)m_levels[i]->get_tombstone_count() / + (long double)m_levels[i]->get_record_count(); + if (ts_prop > (long double)max_delete_prop) { + return false; + } + } + } + + return true; + } + + bool validate_tombstone_proportion(level_index level, double max_delete_prop) const { + long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) m_levels[level]->get_record_count(); + return ts_prop <= (long double) max_delete_prop; + } + private: std::atomic m_refcnt; LevelVector m_levels; diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index 8cfcd49..c9d1749 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -40,12 +40,12 @@ public: * * No changes are made to this level. */ - ShardType *get_combined_shard() { + ShardType *get_combined_shard() const { if (m_shards.size() == 0) { return nullptr; } - std::vector shards; + std::vector shards; for (auto shard : m_shards) { if (shard) shards.emplace_back(shard.get()); @@ -57,7 +57,7 @@ public: void get_local_queries( std::vector> &shards, std::vector &local_queries, - typename QueryType::Parameters *query_parms) { + typename QueryType::Parameters *query_parms) const { for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { auto local_query = @@ -68,7 +68,7 @@ public: } } - bool check_tombstone(size_t shard_stop, const RecordType &rec) { + bool check_tombstone(size_t shard_stop, const RecordType &rec) const { if (m_shards.size() == 0) return false; @@ -100,7 +100,7 @@ public: return false; } - ShardType *get_shard(size_t idx) { + const ShardType *get_shard(size_t idx) const { if (idx >= m_shards.size()) { return nullptr; } @@ -108,9 +108,9 @@ public: return m_shards[idx].get(); } - size_t get_shard_count() { return m_shards.size(); } + size_t get_shard_count() const { return m_shards.size(); } - size_t get_record_count() { + size_t get_record_count() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { @@ -121,7 +121,7 @@ public: return cnt; } - size_t get_tombstone_count() { + size_t get_tombstone_count() const { size_t res = 0; for (size_t i = 0; i < m_shards.size(); ++i) { if (m_shards[i]) { @@ -131,7 +131,7 @@ public: return res; } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { @@ -142,7 +142,7 @@ public: return cnt; } - size_t get_memory_usage() { + size_t get_memory_usage() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { @@ -153,7 +153,7 @@ public: return cnt; } - double get_tombstone_prop() { + double get_tombstone_prop() const { size_t tscnt = 0; size_t reccnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { @@ -166,10 +166,10 @@ public: return (double)tscnt / (double)(tscnt + reccnt); } - std::shared_ptr clone() { + std::shared_ptr clone() const { auto new_level = std::make_shared(m_level_no); for (size_t i = 0; i < m_shards.size(); i++) { - new_level->m_shards[i] = m_shards[i]; + new_level->append(m_shards[i]); } return new_level; @@ -181,7 +181,7 @@ public: m_shards.erase(m_shards.begin() + shard); } - bool append(std::shared_ptr shard) { + void append(std::shared_ptr shard) { m_shards.emplace_back(shard); } diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 625b04b..0eae73d 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -97,15 +97,15 @@ public: return true; } - size_t get_record_count() { return m_tail.load() - m_head.load().head_idx; } + size_t get_record_count() const { return m_tail.load() - m_head.load().head_idx; } - size_t get_capacity() { return m_cap; } + size_t get_capacity() const { return m_cap; } - bool is_full() { return get_record_count() >= m_hwm; } + bool is_full() const { return get_record_count() >= m_hwm; } - bool is_at_low_watermark() { return get_record_count() >= m_lwm; } + bool is_at_low_watermark() const { return get_record_count() >= m_lwm; } - size_t get_tombstone_count() { return m_tscnt.load(); } + size_t get_tombstone_count() const { return m_tscnt.load(); } bool delete_record(const R &rec) { return get_buffer_view().delete_record(rec); @@ -115,9 +115,9 @@ public: return get_buffer_view().check_tombstone(rec); } - size_t get_memory_usage() { return m_cap * sizeof(Wrapped); } + size_t get_memory_usage() const { return m_cap * sizeof(Wrapped); } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { return m_tombstone_filter->get_memory_usage(); } @@ -200,7 +200,7 @@ public: m_lwm = lwm; } - size_t get_low_watermark() { return m_lwm; } + size_t get_low_watermark() const { return m_lwm; } void set_high_watermark(size_t hwm) { assert(hwm > m_lwm); @@ -208,9 +208,9 @@ public: m_hwm = hwm; } - size_t get_high_watermark() { return m_hwm; } + size_t get_high_watermark() const { return m_hwm; } - size_t get_tail() { return m_tail.load(); } + size_t get_tail() const { return m_tail.load(); } /* * Note: this returns the available physical storage capacity, @@ -220,7 +220,7 @@ public: * but a buggy framework implementation may violate the * assumption. */ - size_t get_available_capacity() { + size_t get_available_capacity() const { if (m_old_head.load().refcnt == 0) { return m_cap - (m_tail.load() - m_head.load().head_idx); } -- cgit v1.2.3