From 85942ad2cb99b8a0984579d7dba9504f351ffac0 Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Mon, 29 May 2023 15:38:44 -0400
Subject: Fixed InternalLevel memory leak

---
 include/framework/DynamicExtension.h | 13 ++----
 include/framework/InternalLevel.h    | 89 +++++++++++++++++++++++-------------
 2 files changed, 61 insertions(+), 41 deletions(-)

(limited to 'include')

diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 422ca10..91d41f8 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -282,8 +282,8 @@ private:
         return buffer->append(rec, ts);
     }
 
-    std::vector<...> *filter_deletes(std::vector<...> *records, ShardID shid, Buffer *buffer) {
-        std::vector<...> *processed_records = new std::vector<...>();
+    std::vector<...> filter_deletes(std::vector<...> *records, ShardID shid, Buffer *buffer) {
+        std::vector<...> processed_records;
         processed_records->reserve(records->size());
 
         // For delete tagging, we just need to check the delete bit on each
@@ -294,7 +294,7 @@
                 continue;
             }
 
-            processed_records->emplace_back(static_cast<...>(rec.rec));
+            processed_records.emplace_back(rec.rec);
         }
 
         delete records;
@@ -324,7 +324,7 @@
                 }
             }
 
-            processed_records->emplace_back(static_cast<...>(rec.rec));
+            processed_records.emplace_back(rec.rec);
         }
 
         delete records;
@@ -401,7 +401,7 @@
         level_index merge_level_idx;
 
         size_t incoming_rec_cnt = get_level_record_count(idx, buffer);
-        for (level_index i=idx+1; i<=m_levels.size(); i++) {
+        for (level_index i=idx+1; i<m_levels.size(); i++) {
@@ ... @@
             ...->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
             if (ts_prop > (long double) m_max_delete_prop) {
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index d28ba5f..62c6915 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -28,18 +28,32 @@ class InternalLevel {
     typedef MutableBuffer<R> Buffer;
 public:
     InternalLevel(ssize_t level_no, size_t shard_cap)
-    : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard*>(shard_cap, nullptr), free_shards)
+    : m_level_no(level_no)
+    , m_shard_cnt(0)
+    , m_shards(shard_cap, nullptr)
+    , m_owns(shard_cap, true)
     {}
 
     // Create a new memory level sharing the shards and repurposing it as previous level_no + 1
     // WARNING: for leveling only.
     InternalLevel(InternalLevel* level)
-        : m_level_no(level->m_level_no + 1), m_shard_cnt(level->m_shard_cnt)
-        , m_shards(level->m_shards, free_shards) {
-        assert(m_shard_cnt == 1 && m_shards->size() == 1);
+        : m_level_no(level->m_level_no + 1)
+        , m_shard_cnt(level->m_shard_cnt)
+        , m_shards(level->m_shards.size(), nullptr)
+        , m_owns(level->m_owns.size(), true) {
+        assert(m_shard_cnt == 1 && m_shards.size() == 1);
+
+        for (size_t i=0; i<m_shard_cnt; i++) {
+            level->m_owns[i] = false;
+            m_shards[i] = level->m_shards[i];
+        }
     }
 
-    ~InternalLevel() { }
+    ~InternalLevel() {
+        for (size_t i=0; i<m_shards.size(); i++) {
+            if (m_owns[i]) delete m_shards[i];
+        }
+    }
@@ ... @@
         auto res = new InternalLevel(base_level->m_level_no, 1);
         res->m_shard_cnt = 1;
         Shard* shards[2];
-        shards[0] = (*base_level->m_shards)[0];
-        shards[1] = (*new_level->m_shards)[0];
+        shards[0] = base_level->m_shards[0];
+        shards[1] = new_level->m_shards[0];
 
-        (*res->m_shards)[0] = new S(shards, 2);
+        res->m_shards[0] = new S(shards, 2);
         return res;
     }
 
     void append_buffer(Buffer* buffer) {
-        assert(m_shard_cnt < m_shards->size());
-        (*m_shards)[m_shard_cnt] = new S(buffer);
+        assert(m_shard_cnt < m_shards.size());
+        m_shards[m_shard_cnt] = new S(buffer);
+        m_owns[m_shard_cnt] = true;
         ++m_shard_cnt;
     }
 
     void append_merged_shards(InternalLevel* level) {
-        assert(m_shard_cnt < m_shards->size());
-        (*m_shards)[m_shard_cnt] = new S(level->m_shards->data(), level->m_shard_cnt);
+        assert(m_shard_cnt < m_shards.size());
+        m_shards[m_shard_cnt] = new S(level->m_shards.data(), level->m_shard_cnt);
+        m_owns[m_shard_cnt] = true;
+
+        ++m_shard_cnt;
     }
@@ -71,7 +88,7 @@ public:
         Shard *shards[m_shard_cnt];
 
         for (size_t i=0; i<m_shard_cnt; i++) {
-            shards[i] = (*m_shards)[i];
+            shards[i] = m_shards[i];
@@ ... @@
     ...> &shards, std::vector<...>& shard_states, void *query_parms) {
         for (size_t i=0; i<m_shard_cnt; i++) {
@@ ... @@
         ... i >= (ssize_t) shard_stop; i--) {
-            if ((*m_shards)[i]) {
-                auto res = (*m_shards)[i]->point_lookup(rec, true);
+            if (m_shards[i]) {
+                auto res = m_shards[i]->point_lookup(rec, true);
                 if (res && res->is_tombstone()) {
                     return true;
                 }
@@ -105,9 +122,9 @@ public:
     bool delete_record(const R &rec) {
         if (m_shard_cnt == 0) return false;
 
-        for (size_t i = 0; i < (*m_shards)->size(); ++i) {
-            if ((*m_shards)[i]) {
-                auto res = (*m_shards)[i]->point_lookup(rec);
+        for (size_t i = 0; i < m_shards->size(); ++i) {
+            if (m_shards[i]) {
+                auto res = m_shards[i]->point_lookup(rec);
                 if (res) {
                     res->set_delete();
                 }
@@ -118,7 +135,7 @@
     }
 
     Shard* get_shard(size_t idx) {
-        return (*m_shards)[idx];
+        return m_shards[idx];
     }
 
     size_t get_shard_count() {
@@ -128,7 +145,7 @@
     size_t get_record_cnt() {
         size_t cnt = 0;
         for (size_t i=0; i<m_shard_cnt; i++) {
-            cnt += (*m_shards)[i]->get_record_count();
+            cnt += m_shards[i]->get_record_count();
         }
 
         return cnt;
@@ -137,7 +154,7 @@
     size_t get_tombstone_count() {
         size_t res = 0;
         for (size_t i = 0; i < m_shard_cnt; ++i) {
-            res += (*m_shards)[i]->get_tombstone_count();
+            res += m_shards[i]->get_tombstone_count();
         }
         return res;
     }
@@ -145,7 +162,7 @@
     size_t get_aux_memory_usage() {
         size_t cnt = 0;
         for (size_t i=0; i<m_shard_cnt; i++) {
-            cnt += (*m_shards)[i]->get_aux_memory_usage();
+            cnt += m_shards[i]->get_aux_memory_usage();
         }
 
         return cnt;
@@ -154,8 +171,8 @@
     size_t get_memory_usage() {
         size_t cnt = 0;
         for (size_t i=0; i<m_shard_cnt; i++) {
-            if ((*m_shards)[i]) {
-                cnt += (*m_shards)[i]->get_memory_usage();
+            if (m_shards[i]) {
+                cnt += m_shards[i]->get_memory_usage();
             }
         }
 
@@ -166,8 +183,8 @@
         size_t tscnt = 0;
         size_t reccnt = 0;
         for (size_t i=0; i<m_shard_cnt; i++) {
-            if ((*m_shards)[i]) {
-                tscnt += (*m_shards)[i]->get_tombstone_count();
+            if (m_shards[i]) {
+                tscnt += m_shards[i]->get_tombstone_count();
                 reccnt += (*m_shards[i])->get_record_count();
             }
         }
@@ -181,11 +198,17 @@ private:
     size_t m_shard_cnt;
     size_t m_shard_size_cap;
 
-    static void free_shards(std::vector<Shard*>* vec) {
-        for (size_t i=0; i<vec->size(); i++) delete (*vec)[i];
-    }
+    std::vector<Shard*> m_shards;
+    std::vector<bool> m_owns;
-    std::shared_ptr<std::vector<Shard*>> m_shards;
+    InternalLevel *clone() {
+        auto new_level = new InternalLevel(m_level_no, m_shards.size());
+        for (size_t i=0; i<m_shard_cnt; i++) {
+            new_level->m_shards[i] = m_shards[i];
+            new_level->m_owns[i] = true;
+            m_owns[i] = false;
+        }
+    }
 
 };
 
 }
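
The core of the fix is visible in InternalLevel.h: the shared_ptr-with-custom-deleter vector of shards is replaced by a plain std::vector<Shard*> plus a parallel std::vector<bool> of ownership flags, so each shard is deleted exactly once by whichever level still owns it, and handing a shard to another level (the copy constructor and clone()) clears the flag on the source side. Below is a minimal, self-contained sketch of that ownership-flag scheme; Shard, Level, and append_new_shard are made-up stand-ins rather than the framework's actual types, and unlike the clone() shown in the hunk above the sketch also returns the newly allocated level.

#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for the framework's shard type (hypothetical).
struct Shard {
    std::size_t record_count = 0;
};

// Minimal level that owns some of its shard pointers, mirroring the
// m_shards / m_owns pairing introduced by the patch.
class Level {
public:
    explicit Level(std::size_t shard_cap)
        : m_shards(shard_cap, nullptr)
        , m_owns(shard_cap, false) {}

    // Only shards still flagged as owned are freed, so a pointer that has
    // been handed off to another level is deleted exactly once.
    ~Level() {
        for (std::size_t i = 0; i < m_shards.size(); i++) {
            if (m_owns[i]) delete m_shards[i];
        }
    }

    // A shard allocated by the level itself is owned by it.
    void append_new_shard() {
        assert(m_cnt < m_shards.size());
        m_shards[m_cnt] = new Shard();
        m_owns[m_cnt] = true;
        m_cnt++;
    }

    // Shallow copy that transfers ownership: the clone becomes responsible
    // for deletion and the source keeps only non-owning pointers.
    Level *clone() {
        auto new_level = new Level(m_shards.size());
        for (std::size_t i = 0; i < m_cnt; i++) {
            new_level->m_shards[i] = m_shards[i];
            new_level->m_owns[i] = true;
            m_owns[i] = false;
        }
        new_level->m_cnt = m_cnt;
        return new_level;
    }

private:
    std::size_t m_cnt = 0;
    std::vector<Shard*> m_shards;
    std::vector<bool> m_owns;
};

int main() {
    Level base(4);
    base.append_new_shard();

    Level *copy = base.clone();  // ownership of the shard moves to *copy
    delete copy;                 // frees the shard exactly once

    return 0;                    // base's destructor skips the slot it no longer owns
}

One plausible reason to prefer the flag over holding std::shared_ptr<Shard> is that the merge paths above pass raw Shard* arrays straight into a shard constructor (new S(shards, 2)), which is simplest when the level stores raw pointers.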