summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-05-29 15:38:44 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-05-29 15:38:44 -0400
commit85942ad2cb99b8a0984579d7dba9504f351ffac0 (patch)
treedbcd43dd20e8128cf7a476daf543dc200d181001 /include/framework
parente920fa57cf9c503e560055864e4de37912b239e1 (diff)
downloaddynamic-extension-85942ad2cb99b8a0984579d7dba9504f351ffac0.tar.gz
Fixed InternalLevel memory leak
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h13
-rw-r--r--include/framework/InternalLevel.h89
2 files changed, 61 insertions, 41 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 422ca10..91d41f8 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -282,8 +282,8 @@ private:
return buffer->append(rec, ts);
}
- std::vector<R> *filter_deletes(std::vector<R> *records, ShardID shid, Buffer *buffer) {
- std::vector<R> *processed_records = new std::vector<R>();
+ std::vector<R> filter_deletes(std::vector<Wrapped<R>> *records, ShardID shid, Buffer *buffer) {
+ std::vector<R> processed_records;
processed_records.reserve(records->size());
// For delete tagging, we just need to check the delete bit on each
@@ -294,7 +294,7 @@ private:
continue;
}
- processed_records->emplace_back(static_cast<R>(rec.rec));
+ processed_records.emplace_back(rec.rec);
}
delete records;
@@ -324,7 +324,7 @@ private:
}
}
- processed_records->emplace_back(static_cast<R>(rec.rec));
+ processed_records.emplace_back(rec.rec);
}
delete records;
@@ -401,7 +401,7 @@ private:
level_index merge_level_idx;
size_t incoming_rec_cnt = get_level_record_count(idx, buffer);
- for (level_index i=idx+1; i<=m_levels.size(); i++) {
+ for (level_index i=idx+1; i<m_levels.size(); i++) {
if (can_merge_with(i, incoming_rec_cnt)) {
return i;
}
@@ -466,9 +466,6 @@ private:
* levels below idx are below the limit.
*/
inline void enforce_delete_maximum(level_index idx) {
- // FIXME: currently broken due to tombstone cancellation issues
- return;
-
long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
if (ts_prop > (long double) m_max_delete_prop) {
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index d28ba5f..62c6915 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -28,18 +28,32 @@ class InternalLevel {
typedef MutableBuffer<R> Buffer;
public:
InternalLevel(ssize_t level_no, size_t shard_cap)
- : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard*>(shard_cap, nullptr), free_shards)
+ : m_level_no(level_no)
+ , m_shard_cnt(0)
+ , m_shards(shard_cap, nullptr)
+ , m_owns(shard_cap, true)
{}
// Create a new memory level sharing the shards and repurposing it as previous level_no + 1
// WARNING: for leveling only.
InternalLevel(InternalLevel* level)
- : m_level_no(level->m_level_no + 1), m_shard_cnt(level->m_shard_cnt)
- , m_shards(level->m_shards, free_shards) {
- assert(m_shard_cnt == 1 && m_shards->size() == 1);
+ : m_level_no(level->m_level_no + 1)
+ , m_shard_cnt(level->m_shard_cnt)
+ , m_shards(level->m_shards.size(), nullptr)
+ , m_owns(level->m_owns.size(), true) {
+ assert(m_shard_cnt == 1 && m_shards.size() == 1);
+
+ for (size_t i=0; i<m_shards.size(); i++) {
+ level->m_owns[i] = false;
+ m_shards[i] = level->m_shards[i];
+ }
}
- ~InternalLevel() { }
+ ~InternalLevel() {
+ for (size_t i=0; i<m_shards.size(); i++) {
+ if (m_owns[i]) delete m_shards[i];
+ }
+ }
// WARNING: for leveling only.
// assuming the base level is the level new level is merging into. (base_level is larger.)
@@ -48,22 +62,25 @@ public:
auto res = new InternalLevel(base_level->m_level_no, 1);
res->m_shard_cnt = 1;
Shard* shards[2];
- shards[0] = (*base_level->m_shards)[0];
- shards[1] = (*new_level->m_shards)[0];
+ shards[0] = base_level->m_shards[0];
+ shards[1] = new_level->m_shards[0];
- (*res->m_shards)[0] = new S(shards, 2);
+ res->m_shards[0] = new S(shards, 2);
return res;
}
void append_buffer(Buffer* buffer) {
- assert(m_shard_cnt < m_shards->size());
- (*m_shards)[m_shard_cnt] = new S(buffer);
+ assert(m_shard_cnt < m_shards.size());
+ m_shards[m_shard_cnt] = new S(buffer);
+ m_owns[m_shard_cnt] = true;
++m_shard_cnt;
}
void append_merged_shards(InternalLevel* level) {
- assert(m_shard_cnt < m_shards->size());
- (*m_shards)[m_shard_cnt] = new S(level->m_shards->data(), level->m_shard_cnt);
+ assert(m_shard_cnt < m_shards.size());
+ m_shards[m_shard_cnt] = new S(level->m_shards.data(), level->m_shard_cnt);
+ m_owns[m_shard_cnt] = true;
+
++m_shard_cnt;
}
@@ -71,7 +88,7 @@ public:
Shard *shards[m_shard_cnt];
for (size_t i=0; i<m_shard_cnt; i++) {
- shards[i] = (*m_shards)[i];
+ shards[i] = m_shards[i];
}
return new S(shards, m_shard_cnt);
@@ -80,9 +97,9 @@ public:
// Append the sample range in-order.....
void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
for (size_t i=0; i<m_shard_cnt; i++) {
- if ((*m_shards)[i]) {
- auto shard_state = Q::get_query_state((*m_shards)[i], query_parms);
- shards.push_back({{m_level_no, (ssize_t) i}, (*m_shards)[i]});
+ if (m_shards[i]) {
+ auto shard_state = Q::get_query_state(m_shards[i], query_parms);
+ shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i]});
shard_states.emplace_back(shard_state);
}
}
@@ -92,8 +109,8 @@ public:
if (m_shard_cnt == 0) return false;
for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
- if ((*m_shards)[i]) {
- auto res = (*m_shards)[i]->point_lookup(rec, true);
+ if (m_shards[i]) {
+ auto res = m_shards[i]->point_lookup(rec, true);
if (res && res->is_tombstone()) {
return true;
}
@@ -105,9 +122,9 @@ public:
bool delete_record(const R &rec) {
if (m_shard_cnt == 0) return false;
- for (size_t i = 0; i < (*m_shards)->size(); ++i) {
- if ((*m_shards)[i]) {
- auto res = (*m_shards)[i]->point_lookup(rec);
+ for (size_t i = 0; i < m_shards.size(); ++i) {
+ if (m_shards[i]) {
+ auto res = m_shards[i]->point_lookup(rec);
if (res) {
res->set_delete();
}
@@ -118,7 +135,7 @@ public:
}
Shard* get_shard(size_t idx) {
- return (*m_shards)[idx];
+ return m_shards[idx];
}
size_t get_shard_count() {
@@ -128,7 +145,7 @@ public:
size_t get_record_cnt() {
size_t cnt = 0;
for (size_t i=0; i<m_shard_cnt; i++) {
- cnt += (*m_shards)[i]->get_record_count();
+ cnt += m_shards[i]->get_record_count();
}
return cnt;
@@ -137,7 +154,7 @@ public:
size_t get_tombstone_count() {
size_t res = 0;
for (size_t i = 0; i < m_shard_cnt; ++i) {
- res += (*m_shards)[i]->get_tombstone_count();
+ res += m_shards[i]->get_tombstone_count();
}
return res;
}
@@ -145,7 +162,7 @@ public:
size_t get_aux_memory_usage() {
size_t cnt = 0;
for (size_t i=0; i<m_shard_cnt; i++) {
- cnt += (*m_shards)[i]->get_aux_memory_usage();
+ cnt += m_shards[i]->get_aux_memory_usage();
}
return cnt;
@@ -154,8 +171,8 @@ public:
size_t get_memory_usage() {
size_t cnt = 0;
for (size_t i=0; i<m_shard_cnt; i++) {
- if ((*m_shards)[i]) {
- cnt += (*m_shards)[i]->get_memory_usage();
+ if (m_shards[i]) {
+ cnt += m_shards[i]->get_memory_usage();
}
}
@@ -166,8 +183,8 @@ public:
size_t tscnt = 0;
size_t reccnt = 0;
for (size_t i=0; i<m_shard_cnt; i++) {
- if ((*m_shards)[i]) {
- tscnt += (*m_shards)[i]->get_tombstone_count();
+ if (m_shards[i]) {
+ tscnt += m_shards[i]->get_tombstone_count();
reccnt += m_shards[i]->get_record_count();
}
}
@@ -181,11 +198,17 @@ private:
size_t m_shard_cnt;
size_t m_shard_size_cap;
- static void free_shards(std::vector<Shard*>* vec) {
- for (size_t i=0; i<vec->size(); i++) delete (*vec)[i];
- }
+ std::vector<Shard*> m_shards;
+ std::vector<bool> m_owns;
- std::shared_ptr<std::vector<Shard*>> m_shards;
+ InternalLevel *clone() {
+ auto new_level = new InternalLevel(m_level_no, m_shards.size());
+ new_level->m_shard_cnt = m_shard_cnt;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ new_level->m_shards[i] = m_shards[i];
+ new_level->m_owns[i] = true;
+ m_owns[i] = false;
+ }
+ return new_level;
+ }
};
}