summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore3
-rw-r--r--CMakeLists.txt5
-rw-r--r--benchmarks/include/standard_benchmarks.h3
-rw-r--r--benchmarks/tail-latency/insert_query_threads.cpp129
-rw-r--r--benchmarks/tail-latency/stall_benchmark.cpp144
-rw-r--r--include/framework/DynamicExtension.h6
-rw-r--r--include/framework/reconstruction/CompactOnFull.h82
7 files changed, 308 insertions, 64 deletions
diff --git a/.gitignore b/.gitignore
index cb46256..b4dd4fc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,6 @@ tests/data
data/*
queries/*
+*.txt
+*.tsv
+*.dat
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f01432b..f6badb1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -338,6 +338,11 @@ if (tail_bench)
target_include_directories(knn_query_parm_sweep PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include)
target_link_options(knn_query_parm_sweep PUBLIC -mcx16)
+ add_executable(stall_benchmark ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/stall_benchmark.cpp)
+ target_link_libraries(stall_benchmark PUBLIC gsl pthread atomic)
+ target_include_directories(stall_benchmark PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include)
+ target_link_options(stall_benchmark PUBLIC -mcx16)
+
endif()
if (bench)
diff --git a/benchmarks/include/standard_benchmarks.h b/benchmarks/include/standard_benchmarks.h
index 2e990a7..8c282d7 100644
--- a/benchmarks/include/standard_benchmarks.h
+++ b/benchmarks/include/standard_benchmarks.h
@@ -25,6 +25,7 @@
#include "framework/reconstruction/TieringPolicy.h"
#include "framework/reconstruction/BSMPolicy.h"
#include "framework/reconstruction/FloodL0Policy.h"
+#include "framework/reconstruction/CompactOnFull.h"
#include "framework/reconstruction/BackgroundTieringPolicy.h"
constexpr double delete_proportion = 0.05;
@@ -49,6 +50,8 @@ std::unique_ptr<de::ReconstructionPolicy<S, Q>> get_policy(size_t scale_factor,
recon = new de::FixedShardCountPolicy<S, Q>(buffer_size, scale_factor, reccnt);
} else if (policy == 5) {
recon = new de::BackgroundTieringPolicy<S, Q>(scale_factor, buffer_size, modifier);
+ } else if (policy == 6) {
+ recon = new de::CompactOnFull<S, Q>(scale_factor, buffer_size, modifier);
}
return std::unique_ptr<de::ReconstructionPolicy<S, Q>>(recon);
diff --git a/benchmarks/tail-latency/insert_query_threads.cpp b/benchmarks/tail-latency/insert_query_threads.cpp
index d5939de..3f78f16 100644
--- a/benchmarks/tail-latency/insert_query_threads.cpp
+++ b/benchmarks/tail-latency/insert_query_threads.cpp
@@ -70,17 +70,20 @@ void query_thread(Ext *extension, std::vector<QP> *queries) {
void insert_thread(Ext *extension, std::vector<Rec> *records, size_t start_idx,
size_t stop_idx) {
+ gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937);
+
TIMER_INIT();
TIMER_START();
for (size_t i = start_idx; i < stop_idx; i++) {
- while (!extension->insert((*records)[i])) {
+ while (!extension->insert((*records)[i], rng)) {
usleep(1);
}
}
TIMER_STOP();
total_insert_time.fetch_add(TIMER_RESULT());
+ gsl_rng_free(rng);
}
void usage(char *progname) {
@@ -99,16 +102,15 @@ int main(int argc, char **argv) {
std::string q_fname = std::string(argv[3]);
auto data = read_sosd_file<Rec>(d_fname, n);
- //auto queries = read_range_queries<QP>(q_fname, .0001);
- auto queries =read_sosd_point_lookups<QP>(q_fname, 100);
+ // auto queries = read_range_queries<QP>(q_fname, .0001);
+ auto queries = read_sosd_point_lookups<QP>(q_fname, 100);
- std::vector<size_t> sfs = {8}; //, 4, 8, 16, 32, 64, 128, 256, 512, 1024};
size_t buffer_size = 8000;
- std::vector<size_t> policies = {0, 1, 2};
-
- std::vector<size_t> thread_counts = {8};
+ std::vector<size_t> policies = {6};
+ std::vector<size_t> thread_counts = {32};
std::vector<size_t> modifiers = {0};
- std::vector<size_t> scale_factors = {2, 4, 8, 16, 32, 64, 128, 256};
+ std::vector<size_t> scale_factors = {6};
+ std::vector<double> rate_limits = {1, 0.9999, 0.999, 0.99, 0.9, 0.85, 0.8};
size_t insert_threads = 1;
size_t query_threads = 1;
@@ -119,74 +121,77 @@ int main(int argc, char **argv) {
for (auto internal_thread_cnt : thread_counts) {
for (auto mod : modifiers) {
for (auto sf : scale_factors) {
- auto policy = get_policy<Shard, Q>(sf, buffer_size, pol, n, mod);
- auto config = Conf(std::move(policy));
- config.recon_enable_maint_on_flush = true;
- config.recon_maint_disabled = false;
- // config.buffer_flush_trigger = 4000;
- config.maximum_threads = internal_thread_cnt;
-
- g_thrd_cnt = internal_thread_cnt;
-
- total_insert_time.store(0);
- total_query_time.store(0);
- total_query_count.store(0);
-
- auto extension = new Ext(std::move(config));
-
- /* warmup structure w/ 10% of records */
- size_t warmup = .3 * n;
- for (size_t k = 0; k < warmup; k++) {
- while (!extension->insert(data[k])) {
- usleep(1);
+ for (auto lim : rate_limits) {
+
+ auto policy = get_policy<Shard, Q>(sf, buffer_size, pol, n, mod);
+ auto config = Conf(std::move(policy));
+ config.recon_enable_maint_on_flush = true;
+ config.recon_maint_disabled = false;
+ // config.buffer_flush_trigger = 4000;
+ config.maximum_threads = internal_thread_cnt;
+
+ g_thrd_cnt = internal_thread_cnt;
+
+ total_insert_time.store(0);
+ total_query_time.store(0);
+ total_query_count.store(0);
+
+ auto extension = new Ext(std::move(config), lim);
+
+        /* warmup structure w/ 30% of records */
+ size_t warmup = .3 * n;
+ for (size_t k = 0; k < warmup; k++) {
+ while (!extension->insert(data[k])) {
+ usleep(1);
+ }
}
- }
- extension->await_version();
+ extension->await_version();
- idx.store(warmup);
+ idx.store(warmup);
- std::thread i_thrds[insert_threads];
- std::thread q_thrds[query_threads];
+ std::thread i_thrds[insert_threads];
+ std::thread q_thrds[query_threads];
- size_t per_insert_thrd = (n - warmup) / insert_threads;
- size_t start = warmup;
+ size_t per_insert_thrd = (n - warmup) / insert_threads;
+ size_t start = warmup;
- for (size_t i = 0; i < insert_threads; i++) {
- i_thrds[i] = std::thread(insert_thread, extension, &data, start,
- start + per_insert_thrd);
- start += per_insert_thrd;
- }
+ for (size_t i = 0; i < insert_threads; i++) {
+ i_thrds[i] = std::thread(insert_thread, extension, &data, start,
+ start + per_insert_thrd);
+ start += per_insert_thrd;
+ }
- for (size_t i = 0; i < query_threads; i++) {
- q_thrds[i] = std::thread(query_thread, extension, &queries);
- }
+ for (size_t i = 0; i < query_threads; i++) {
+ q_thrds[i] = std::thread(query_thread, extension, &queries);
+ }
- for (size_t i = 0; i < insert_threads; i++) {
- i_thrds[i].join();
- }
+ for (size_t i = 0; i < insert_threads; i++) {
+ i_thrds[i].join();
+ }
- inserts_done.store(true);
+ inserts_done.store(true);
- for (size_t i = 0; i < query_threads; i++) {
- q_thrds[i].join();
- }
+ for (size_t i = 0; i < query_threads; i++) {
+ q_thrds[i].join();
+ }
- fprintf(stderr, "%ld\n", total_res.load());
+ fprintf(stderr, "%ld\n", total_res.load());
- size_t insert_tput =
- ((double)(n - warmup) / (double)total_insert_time) * 1e9;
- size_t query_lat = (double)total_query_time.load() /
- (double)total_query_count.load();
+ size_t insert_tput =
+ ((double)(n - warmup) / (double)total_insert_time) * 1e9;
+ size_t query_lat = (double)total_query_time.load() /
+ (double)total_query_count.load();
- fprintf(stdout, "%ld\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\n", internal_thread_cnt, pol, sf,
- mod, extension->get_height(), extension->get_shard_count(),
- insert_tput, query_lat);
- fflush(stdout);
+ fprintf(stdout, "%ld\t%lf\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\n",
+ internal_thread_cnt, lim, pol, sf, mod, extension->get_height(),
+ extension->get_shard_count(), insert_tput, query_lat);
+ fflush(stdout);
- total_res.store(0);
- inserts_done.store(false);
- delete extension;
+ total_res.store(0);
+ inserts_done.store(false);
+ delete extension;
+ }
}
}
}
diff --git a/benchmarks/tail-latency/stall_benchmark.cpp b/benchmarks/tail-latency/stall_benchmark.cpp
new file mode 100644
index 0000000..b1210b2
--- /dev/null
+++ b/benchmarks/tail-latency/stall_benchmark.cpp
@@ -0,0 +1,144 @@
+/*
+ *
+ */
+
+#include <cstdlib>
+#define ENABLE_TIMER
+#define TS_TEST
+
+#include <thread>
+
+#include "file_util.h"
+#include "framework/DynamicExtension.h"
+#include "framework/interface/Record.h"
+#include "framework/scheduling/FIFOScheduler.h"
+#include "framework/scheduling/SerialScheduler.h"
+#include "framework/util/Configuration.h"
+#include "query/pointlookup.h"
+#include "shard/ISAMTree.h"
+#include "standard_benchmarks.h"
+#include "util/types.h"
+
+#include "framework/reconstruction/FixedShardCountPolicy.h"
+
+#include <gsl/gsl_rng.h>
+
+#include "psu-util/timer.h"
+
+typedef de::Record<uint64_t, uint64_t> Rec;
+typedef de::ISAMTree<Rec> Shard;
+typedef de::pl::Query<Shard> Q;
+typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE,
+ de::FIFOScheduler>
+ Ext;
+typedef Q::Parameters QP;
+typedef de::DEConfiguration<Shard, Q, de::DeletePolicy::TOMBSTONE,
+ de::FIFOScheduler>
+ Conf;
+
+std::atomic<size_t> idx;
+std::atomic<bool> inserts_done = false;
+
+ssize_t query_ratio = 0;
+
+std::atomic<size_t> total_res = 0;
+size_t reccnt = 0;
+
+size_t g_thrd_cnt = 0;
+
+std::atomic<size_t> total_insert_time = 0;
+std::atomic<size_t> total_insert_count = 0;
+std::atomic<size_t> total_query_time = 0;
+std::atomic<size_t> total_query_count = 0;
+
+void insert_thread(Ext *extension, std::vector<Rec> *records, size_t start_idx,
+ size_t stop_idx, gsl_rng *rng) {
+ TIMER_INIT();
+
+ for (size_t i = start_idx; i < stop_idx; i++) {
+ TIMER_START();
+ while (!extension->insert((*records)[i], rng)) {
+ usleep(1);
+ }
+ TIMER_STOP();
+
+ fprintf(stdout, "I\t%ld\n", TIMER_RESULT());
+ }
+}
+
+void usage(char *progname) {
+ fprintf(stderr, "%s reccnt datafile rate_limit policy\n", progname);
+}
+
+int main(int argc, char **argv) {
+
+ if (argc < 5) {
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+
+ size_t n = atol(argv[1]);
+ std::string d_fname = std::string(argv[2]);
+ double rate_limit = std::atof(argv[3]);
+ size_t pol = std::atol(argv[4]);
+ assert(pol >= 0 && pol <= 6);
+
+ auto data = read_sosd_file<Rec>(d_fname, n);
+
+ size_t buffer_size = 8000;
+ size_t scale_factor = 8;
+ double modifier = 0;
+ size_t insert_threads = 1;
+ size_t internal_thread_cnt = 32;
+ reccnt = n;
+
+ gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937);
+
+ auto policy = get_policy<Shard, Q>(scale_factor, buffer_size, pol, n, modifier);
+ auto config = Conf(std::move(policy));
+ config.recon_enable_maint_on_flush = true;
+ config.recon_maint_disabled = false;
+ // config.buffer_flush_trigger = 4000;
+ config.maximum_threads = internal_thread_cnt;
+
+ g_thrd_cnt = internal_thread_cnt;
+
+ total_insert_time.store(0);
+ total_query_time.store(0);
+ total_query_count.store(0);
+
+ auto extension = new Ext(std::move(config), rate_limit);
+
+ /* warmup structure w/ 30% of records */
+ size_t warmup = .3 * n;
+ for (size_t k = 0; k < warmup; k++) {
+ while (!extension->insert(data[k])) {
+ usleep(1);
+ }
+ }
+
+ extension->await_version();
+
+ idx.store(warmup);
+
+ std::thread i_thrds[insert_threads];
+
+ size_t per_insert_thrd = (n - warmup) / insert_threads;
+ size_t start = warmup;
+
+ for (size_t i = 0; i < insert_threads; i++) {
+ i_thrds[i] = std::thread(insert_thread, extension, &data, start,
+ start + per_insert_thrd, rng);
+ start += per_insert_thrd;
+ }
+
+ for (size_t i = 0; i < insert_threads; i++) {
+ i_thrds[i].join();
+ }
+
+ inserts_done.store(true);
+ inserts_done.store(false);
+ delete extension;
+
+ fflush(stderr);
+}
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 59b784b..b65ad47 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -70,7 +70,7 @@ public:
* for various configuration parameters in the system. See
* include/framework/util/Configuration.h for details.
*/
- DynamicExtension(ConfType &&config) : m_config(std::move(config)) {
+ DynamicExtension(ConfType &&config, double insertion_rate=1.0) : m_config(std::move(config)) {
m_buffer = std::make_unique<BufferType>(m_config.buffer_flush_trigger,
m_config.buffer_size);
@@ -81,7 +81,7 @@ public:
m_version_counter = INITIAL_VERSION;
m_preempt_version = INVALID_VERSION;
- m_insertion_rate.store(1.0);
+ m_insertion_rate.store(insertion_rate);
assert(m_config.recon_policy);
}
@@ -493,6 +493,7 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
+ fprintf(stdout, "S\t%ld\n", extension->get_shard_count());
// fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
// fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n",
// args->version->get_id(), recon_id);
@@ -657,6 +658,7 @@ private:
// fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n",
// args->version->get_id(), recon_id);
+
/* manually delete the argument object */
delete args;
}
diff --git a/include/framework/reconstruction/CompactOnFull.h b/include/framework/reconstruction/CompactOnFull.h
new file mode 100644
index 0000000..f5e0400
--- /dev/null
+++ b/include/framework/reconstruction/CompactOnFull.h
@@ -0,0 +1,82 @@
+/*
+ * include/framework/reconstruction/CompactOnFull.h
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+#include <cmath>
+
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/Version.h"
+#include "util/types.h"
+
+namespace de {
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class CompactOnFull : public ReconstructionPolicy<ShardType, QueryType> {
+ typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+ LevelVector;
+
+public:
+ CompactOnFull(size_t scale_factor, size_t buffer_size, size_t size_modifier=0)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(size_modifier) {}
+
+ std::vector<ReconstructionVector> get_reconstruction_tasks(
+ const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ std::vector<ReconstructionVector> reconstructions;
+
+ auto levels = version->get_structure()->get_level_vector();
+
+ if (levels[0]->get_shard_count() == 0) {
+ return {};
+ }
+
+ for (level_index i=0; i < (ssize_t) levels.size(); i++) {
+ if (levels[i]->get_shard_count() >= m_scale_factor && lock_mngr.take_lock(i, version->get_id())) {
+ ReconstructionVector recon;
+ size_t total_reccnt = levels[i]->get_record_count();
+ std::vector<ShardID> shards;
+ for (ssize_t j=0; j<(ssize_t) levels[i]->get_shard_count(); j++) {
+ shards.push_back({i,j});
+ }
+
+ recon.add_reconstruction(shards, i+1, total_reccnt, ReconstructionType::Compact);
+ reconstructions.push_back(recon);
+ }
+ }
+
+ return reconstructions;
+ }
+
+
+ ReconstructionVector
+ get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
+ ReconstructionVector reconstructions;
+
+ return reconstructions;
+ }
+
+private:
+ level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+ level_index target_level = invalid_level_idx;
+
+ for (level_index i = 1; i < (level_index)levels.size(); i++) {
+ if (levels[i]->get_shard_count() + 1 <= capacity(reccnt)) {
+ target_level = i;
+ break;
+ }
+ }
+
+ return target_level;
+ }
+
+ inline size_t capacity(size_t reccnt) const { return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); }
+
+ size_t m_scale_factor;
+ size_t m_buffer_size;
+ size_t m_size_modifier;
+};
+} // namespace de