diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-29 12:33:58 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-29 12:33:58 -0400 |
| commit | b00682429988f17152e7573ffeffa1cecfdd3d3a (patch) | |
| tree | b621adaa2bfe3a7a9846970f7426fd950e17aa99 /include/framework/DynamicExtension.h | |
| parent | de5fa133758e2f0aad855ac58dff5cfa13d06f74 (diff) | |
| download | dynamic-extension-b00682429988f17152e7573ffeffa1cecfdd3d3a.tar.gz | |
Tests and bugfixes for framework
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 30 |
1 files changed, 16 insertions, 14 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index a70dda4..68f85e2 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -77,7 +77,7 @@ template <RecordInterface R, ShardInterface S, QueryInterface Q, typename FQ=voi class DynamicExtension { //typedef typename S<WrappedRecord<R>> Shard; typedef S Shard; - typedef MutableBuffer<R> Buffer; + typedef MutableBuffer<WrappedRecord<R>> Buffer; public: DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop) @@ -149,7 +149,7 @@ public: // Execute the query for each shard for (size_t i=0; i<shards.size(); i++) { - query_results[i] = post_process(Q::query(shards[i].second, states[i], parms)); + query_results[i] = filter_deletes(Q::query(shards[i].second, states[i], parms)); } // Merge the results together @@ -233,7 +233,7 @@ public: } } - shards.emplace_back(new S(get_buffer(), nullptr)); + shards.emplace_back(new S(get_buffer())); Shard *shards_array[shards.size()]; @@ -244,7 +244,7 @@ public: } } - Shard *flattened = new S(shards_array, j, nullptr); + Shard *flattened = new S(shards_array, j); for (auto shard : shards) { delete shard; @@ -259,13 +259,13 @@ private: size_t m_scale_factor; double m_max_delete_prop; - std::vector<InternalLevel<R, S, Q> *> m_levels; + std::vector<InternalLevel<WrappedRecord<R>, S, Q> *> m_levels; Buffer *get_buffer() { return m_buffer; } - int internal_append(R &rec, bool ts) { + int internal_append(const R &rec, bool ts) { Buffer *buffer; while (!(buffer = get_buffer())) ; @@ -274,10 +274,12 @@ private: merge_buffer(); } - return buffer->append(rec, ts); + WrappedRecord<R> wrec = static_cast<WrappedRecord<R>>(rec); + + return buffer->append(wrec, ts); } - std::vector<R> post_process(std::vector<R> records, ShardID shid, Buffer *buffer) { + std::vector<R> filter_deletes(std::vector<R> records, ShardID shid, Buffer *buffer) { std::vector<R> processed_records; processed_records.reserve(records.size()); @@ -335,7 +337,7 @@ private: if (new_idx > 0) { assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); } - m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)); + m_levels.emplace_back(new InternalLevel<WrappedRecord<R>, Shard, Q>(new_idx, new_shard_cnt)); return new_idx; } @@ -414,14 +416,14 @@ private: // merging two memory levels if (LSM_LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); + m_levels[base_level] = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); mark_as_unused(tmp); } else { m_levels[base_level]->append_merged_shards(m_levels[incoming_level]); } mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); + m_levels[incoming_level] = new InternalLevel<WrappedRecord<R>, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); } @@ -430,9 +432,9 @@ private: if (LSM_LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0]; - auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); + auto temp_level = new InternalLevel<WrappedRecord<R>, Shard, Q>(0, 1); temp_level->append_buffer(buffer); - auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level); + auto new_level = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(old_level, temp_level); m_levels[0] = new_level; delete temp_level; @@ -448,7 +450,7 @@ private: * level may not be able to immediately be deleted, depending upon who * else is using it. */ - inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) { + inline void mark_as_unused(InternalLevel<WrappedRecord<R>, Shard, Q> *level) { delete level; } |