diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-17 16:00:20 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-17 16:00:20 -0400 |
| commit | 6fd50506d2e50d2faf2478a2883a2ef1b4840a78 (patch) | |
| tree | eb29955b435a6e2e6196b9c89de15c6b8885c4f3 /include/framework/DynamicExtension.h | |
| parent | 75a8418b580234521b5fa23340bee959c357acf9 (diff) | |
| download | dynamic-extension-6fd50506d2e50d2faf2478a2883a2ef1b4840a78.tar.gz | |
Started implementing shard interface (not finished yet)
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 41 |
1 files changed, 23 insertions, 18 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index fd42c5f..53b55b1 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -16,6 +16,9 @@ #include "framework/MutableBuffer.h" #include "framework/InternalLevel.h" +#include "framework/ShardInterface.h" + +#include "shard/WIRS.h" #include "ds/Alias.h" #include "util/timer.h" @@ -68,9 +71,8 @@ enum class DeletePolicy { typedef ssize_t level_index; -template <RecordInterface R> +template <RecordInterface R, ShardInterface S> class DynamicExtension { - typedef WIRS<R> Shard; typedef decltype(R::key) K; typedef decltype(R::value) V; typedef decltype(R::weight) W; @@ -143,7 +145,7 @@ public: // Get the shard weights for each level. Index 0 is the buffer, // represented by nullptr. - std::vector<std::pair<ShardID, Shard*>> shards; + std::vector<std::pair<ShardID, S*>> shards; std::vector<void*> states; shards.push_back({{-1, -1}, nullptr}); states.push_back(nullptr); @@ -151,13 +153,15 @@ public: std::vector<W> shard_weights; shard_weights.push_back((double) buffer_weight); + WIRS<R>::wirs_query_parms parms = {lower_key, upper_key}; + for (auto &level : m_levels) { - level->get_shard_weights(shard_weights, shards, states, lower_key, upper_key); + level->get_query_states(shard_weights, shards, states, &parms); } if (shard_weights.size() == 1 && shard_weights[0] == 0) { if (buffer_alias) delete buffer_alias; - for (auto& x: states) Shard::delete_state(x); + for (auto& x: states) S::delete_query_state(x); sampling_bailouts++; return; // no records in the sampling range } @@ -243,7 +247,7 @@ public: } while (sample_idx < sample_sz); if (buffer_alias) delete buffer_alias; - for (auto& x: states) Shard::delete_state(x); + for (auto& x: states) S::delete_query_state(x); enforce_rejection_rate_maximum(rng); } @@ -348,8 +352,8 @@ public: } - Shard *create_ssi() { - std::vector<Shard *> shards; + S *create_ssi() { + std::vector<S *> shards; if (m_levels.size() > 0) { for (int i=m_levels.size() - 1; i>= 0; i--) { @@ -359,9 +363,9 @@ public: } } - shards.emplace_back(new Shard(get_buffer(), nullptr)); + shards.emplace_back(new S(get_buffer(), nullptr)); - Shard *shards_array[shards.size()]; + S *shards_array[shards.size()]; size_t j = 0; for (size_t i=0; i<shards.size(); i++) { @@ -370,7 +374,7 @@ public: } } - Shard *flattened = new Shard(shards_array, j, nullptr); + S *flattened = new S(shards_array, j, nullptr); for (auto shard : shards) { delete shard; @@ -390,7 +394,7 @@ private: double m_max_delete_prop; double m_max_rejection_rate; - std::vector<InternalLevel<R> *> m_levels; + std::vector<InternalLevel<R, S> *> m_levels; level_index m_last_level_idx; @@ -446,7 +450,7 @@ private: if (new_idx > 0) { assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); } - m_levels.emplace_back(new InternalLevel<R>(new_idx, new_shard_cnt)); + m_levels.emplace_back(new InternalLevel<R, S>(new_idx, new_shard_cnt)); m_last_level_idx++; return new_idx; @@ -526,14 +530,14 @@ private: // merging two memory levels if (LSM_LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel<R>::merge_levels(m_levels[base_level], m_levels[incoming_level], rng); + m_levels[base_level] = InternalLevel<R, S>::merge_levels(m_levels[base_level], m_levels[incoming_level], rng); mark_as_unused(tmp); } else { m_levels[base_level]->append_merged_shards(m_levels[incoming_level], rng); } mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel<R>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); + m_levels[incoming_level] = new InternalLevel<R, S>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); } @@ -542,9 +546,9 @@ private: if (LSM_LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0]; - auto temp_level = new InternalLevel<R>(0, 1); + auto temp_level = new InternalLevel<R, S>(0, 1); temp_level->append_mem_table(buffer, rng); - auto new_level = InternalLevel<R>::merge_levels(old_level, temp_level, rng); + auto new_level = InternalLevel<R, S>::merge_levels(old_level, temp_level, rng); m_levels[0] = new_level; delete temp_level; @@ -560,7 +564,7 @@ private: * level may not be able to immediately be deleted, depending upon who * else is using it. */ - inline void mark_as_unused(InternalLevel<R> *level) { + inline void mark_as_unused(InternalLevel<R, S> *level) { delete level; } @@ -609,6 +613,7 @@ private: * no guarantees about which buffer will be accessed if level_index is -1. */ inline size_t get_level_record_count(level_index idx, MutableBuffer<R> *buffer=nullptr) { + assert(idx >= -1); if (idx == -1) { return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count(); |