summaryrefslogtreecommitdiffstats
path: root/include/framework/reconstruction
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-09-25 14:42:44 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2025-09-25 14:42:44 -0400
commitcf5f3bbb0cb58430ed68ad3ebfcefc009e553d71 (patch)
tree4c17bc3169ee195c236cea9c9efda0aef7488e3c /include/framework/reconstruction
parent826c1fff5accbaa6b415acc176a5acbeb5f691b6 (diff)
downloaddynamic-extension-cf5f3bbb0cb58430ed68ad3ebfcefc009e553d71.tar.gz
Code reformatting
Diffstat (limited to 'include/framework/reconstruction')
-rw-r--r--include/framework/reconstruction/BSMPolicy.h12
-rw-r--r--include/framework/reconstruction/BackgroundTieringPolicy.h38
-rw-r--r--include/framework/reconstruction/CompactOnFull.h37
-rw-r--r--include/framework/reconstruction/FixedShardCountPolicy.h30
-rw-r--r--include/framework/reconstruction/FloodL0Policy.h3
-rw-r--r--include/framework/reconstruction/LevelingPolicy.h10
-rw-r--r--include/framework/reconstruction/ReconstructionPolicy.h19
-rw-r--r--include/framework/reconstruction/TieringPolicy.h26
8 files changed, 106 insertions, 69 deletions
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
index 61f379e..db4c8d4 100644
--- a/include/framework/reconstruction/BSMPolicy.h
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -36,7 +36,8 @@ public:
ReconstructionVector reconstructions;
auto levels = version->get_structure()->get_level_vector();
- level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count());
+ level_index target_level = find_reconstruction_target(
+ levels, version->get_structure()->get_record_count());
assert(target_level != -1);
level_index source_level = 0;
@@ -49,7 +50,8 @@ public:
task.target = target_level;
size_t reccnt = 0;
- if (target_level < (ssize_t)levels.size() && levels[target_level]->get_record_count() > 0) {
+ if (target_level < (ssize_t)levels.size() &&
+ levels[target_level]->get_record_count() > 0) {
task.sources.push_back({target_level, all_shards_idx});
task.type = ReconstructionType::Merge;
} else {
@@ -71,7 +73,8 @@ public:
}
private:
- level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+ level_index find_reconstruction_target(LevelVector &levels,
+ size_t reccnt) const {
level_index target_level = invalid_level_idx;
for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -85,7 +88,8 @@ private:
}
inline size_t capacity(level_index level, size_t reccnt) const {
- double base = std::ceil(m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier));
+ double base = std::ceil(
+ m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier));
return m_buffer_size * (base - 1) * pow(base, level + 1);
}
diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h
index 36556a2..1c000b9 100644
--- a/include/framework/reconstruction/BackgroundTieringPolicy.h
+++ b/include/framework/reconstruction/BackgroundTieringPolicy.h
@@ -16,16 +16,20 @@
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
-class BackgroundTieringPolicy : public ReconstructionPolicy<ShardType, QueryType> {
+class BackgroundTieringPolicy
+ : public ReconstructionPolicy<ShardType, QueryType> {
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
LevelVector;
public:
- BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size, size_t size_modifier=0)
- : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(size_modifier) {}
-
- std::vector<ReconstructionVector> get_reconstruction_tasks(
- const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size,
+ size_t size_modifier = 0)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size),
+ m_size_modifier(size_modifier) {}
+
+ std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
std::vector<ReconstructionVector> reconstructions;
auto levels = version->get_structure()->get_level_vector();
@@ -34,7 +38,8 @@ public:
return {};
}
- level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count());
+ level_index target_level = find_reconstruction_target(
+ levels, version->get_structure()->get_record_count());
assert(target_level != -1);
level_index source_level = 0;
@@ -44,19 +49,21 @@ public:
}
for (level_index i = target_level; i > source_level; i--) {
- if (lock_mngr.take_lock(i-1, version->get_id())) {
+ if (lock_mngr.take_lock(i - 1, version->get_id())) {
ReconstructionVector recon;
size_t total_reccnt = levels[i - 1]->get_record_count();
std::vector<ShardID> shards;
- for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) {
- shards.push_back({i-1, j});
+ for (ssize_t j = 0; j < (ssize_t)levels[i - 1]->get_shard_count();
+ j++) {
+ shards.push_back({i - 1, j});
}
- recon.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact);
+ recon.add_reconstruction(shards, i, total_reccnt,
+ ReconstructionType::Compact);
reconstructions.push_back(recon);
}
}
-
+
return reconstructions;
}
@@ -68,7 +75,8 @@ public:
}
private:
- level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+ level_index find_reconstruction_target(LevelVector &levels,
+ size_t reccnt) const {
level_index target_level = invalid_level_idx;
for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -81,7 +89,9 @@ private:
return target_level;
}
- inline size_t capacity(size_t reccnt) const { return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); }
+ inline size_t capacity(size_t reccnt) const {
+ return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier);
+ }
size_t m_scale_factor;
size_t m_buffer_size;
diff --git a/include/framework/reconstruction/CompactOnFull.h b/include/framework/reconstruction/CompactOnFull.h
index f5e0400..f0b549d 100644
--- a/include/framework/reconstruction/CompactOnFull.h
+++ b/include/framework/reconstruction/CompactOnFull.h
@@ -21,11 +21,14 @@ class CompactOnFull : public ReconstructionPolicy<ShardType, QueryType> {
LevelVector;
public:
- CompactOnFull(size_t scale_factor, size_t buffer_size, size_t size_modifier=0)
- : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(size_modifier) {}
-
- std::vector<ReconstructionVector> get_reconstruction_tasks(
- const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ CompactOnFull(size_t scale_factor, size_t buffer_size,
+ size_t size_modifier = 0)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size),
+ m_size_modifier(size_modifier) {}
+
+ std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
std::vector<ReconstructionVector> reconstructions;
auto levels = version->get_structure()->get_level_vector();
@@ -34,23 +37,24 @@ public:
return {};
}
- for (level_index i=0; i < (ssize_t) levels.size(); i++) {
- if (levels[i]->get_shard_count() >= m_scale_factor && lock_mngr.take_lock(i, version->get_id())) {
+ for (level_index i = 0; i < (ssize_t)levels.size(); i++) {
+ if (levels[i]->get_shard_count() >= m_scale_factor &&
+ lock_mngr.take_lock(i, version->get_id())) {
ReconstructionVector recon;
size_t total_reccnt = levels[i]->get_record_count();
std::vector<ShardID> shards;
- for (ssize_t j=0; j<(ssize_t) levels[i]->get_shard_count(); j++) {
- shards.push_back({i,j});
+ for (ssize_t j = 0; j < (ssize_t)levels[i]->get_shard_count(); j++) {
+ shards.push_back({i, j});
}
- recon.add_reconstruction(shards, i+1, total_reccnt, ReconstructionType::Compact);
+ recon.add_reconstruction(shards, i + 1, total_reccnt,
+ ReconstructionType::Compact);
reconstructions.push_back(recon);
}
}
- return reconstructions;
- }
-
+ return reconstructions;
+ }
ReconstructionVector
get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
@@ -60,7 +64,8 @@ public:
}
private:
- level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+ level_index find_reconstruction_target(LevelVector &levels,
+ size_t reccnt) const {
level_index target_level = invalid_level_idx;
for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -73,7 +78,9 @@ private:
return target_level;
}
- inline size_t capacity(size_t reccnt) const { return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); }
+ inline size_t capacity(size_t reccnt) const {
+ return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier);
+ }
size_t m_scale_factor;
size_t m_buffer_size;
diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h
index cc8dce4..52d9f39 100644
--- a/include/framework/reconstruction/FixedShardCountPolicy.h
+++ b/include/framework/reconstruction/FixedShardCountPolicy.h
@@ -16,16 +16,20 @@
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
-class FixedShardCountPolicy : public ReconstructionPolicy<ShardType, QueryType> {
+class FixedShardCountPolicy
+ : public ReconstructionPolicy<ShardType, QueryType> {
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
LevelVector;
public:
- FixedShardCountPolicy(size_t buffer_size, size_t shard_count, size_t max_record_count)
- : m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {}
+ FixedShardCountPolicy(size_t buffer_size, size_t shard_count,
+ size_t max_record_count)
+ : m_buffer_size(buffer_size), m_shard_count(shard_count),
+ m_max_reccnt(max_record_count) {}
std::vector<ReconstructionVector>
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
return {};
}
@@ -37,26 +41,26 @@ public:
/* if this is the very first flush, there won't be an L1 yet */
if (levels.size() > 1 && levels[1]->get_shard_count() > 0) {
- ShardID last_shid = {1, (shard_index) (levels[1]->get_shard_count() - 1)};
- if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) {
- auto task = ReconstructionTask {
- {{0, 0}, last_shid}, 1, m_buffer_size,ReconstructionType::Merge
- };
+ ShardID last_shid = {1, (shard_index)(levels[1]->get_shard_count() - 1)};
+ if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() +
+ m_buffer_size <=
+ capacity()) {
+ auto task = ReconstructionTask{
+ {{0, 0}, last_shid}, 1, m_buffer_size, ReconstructionType::Merge};
v.add_reconstruction(task);
return v;
}
}
- auto task = ReconstructionTask {
- {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append
- };
+ auto task = ReconstructionTask{
+ {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append};
v.add_reconstruction(task);
return v;
}
private:
inline size_t capacity() const {
- double bps = (double) m_max_reccnt / (double) m_buffer_size / m_shard_count;
+ double bps = (double)m_max_reccnt / (double)m_buffer_size / m_shard_count;
return ceil(bps) * m_buffer_size;
}
diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h
index c0d29fe..7e38e02 100644
--- a/include/framework/reconstruction/FloodL0Policy.h
+++ b/include/framework/reconstruction/FloodL0Policy.h
@@ -24,7 +24,8 @@ public:
FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {}
std::vector<ReconstructionVector>
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
return {};
}
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
index 955bc02..3e0afaa 100644
--- a/include/framework/reconstruction/LevelingPolicy.h
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -58,8 +58,9 @@ public:
(target_level == 1)
? m_buffer_size + target_reccnt
: levels[target_level - 1]->get_record_count() + target_reccnt;
- auto type = (target_level >= (level_index) levels.size()) ? ReconstructionType::Append
- : ReconstructionType::Merge;
+ auto type = (target_level >= (level_index)levels.size())
+ ? ReconstructionType::Append
+ : ReconstructionType::Merge;
reconstructions.add_reconstruction(target_level - 1, target_level,
total_reccnt, type);
@@ -95,8 +96,9 @@ private:
inline size_t capacity(level_index level, size_t reccnt) const {
return m_buffer_size *
- pow(m_scale_factor * std::ceil(std::pow<double>(std::log10(reccnt), m_size_modifier)), level);
-
+ pow(m_scale_factor * std::ceil(std::pow<double>(std::log10(reccnt),
+ m_size_modifier)),
+ level);
}
size_t m_scale_factor;
diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h
index 41a2092..3c842b2 100644
--- a/include/framework/reconstruction/ReconstructionPolicy.h
+++ b/include/framework/reconstruction/ReconstructionPolicy.h
@@ -12,19 +12,22 @@
*/
#pragma once
-#include "util/types.h"
-#include "framework/structure/ExtensionStructure.h"
-#include "framework/scheduling/Version.h"
#include "framework/scheduling/LockManager.h"
+#include "framework/scheduling/Version.h"
+#include "framework/structure/ExtensionStructure.h"
+#include "util/types.h"
namespace de {
-template<ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class ReconstructionPolicy {
typedef ExtensionStructure<ShardType, QueryType> StructureType;
public:
ReconstructionPolicy() {}
- virtual std::vector<ReconstructionVector> get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const = 0;
- virtual ReconstructionVector get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0;
- };
-}
+ virtual std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const = 0;
+ virtual ReconstructionVector
+ get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0;
+};
+} // namespace de
diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h
index b1fcb49..c16c427 100644
--- a/include/framework/reconstruction/TieringPolicy.h
+++ b/include/framework/reconstruction/TieringPolicy.h
@@ -21,11 +21,13 @@ class TieringPolicy : public ReconstructionPolicy<ShardType, QueryType> {
LevelVector;
public:
- TieringPolicy(size_t scale_factor, size_t buffer_size, double modifier=0)
- : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(modifier) {}
+ TieringPolicy(size_t scale_factor, size_t buffer_size, double modifier = 0)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size),
+ m_size_modifier(modifier) {}
- std::vector<ReconstructionVector> get_reconstruction_tasks(
- const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
return {};
}
@@ -34,7 +36,8 @@ public:
ReconstructionVector reconstructions;
auto levels = version->get_structure()->get_level_vector();
- level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count());
+ level_index target_level = find_reconstruction_target(
+ levels, version->get_structure()->get_record_count());
assert(target_level != -1);
level_index source_level = 0;
@@ -47,12 +50,13 @@ public:
size_t total_reccnt = levels[i - 1]->get_record_count();
std::vector<ShardID> shards;
- for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) {
- shards.push_back({i-1, j});
+ for (ssize_t j = 0; j < (ssize_t)levels[i - 1]->get_shard_count(); j++) {
+ shards.push_back({i - 1, j});
}
if (total_reccnt > 0 || shards.size() > 0) {
- reconstructions.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact);
+ reconstructions.add_reconstruction(shards, i, total_reccnt,
+ ReconstructionType::Compact);
}
}
@@ -60,7 +64,8 @@ public:
}
private:
- level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+ level_index find_reconstruction_target(LevelVector &levels,
+ size_t reccnt) const {
level_index target_level = invalid_level_idx;
for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -74,7 +79,8 @@ private:
}
inline size_t capacity(size_t reccnt) const {
- return std::ceil((double) m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier));
+ return std::ceil((double)m_scale_factor *
+ std::pow<double>(std::log10(reccnt), m_size_modifier));
}
size_t m_scale_factor;