diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-09-25 14:42:44 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-09-25 14:42:44 -0400 |
| commit | cf5f3bbb0cb58430ed68ad3ebfcefc009e553d71 (patch) | |
| tree | 4c17bc3169ee195c236cea9c9efda0aef7488e3c /include/framework/reconstruction | |
| parent | 826c1fff5accbaa6b415acc176a5acbeb5f691b6 (diff) | |
| download | dynamic-extension-cf5f3bbb0cb58430ed68ad3ebfcefc009e553d71.tar.gz | |
Code reformatting
Diffstat (limited to 'include/framework/reconstruction')
| -rw-r--r-- | include/framework/reconstruction/BSMPolicy.h | 12 | ||||
| -rw-r--r-- | include/framework/reconstruction/BackgroundTieringPolicy.h | 38 | ||||
| -rw-r--r-- | include/framework/reconstruction/CompactOnFull.h | 37 | ||||
| -rw-r--r-- | include/framework/reconstruction/FixedShardCountPolicy.h | 30 | ||||
| -rw-r--r-- | include/framework/reconstruction/FloodL0Policy.h | 3 | ||||
| -rw-r--r-- | include/framework/reconstruction/LevelingPolicy.h | 10 | ||||
| -rw-r--r-- | include/framework/reconstruction/ReconstructionPolicy.h | 19 | ||||
| -rw-r--r-- | include/framework/reconstruction/TieringPolicy.h | 26 |
8 files changed, 106 insertions, 69 deletions
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h index 61f379e..db4c8d4 100644 --- a/include/framework/reconstruction/BSMPolicy.h +++ b/include/framework/reconstruction/BSMPolicy.h @@ -36,7 +36,8 @@ public: ReconstructionVector reconstructions; auto levels = version->get_structure()->get_level_vector(); - level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count()); + level_index target_level = find_reconstruction_target( + levels, version->get_structure()->get_record_count()); assert(target_level != -1); level_index source_level = 0; @@ -49,7 +50,8 @@ public: task.target = target_level; size_t reccnt = 0; - if (target_level < (ssize_t)levels.size() && levels[target_level]->get_record_count() > 0) { + if (target_level < (ssize_t)levels.size() && + levels[target_level]->get_record_count() > 0) { task.sources.push_back({target_level, all_shards_idx}); task.type = ReconstructionType::Merge; } else { @@ -71,7 +73,8 @@ public: } private: - level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const { + level_index find_reconstruction_target(LevelVector &levels, + size_t reccnt) const { level_index target_level = invalid_level_idx; for (level_index i = 1; i < (level_index)levels.size(); i++) { @@ -85,7 +88,8 @@ private: } inline size_t capacity(level_index level, size_t reccnt) const { - double base = std::ceil(m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier)); + double base = std::ceil( + m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier)); return m_buffer_size * (base - 1) * pow(base, level + 1); } diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h index 36556a2..1c000b9 100644 --- a/include/framework/reconstruction/BackgroundTieringPolicy.h +++ b/include/framework/reconstruction/BackgroundTieringPolicy.h @@ -16,16 +16,20 @@ namespace de { template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> -class BackgroundTieringPolicy : public ReconstructionPolicy<ShardType, QueryType> { +class BackgroundTieringPolicy + : public ReconstructionPolicy<ShardType, QueryType> { typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> LevelVector; public: - BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size, size_t size_modifier=0) - : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(size_modifier) {} - - std::vector<ReconstructionVector> get_reconstruction_tasks( - const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override { + BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size, + size_t size_modifier = 0) + : m_scale_factor(scale_factor), m_buffer_size(buffer_size), + m_size_modifier(size_modifier) {} + + std::vector<ReconstructionVector> + get_reconstruction_tasks(const Version<ShardType, QueryType> *version, + LockManager &lock_mngr) const override { std::vector<ReconstructionVector> reconstructions; auto levels = version->get_structure()->get_level_vector(); @@ -34,7 +38,8 @@ public: return {}; } - level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count()); + level_index target_level = find_reconstruction_target( + levels, version->get_structure()->get_record_count()); assert(target_level != -1); level_index source_level = 0; @@ -44,19 +49,21 @@ public: } for (level_index i = target_level; i > source_level; i--) { - if (lock_mngr.take_lock(i-1, version->get_id())) { + if (lock_mngr.take_lock(i - 1, version->get_id())) { ReconstructionVector recon; size_t total_reccnt = levels[i - 1]->get_record_count(); std::vector<ShardID> shards; - for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) { - shards.push_back({i-1, j}); + for (ssize_t j = 0; j < (ssize_t)levels[i - 1]->get_shard_count(); + j++) { + shards.push_back({i - 1, j}); } - recon.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact); + recon.add_reconstruction(shards, i, total_reccnt, + ReconstructionType::Compact); reconstructions.push_back(recon); } } - + return reconstructions; } @@ -68,7 +75,8 @@ public: } private: - level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const { + level_index find_reconstruction_target(LevelVector &levels, + size_t reccnt) const { level_index target_level = invalid_level_idx; for (level_index i = 1; i < (level_index)levels.size(); i++) { @@ -81,7 +89,9 @@ private: return target_level; } - inline size_t capacity(size_t reccnt) const { return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); } + inline size_t capacity(size_t reccnt) const { + return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); + } size_t m_scale_factor; size_t m_buffer_size; diff --git a/include/framework/reconstruction/CompactOnFull.h b/include/framework/reconstruction/CompactOnFull.h index f5e0400..f0b549d 100644 --- a/include/framework/reconstruction/CompactOnFull.h +++ b/include/framework/reconstruction/CompactOnFull.h @@ -21,11 +21,14 @@ class CompactOnFull : public ReconstructionPolicy<ShardType, QueryType> { LevelVector; public: - CompactOnFull(size_t scale_factor, size_t buffer_size, size_t size_modifier=0) - : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(size_modifier) {} - - std::vector<ReconstructionVector> get_reconstruction_tasks( - const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override { + CompactOnFull(size_t scale_factor, size_t buffer_size, + size_t size_modifier = 0) + : m_scale_factor(scale_factor), m_buffer_size(buffer_size), + m_size_modifier(size_modifier) {} + + std::vector<ReconstructionVector> + get_reconstruction_tasks(const Version<ShardType, QueryType> *version, + LockManager &lock_mngr) const override { std::vector<ReconstructionVector> reconstructions; auto levels = version->get_structure()->get_level_vector(); @@ -34,23 +37,24 @@ public: return {}; } - for (level_index i=0; i < (ssize_t) levels.size(); i++) { - if (levels[i]->get_shard_count() >= m_scale_factor && lock_mngr.take_lock(i, version->get_id())) { + for (level_index i = 0; i < (ssize_t)levels.size(); i++) { + if (levels[i]->get_shard_count() >= m_scale_factor && + lock_mngr.take_lock(i, version->get_id())) { ReconstructionVector recon; size_t total_reccnt = levels[i]->get_record_count(); std::vector<ShardID> shards; - for (ssize_t j=0; j<(ssize_t) levels[i]->get_shard_count(); j++) { - shards.push_back({i,j}); + for (ssize_t j = 0; j < (ssize_t)levels[i]->get_shard_count(); j++) { + shards.push_back({i, j}); } - recon.add_reconstruction(shards, i+1, total_reccnt, ReconstructionType::Compact); + recon.add_reconstruction(shards, i + 1, total_reccnt, + ReconstructionType::Compact); reconstructions.push_back(recon); } } - return reconstructions; - } - + return reconstructions; + } ReconstructionVector get_flush_tasks(const Version<ShardType, QueryType> *version) const override { @@ -60,7 +64,8 @@ public: } private: - level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const { + level_index find_reconstruction_target(LevelVector &levels, + size_t reccnt) const { level_index target_level = invalid_level_idx; for (level_index i = 1; i < (level_index)levels.size(); i++) { @@ -73,7 +78,9 @@ private: return target_level; } - inline size_t capacity(size_t reccnt) const { return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); } + inline size_t capacity(size_t reccnt) const { + return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); + } size_t m_scale_factor; size_t m_buffer_size; diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h index cc8dce4..52d9f39 100644 --- a/include/framework/reconstruction/FixedShardCountPolicy.h +++ b/include/framework/reconstruction/FixedShardCountPolicy.h @@ -16,16 +16,20 @@ namespace de { template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> -class FixedShardCountPolicy : public ReconstructionPolicy<ShardType, QueryType> { +class FixedShardCountPolicy + : public ReconstructionPolicy<ShardType, QueryType> { typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> LevelVector; public: - FixedShardCountPolicy(size_t buffer_size, size_t shard_count, size_t max_record_count) - : m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {} + FixedShardCountPolicy(size_t buffer_size, size_t shard_count, + size_t max_record_count) + : m_buffer_size(buffer_size), m_shard_count(shard_count), + m_max_reccnt(max_record_count) {} std::vector<ReconstructionVector> - get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override { + get_reconstruction_tasks(const Version<ShardType, QueryType> *version, + LockManager &lock_mngr) const override { return {}; } @@ -37,26 +41,26 @@ public: /* if this is the very first flush, there won't be an L1 yet */ if (levels.size() > 1 && levels[1]->get_shard_count() > 0) { - ShardID last_shid = {1, (shard_index) (levels[1]->get_shard_count() - 1)}; - if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) { - auto task = ReconstructionTask { - {{0, 0}, last_shid}, 1, m_buffer_size,ReconstructionType::Merge - }; + ShardID last_shid = {1, (shard_index)(levels[1]->get_shard_count() - 1)}; + if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() + + m_buffer_size <= + capacity()) { + auto task = ReconstructionTask{ + {{0, 0}, last_shid}, 1, m_buffer_size, ReconstructionType::Merge}; v.add_reconstruction(task); return v; } } - auto task = ReconstructionTask { - {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append - }; + auto task = ReconstructionTask{ + {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append}; v.add_reconstruction(task); return v; } private: inline size_t capacity() const { - double bps = (double) m_max_reccnt / (double) m_buffer_size / m_shard_count; + double bps = (double)m_max_reccnt / (double)m_buffer_size / m_shard_count; return ceil(bps) * m_buffer_size; } diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h index c0d29fe..7e38e02 100644 --- a/include/framework/reconstruction/FloodL0Policy.h +++ b/include/framework/reconstruction/FloodL0Policy.h @@ -24,7 +24,8 @@ public: FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {} std::vector<ReconstructionVector> - get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override { + get_reconstruction_tasks(const Version<ShardType, QueryType> *version, + LockManager &lock_mngr) const override { return {}; } diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h index 955bc02..3e0afaa 100644 --- a/include/framework/reconstruction/LevelingPolicy.h +++ b/include/framework/reconstruction/LevelingPolicy.h @@ -58,8 +58,9 @@ public: (target_level == 1) ? m_buffer_size + target_reccnt : levels[target_level - 1]->get_record_count() + target_reccnt; - auto type = (target_level >= (level_index) levels.size()) ? ReconstructionType::Append - : ReconstructionType::Merge; + auto type = (target_level >= (level_index)levels.size()) + ? ReconstructionType::Append + : ReconstructionType::Merge; reconstructions.add_reconstruction(target_level - 1, target_level, total_reccnt, type); @@ -95,8 +96,9 @@ private: inline size_t capacity(level_index level, size_t reccnt) const { return m_buffer_size * - pow(m_scale_factor * std::ceil(std::pow<double>(std::log10(reccnt), m_size_modifier)), level); - + pow(m_scale_factor * std::ceil(std::pow<double>(std::log10(reccnt), + m_size_modifier)), + level); } size_t m_scale_factor; diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h index 41a2092..3c842b2 100644 --- a/include/framework/reconstruction/ReconstructionPolicy.h +++ b/include/framework/reconstruction/ReconstructionPolicy.h @@ -12,19 +12,22 @@ */ #pragma once -#include "util/types.h" -#include "framework/structure/ExtensionStructure.h" -#include "framework/scheduling/Version.h" #include "framework/scheduling/LockManager.h" +#include "framework/scheduling/Version.h" +#include "framework/structure/ExtensionStructure.h" +#include "util/types.h" namespace de { -template<ShardInterface ShardType, QueryInterface<ShardType> QueryType> +template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> class ReconstructionPolicy { typedef ExtensionStructure<ShardType, QueryType> StructureType; public: ReconstructionPolicy() {} - virtual std::vector<ReconstructionVector> get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const = 0; - virtual ReconstructionVector get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0; - }; -} + virtual std::vector<ReconstructionVector> + get_reconstruction_tasks(const Version<ShardType, QueryType> *version, + LockManager &lock_mngr) const = 0; + virtual ReconstructionVector + get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0; +}; +} // namespace de diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h index b1fcb49..c16c427 100644 --- a/include/framework/reconstruction/TieringPolicy.h +++ b/include/framework/reconstruction/TieringPolicy.h @@ -21,11 +21,13 @@ class TieringPolicy : public ReconstructionPolicy<ShardType, QueryType> { LevelVector; public: - TieringPolicy(size_t scale_factor, size_t buffer_size, double modifier=0) - : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(modifier) {} + TieringPolicy(size_t scale_factor, size_t buffer_size, double modifier = 0) + : m_scale_factor(scale_factor), m_buffer_size(buffer_size), + m_size_modifier(modifier) {} - std::vector<ReconstructionVector> get_reconstruction_tasks( - const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override { + std::vector<ReconstructionVector> + get_reconstruction_tasks(const Version<ShardType, QueryType> *version, + LockManager &lock_mngr) const override { return {}; } @@ -34,7 +36,8 @@ public: ReconstructionVector reconstructions; auto levels = version->get_structure()->get_level_vector(); - level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count()); + level_index target_level = find_reconstruction_target( + levels, version->get_structure()->get_record_count()); assert(target_level != -1); level_index source_level = 0; @@ -47,12 +50,13 @@ public: size_t total_reccnt = levels[i - 1]->get_record_count(); std::vector<ShardID> shards; - for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) { - shards.push_back({i-1, j}); + for (ssize_t j = 0; j < (ssize_t)levels[i - 1]->get_shard_count(); j++) { + shards.push_back({i - 1, j}); } if (total_reccnt > 0 || shards.size() > 0) { - reconstructions.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact); + reconstructions.add_reconstruction(shards, i, total_reccnt, + ReconstructionType::Compact); } } @@ -60,7 +64,8 @@ public: } private: - level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const { + level_index find_reconstruction_target(LevelVector &levels, + size_t reccnt) const { level_index target_level = invalid_level_idx; for (level_index i = 1; i < (level_index)levels.size(); i++) { @@ -74,7 +79,8 @@ private: } inline size_t capacity(size_t reccnt) const { - return std::ceil((double) m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier)); + return std::ceil((double)m_scale_factor * + std::pow<double>(std::log10(reccnt), m_size_modifier)); } size_t m_scale_factor; |