diff options
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 51 |
1 files changed, 30 insertions, 21 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 68f85e2..422ca10 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -75,9 +75,9 @@ typedef ssize_t level_index; template <RecordInterface R, ShardInterface S, QueryInterface Q, typename FQ=void> class DynamicExtension { - //typedef typename S<WrappedRecord<R>> Shard; + //typedef typename S<R> Shard; typedef S Shard; - typedef MutableBuffer<WrappedRecord<R>> Buffer; + typedef MutableBuffer<R> Buffer; public: DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop) @@ -142,18 +142,23 @@ public: level->get_query_states(shards, states, parms); } - std::vector<std::vector<R>> query_results(shards.size() + 1); + std::vector<std::vector<R>*> query_results(shards.size() + 1, nullptr); // Execute the query for the buffer - query_results[0] = Q::buffer_query(buffer, buffer_state, parms); + query_results[0] = filter_deletes(Q::buffer_query(buffer, buffer_state, parms), {-1, -1}, buffer); // Execute the query for each shard for (size_t i=0; i<shards.size(); i++) { - query_results[i] = filter_deletes(Q::query(shards[i].second, states[i], parms)); + query_results[i] = filter_deletes(Q::query(shards[i].second, states[i], parms), shards[i].first, buffer); } // Merge the results together - auto result = Q::merge(query_results); + auto result = Q::merge(&query_results); + + for (size_t i=0; i<query_results.size(); i++) { + delete query_results[i]; + Q::delete_query_state(states[i]); + } return result; } @@ -259,7 +264,7 @@ private: size_t m_scale_factor; double m_max_delete_prop; - std::vector<InternalLevel<WrappedRecord<R>, S, Q> *> m_levels; + std::vector<InternalLevel<R, S, Q> *> m_levels; Buffer *get_buffer() { return m_buffer; @@ -274,14 +279,12 @@ private: merge_buffer(); } - WrappedRecord<R> wrec = static_cast<WrappedRecord<R>>(rec); - - return buffer->append(wrec, ts); + return buffer->append(rec, ts); } - std::vector<R> filter_deletes(std::vector<R> records, ShardID shid, Buffer *buffer) { - std::vector<R> processed_records; - processed_records.reserve(records.size()); + std::vector<R> *filter_deletes(std::vector<R> *records, ShardID shid, Buffer *buffer) { + std::vector<R> *processed_records = new std::vector<R>(); + processed_records->reserve(records->size()); // For delete tagging, we just need to check the delete bit on each // record. @@ -291,9 +294,10 @@ private: continue; } - processed_records.emplace_back(rec.rec); + processed_records->emplace_back(static_cast<R>(rec.rec)); } + delete records; return processed_records; } @@ -320,8 +324,10 @@ private: } } - processed_records.emplace_back(rec.rec); + processed_records->emplace_back(static_cast<R>(rec.rec)); } + + delete records; } /* @@ -337,7 +343,7 @@ private: if (new_idx > 0) { assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); } - m_levels.emplace_back(new InternalLevel<WrappedRecord<R>, Shard, Q>(new_idx, new_shard_cnt)); + m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)); return new_idx; } @@ -416,14 +422,14 @@ private: // merging two memory levels if (LSM_LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); + m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); mark_as_unused(tmp); } else { m_levels[base_level]->append_merged_shards(m_levels[incoming_level]); } mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel<WrappedRecord<R>, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); + m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); } @@ -432,9 +438,9 @@ private: if (LSM_LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0]; - auto temp_level = new InternalLevel<WrappedRecord<R>, Shard, Q>(0, 1); + auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); temp_level->append_buffer(buffer); - auto new_level = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(old_level, temp_level); + auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level); m_levels[0] = new_level; delete temp_level; @@ -450,7 +456,7 @@ private: * level may not be able to immediately be deleted, depending upon who * else is using it. */ - inline void mark_as_unused(InternalLevel<WrappedRecord<R>, Shard, Q> *level) { + inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) { delete level; } @@ -460,6 +466,9 @@ private: * levels below idx are below the limit. */ inline void enforce_delete_maximum(level_index idx) { + // FIXME: currently broken due to tombstone cancellation issues + return; + long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx); if (ts_prop > (long double) m_max_delete_prop) { |