summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-04-08 14:18:32 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2025-04-08 14:18:32 -0400
commit47c8864758399a83b5a80e2a8a31ea316b06505d (patch)
treedc856c65e56ccbf336ed0ed9412bbb6743974331
parenta9ee3f2f7f557adbe5448e4f4fbf71c23fe808b1 (diff)
downloaddynamic-extension-47c8864758399a83b5a80e2a8a31ea316b06505d.tar.gz
BSM bugfixes
-rw-r--r--benchmarks/include/standard_benchmarks.h2
-rw-r--r--benchmarks/tail-latency/insert_query_threads.cpp7
-rw-r--r--include/framework/reconstruction/BSMPolicy.h19
3 files changed, 12 insertions, 16 deletions
diff --git a/benchmarks/include/standard_benchmarks.h b/benchmarks/include/standard_benchmarks.h
index 8388fd1..96541fe 100644
--- a/benchmarks/include/standard_benchmarks.h
+++ b/benchmarks/include/standard_benchmarks.h
@@ -41,7 +41,7 @@ std::unique_ptr<de::ReconstructionPolicy<S, Q>> get_policy(size_t scale_factor,
} else if (policy == 1) {
recon = new de::LevelingPolicy<S, Q>(scale_factor, buffer_size, modifier);
} else if (policy == 2) {
- recon = new de::BSMPolicy<S, Q>(buffer_size, modifier);
+ recon = new de::BSMPolicy<S, Q>(buffer_size, scale_factor, modifier);
} else if (policy == 3) {
recon = new de::FloodL0Policy<S, Q>(buffer_size);
} else if (policy == 4) {
diff --git a/benchmarks/tail-latency/insert_query_threads.cpp b/benchmarks/tail-latency/insert_query_threads.cpp
index fe001ea..0d92fde 100644
--- a/benchmarks/tail-latency/insert_query_threads.cpp
+++ b/benchmarks/tail-latency/insert_query_threads.cpp
@@ -103,10 +103,10 @@ int main(int argc, char **argv) {
std::vector<size_t> sfs = {8}; //, 4, 8, 16, 32, 64, 128, 256, 512, 1024};
size_t buffer_size = 8000;
- std::vector<size_t> policies = {1, 2, 0};
+ std::vector<size_t> policies = {2};
std::vector<size_t> thread_counts = {8};
- std::vector<size_t> modifiers = {0, 1, 2, 3};
+ std::vector<size_t> modifiers = {0};
std::vector<size_t> scale_factors = {2, 3, 4, 5, 6, 7, 8};
size_t insert_threads = 1;
@@ -178,7 +178,8 @@ int main(int argc, char **argv) {
size_t query_lat = (double)total_query_time.load() /
(double)total_query_count.load();
- fprintf(stdout, "%ld\t%ld\t%ld\t%ld\t%ld\t%ld\n", internal_thread_cnt, pol, sf, mod,
+ fprintf(stdout, "%ld\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\n", internal_thread_cnt, pol, sf,
+ mod, extension->get_height(), extension->get_shard_count(),
insert_tput, query_lat);
fflush(stdout);
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
index 42ae506..65554fb 100644
--- a/include/framework/reconstruction/BSMPolicy.h
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -36,7 +36,7 @@ public:
ReconstructionVector reconstructions;
auto levels = version->get_structure()->get_level_vector();
- level_index target_level = find_reconstruction_target(levels);
+ level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count());
assert(target_level != -1);
level_index source_level = 0;
@@ -48,22 +48,17 @@ public:
ReconstructionTask task;
task.target = target_level;
- if (target_level == 1 &&
- (levels.size() == 1 || levels[1]->get_record_count() == 0)) {
- /* if the first level is empty, then we just append the buffer to it */
- task.type = ReconstructionType::Append;
- } else {
- /* otherwise, we'll need to do a merge of at least two shards */
- task.type = ReconstructionType::Merge;
- }
-
size_t reccnt = 0;
if (target_level < (ssize_t)levels.size() && levels[target_level]->get_record_count() > 0) {
task.sources.push_back({target_level, all_shards_idx});
+ task.type = ReconstructionType::Merge;
+ } else {
+ task.type = ReconstructionType::Append;
}
for (level_index i = target_level - 1; i >= source_level; i--) {
assert(i < (ssize_t)levels.size());
+ assert(levels[i]->get_record_count() > 0 || i == 0);
task.sources.push_back({i, all_shards_idx});
reccnt += levels[i]->get_record_count();
}
@@ -76,11 +71,11 @@ public:
}
private:
- level_index find_reconstruction_target(LevelVector &levels) const {
+ level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
level_index target_level = invalid_level_idx;
for (level_index i = 1; i < (level_index)levels.size(); i++) {
- if (levels[i]->get_record_count() == 0) {
+ if (levels[i]->get_record_count() < capacity(i, reccnt)) {
target_level = i;
break;
}