From 7c3fe4ea760f4773f0eb1a98ed3ba493a36015e2 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 17 Sep 2025 17:52:22 -0400 Subject: Last updates for dissertation --- benchmarks/tail-latency/btree_tput.cpp | 78 ++++++++ benchmarks/tail-latency/fixed_shard_count_dist.cpp | 111 +++++++++++ benchmarks/tail-latency/insert_query_threads.cpp | 8 +- benchmarks/tail-latency/isam_latency_dist.cpp | 116 ++++++++++++ .../tail-latency/knn_insert_query_threads.cpp | 206 +++++++++++++++++++++ benchmarks/tail-latency/knn_query_parm_sweep.cpp | 11 +- benchmarks/tail-latency/knn_selectivity_sweep.cpp | 13 +- benchmarks/tail-latency/query_parm_sweep.cpp | 15 +- benchmarks/tail-latency/selectivity_sweep.cpp | 22 ++- benchmarks/tail-latency/stall_benchmark.cpp | 11 +- benchmarks/tail-latency/stall_benchmark_vptree.cpp | 2 + 11 files changed, 562 insertions(+), 31 deletions(-) create mode 100644 benchmarks/tail-latency/btree_tput.cpp create mode 100644 benchmarks/tail-latency/fixed_shard_count_dist.cpp create mode 100644 benchmarks/tail-latency/isam_latency_dist.cpp create mode 100644 benchmarks/tail-latency/knn_insert_query_threads.cpp (limited to 'benchmarks') diff --git a/benchmarks/tail-latency/btree_tput.cpp b/benchmarks/tail-latency/btree_tput.cpp new file mode 100644 index 0000000..d091ff5 --- /dev/null +++ b/benchmarks/tail-latency/btree_tput.cpp @@ -0,0 +1,78 @@ +/* + * + */ + +#define ENABLE_TIMER + +#include "shard/ISAMTree.h" +#include "query/rangequery.h" +#include "framework/interface/Record.h" +#include "file_util.h" +#include "benchmark_types.h" + +#include + +#include "psu-util/timer.h" +#include "standard_benchmarks.h" +#include "psu-ds/BTree.h" + +typedef btree_record Rec; + +typedef de::ISAMTree Shard; +typedef de::irs::Query Q; +typedef Q::Parameters QP; + +void usage(char *progname) { + fprintf(stderr, "%s reccnt datafile queryfile\n", progname); +} + +int main(int argc, char **argv) { + + if (argc < 4) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + std::string d_fname = std::string(argv[2]); + std::string q_fname = std::string(argv[3]); + + auto btree = BenchBTree(); + + auto data = read_sosd_file(d_fname, n); + + /* read in the range queries and add sample size and rng for sampling */ + auto queries = read_range_queries(q_fname, .0001); + + /* warmup structure w/ 10% of records */ + size_t warmup = .3 * n; + for (size_t i=0; i + +#include "framework/DynamicExtension.h" +#include "framework/scheduling/FIFOScheduler.h" +#include "shard/ISAMTree.h" +#include "query/rangecount.h" +#include "framework/util/Configuration.h" +#include "framework/interface/Record.h" +#include "file_util.h" +#include "standard_benchmarks.h" + +#include "framework/reconstruction/FixedShardCountPolicy.h" + +#include + +#include "psu-util/timer.h" + + +typedef de::Record Rec; +typedef de::ISAMTree Shard; +typedef de::rc::Query Q; +typedef de::DynamicExtension Ext; +typedef Q::Parameters QP; +typedef de::DEConfiguration Conf; + +void usage(char *progname) { + fprintf(stderr, "%s reccnt datafile queryfile\n", progname); +} + +int main(int argc, char **argv) { + + if (argc < 4) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + std::string d_fname = std::string(argv[2]); + std::string q_fname = std::string(argv[3]); + + + auto data = read_sosd_file(d_fname, n); + auto queries = read_range_queries(q_fname, .0001); + + std::vector shard_counts = {4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 4096*2}; + size_t buffer_size = 8000; + + for (size_t i=0; i(shard_counts[i], buffer_size, 4, n); + auto config = Conf(std::move(policy)); + config.recon_enable_maint_on_flush = false; + config.recon_maint_disabled = true; + config.buffer_flush_trigger = 4000; + config.maximum_threads = 8; + + auto extension = new Ext(std::move(config)); + + /* warmup structure w/ 10% of records */ + size_t warmup = .3 * n; + for (size_t i=0; iinsert(data[i])) { + usleep(1); + } + } + + extension->await_version(); + + TIMER_INIT(); + + for (size_t j=warmup; jinsert(data[j])) { + usleep(1); + } + TIMER_STOP(); + + fprintf(stdout, "%ld\t%ld\n", shard_counts[i], TIMER_RESULT()); + } + + + extension->await_version(); + + // /* repeat the queries a bunch of times */ + // TIMER_START(); + // for (size_t l=0; l<10; l++) { + // for (size_t i=0; iquery(std::move(q)); + // res.get(); + // } + // } + // TIMER_STOP(); + + // auto query_lat = TIMER_RESULT() / 10*queries.size(); + + // fprintf(stdout, "%ld\t%ld\t%ld\t%ld\n", shard_counts[i], extension->get_shard_count(), insert_tput, query_lat); + + delete extension; + } + + fflush(stderr); +} + diff --git a/benchmarks/tail-latency/insert_query_threads.cpp b/benchmarks/tail-latency/insert_query_threads.cpp index cb4b81b..a5a88f5 100644 --- a/benchmarks/tail-latency/insert_query_threads.cpp +++ b/benchmarks/tail-latency/insert_query_threads.cpp @@ -106,11 +106,11 @@ int main(int argc, char **argv) { auto queries = read_sosd_point_lookups(q_fname, 100); size_t buffer_size = 8000; - std::vector policies = {0}; - std::vector thread_counts = {32}; + std::vector policies = {6}; + std::vector thread_counts = {1, 2, 4, 8, 32}; std::vector modifiers = {0}; - std::vector scale_factors = {6}; - std::vector rate_limits = {1}; + std::vector scale_factors = {6}; + std::vector rate_limits = {.95, .99, .993, .995, .998, .9985, .999, 1}; size_t insert_threads = 1; size_t query_threads = 1; diff --git a/benchmarks/tail-latency/isam_latency_dist.cpp b/benchmarks/tail-latency/isam_latency_dist.cpp new file mode 100644 index 0000000..c116413 --- /dev/null +++ b/benchmarks/tail-latency/isam_latency_dist.cpp @@ -0,0 +1,116 @@ +/* + * + */ + +#include "framework/scheduling/FIFOScheduler.h" +#include "framework/scheduling/SerialScheduler.h" +#define ENABLE_TIMER +#define TS_TEST + +#include + +#include "framework/DynamicExtension.h" +#include "shard/ISAMTree.h" +#include "query/rangecount.h" +#include "framework/interface/Record.h" +#include "file_util.h" +#include "standard_benchmarks.h" +#include "framework/util/Configuration.h" + +#include + +#include "psu-util/timer.h" + + +typedef de::Record Rec; +typedef de::ISAMTreeShard; +typedef de::rc::Query Q; +typedef de::DynamicExtension Ext; +typedef Q::Parameters QP; +typedef de::DEConfiguration + Conf; + +void usage(char *progname) { + fprintf(stderr, "%s reccnt datafile queryfile\n", progname); +} + +int main(int argc, char **argv) { + + if (argc < 4) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + std::string d_fname = std::string(argv[2]); + std::string q_fname = std::string(argv[3]); + + + auto data = read_sosd_file(d_fname, n); + auto queries = read_range_queries(q_fname, .0001); + + std::vector policies = {0, 1}; + std::vector buffers = {1000, 8000, 16000}; + std::vector sfs = {4}; + + for (size_t l=0; l(sfs[k], buffers[j], policies[l]); + auto config = Conf(std::move(policy)); + + config.recon_enable_maint_on_flush = true; + config.recon_maint_disabled = false; + + auto extension = new Ext(std::move(config)); + + /* warmup structure w/ 10% of records */ + size_t warmup = .3 * n; + for (size_t i=0; iinsert(data[i])) { + usleep(1); + } + } + + extension->await_version(); + + TIMER_INIT(); + + for (size_t i=warmup; iinsert(data[i])) { + usleep(1); + } + TIMER_STOP(); + + fprintf(stdout, "I\t%ld\t%ld\t%d\t%ld\n", sfs[k], buffers[j], policies[l], TIMER_RESULT()); + } + + extension->await_version(); + + /* repeat the queries a bunch of times */ + for (size_t l=0; l<10; l++) { + for (size_t i=0; iquery(std::move(q)); + res.get(); + TIMER_STOP(); + + fprintf(stdout, "Q\t%ld\t%ld\t%d\t%ld\n", sfs[k], buffers[j], policies[l], TIMER_RESULT()); + } + } + + + QP p = {0, 10000}; + auto res =extension->query(std::move(p)); + + fprintf(stderr, "%ld\n", res.get()); + delete extension; + }}} + + + fflush(stderr); +} + diff --git a/benchmarks/tail-latency/knn_insert_query_threads.cpp b/benchmarks/tail-latency/knn_insert_query_threads.cpp new file mode 100644 index 0000000..fb7d5f1 --- /dev/null +++ b/benchmarks/tail-latency/knn_insert_query_threads.cpp @@ -0,0 +1,206 @@ +/* + * + */ + +#include +#define ENABLE_TIMER +#define TS_TEST + +#include + +#include "file_util.h" +#include "framework/DynamicExtension.h" +#include "framework/interface/Record.h" +#include "framework/scheduling/FIFOScheduler.h" +#include "framework/scheduling/SerialScheduler.h" +#include "framework/util/Configuration.h" +#include "query/knn.h" +#include "shard/VPTree.h" +#include "standard_benchmarks.h" +#include "util/types.h" + +#include "framework/reconstruction/FixedShardCountPolicy.h" + +#include + +#include "psu-util/timer.h" + + +typedef Word2VecRec Rec; +typedef de::VPTree Shard; +typedef de::knn::Query Q; +typedef de::DynamicExtension + Ext; +typedef Q::Parameters QP; +typedef de::DEConfiguration + Conf; + +std::atomic idx; +std::atomic inserts_done = false; + +ssize_t query_ratio = 0; + +std::atomic total_res = 0; +size_t reccnt = 0; + +size_t g_thrd_cnt = 0; + +std::atomic total_insert_time = 0; +std::atomic total_insert_count = 0; +std::atomic total_query_time = 0; +std::atomic total_query_count = 0; + +void query_thread(Ext *extension, std::vector *queries) { + TIMER_INIT(); + while (!inserts_done.load()) { + total_query_count.fetch_add(1); + auto q_idx = rand() % queries->size(); + + auto q = (*queries)[q_idx]; + + TIMER_START(); + auto res = extension->query(std::move(q)).get(); + TIMER_STOP(); + + usleep(100000); + + total_query_time.fetch_add(TIMER_RESULT()); + total_res.fetch_add(res.size()); + } +} + +void insert_thread(Ext *extension, std::vector *records, size_t start_idx, + size_t stop_idx) { + gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937); + + TIMER_INIT(); + + TIMER_START(); + for (size_t i = start_idx; i < stop_idx; i++) { + while (!extension->insert((*records)[i], rng)) { + usleep(1); + } + } + + TIMER_STOP(); + total_insert_time.fetch_add(TIMER_RESULT()); + gsl_rng_free(rng); +} + +void usage(char *progname) { + fprintf(stderr, "%s reccnt datafile queryfile\n", progname); +} + +int main(int argc, char **argv) { + + if (argc < 4) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + std::string d_fname = std::string(argv[2]); + std::string q_fname = std::string(argv[3]); + + auto data = read_vector_file(d_fname, n); + auto queries = read_knn_queries(q_fname, 100, 1); + + size_t buffer_size = 1000; + std::vector policies = {0}; + std::vector thread_counts = {32}; + std::vector modifiers = {0}; + std::vector scale_factors = {8}; + std::vector rate_limits = {1}; + + size_t insert_threads = 1; + size_t query_threads = 1; + + reccnt = n; + + for (auto pol : policies) { + for (auto internal_thread_cnt : thread_counts) { + for (auto mod : modifiers) { + for (auto sf : scale_factors) { + for (auto lim : rate_limits) { + + auto policy = get_policy(sf, buffer_size, pol, n, mod); + auto config = Conf(std::move(policy)); + config.recon_enable_maint_on_flush = true; + config.recon_maint_disabled = false; + config.buffer_size = buffer_size; + config.buffer_flush_trigger = buffer_size; + config.buffer_flush_query_preemption_trigger = 4e5; + + config.maximum_threads = internal_thread_cnt; + + g_thrd_cnt = internal_thread_cnt; + + total_insert_time.store(0); + total_query_time.store(0); + total_query_count.store(0); + + auto extension = new Ext(std::move(config), lim); + + /* warmup structure w/ 10% of records */ + size_t warmup = .3 * n; + for (size_t k = 0; k < warmup; k++) { + while (!extension->insert(data[k])) { + usleep(1); + } + } + + extension->await_version(); + + idx.store(warmup); + + std::thread i_thrds[insert_threads]; + std::thread q_thrds[query_threads]; + + size_t per_insert_thrd = (n - warmup) / insert_threads; + size_t start = warmup; + + for (size_t i = 0; i < insert_threads; i++) { + i_thrds[i] = std::thread(insert_thread, extension, &data, start, + start + per_insert_thrd); + start += per_insert_thrd; + } + + for (size_t i = 0; i < query_threads; i++) { + q_thrds[i] = std::thread(query_thread, extension, &queries); + } + + for (size_t i = 0; i < insert_threads; i++) { + i_thrds[i].join(); + } + + inserts_done.store(true); + + for (size_t i = 0; i < query_threads; i++) { + q_thrds[i].join(); + } + + fprintf(stderr, "%ld\n", total_res.load()); + + size_t insert_tput = + ((double)(n - warmup) / (double)total_insert_time) * 1e9; + size_t query_lat = (double)total_query_time.load() / + (double)total_query_count.load(); + + fprintf(stdout, "%ld\t%lf\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\n", + internal_thread_cnt, lim, pol, sf, mod, extension->get_height(), + extension->get_shard_count(), insert_tput, query_lat); + fflush(stdout); + + total_res.store(0); + inserts_done.store(false); + delete extension; + } + } + } + } + } + + fflush(stderr); +} diff --git a/benchmarks/tail-latency/knn_query_parm_sweep.cpp b/benchmarks/tail-latency/knn_query_parm_sweep.cpp index 1766c74..dcbe7cf 100644 --- a/benchmarks/tail-latency/knn_query_parm_sweep.cpp +++ b/benchmarks/tail-latency/knn_query_parm_sweep.cpp @@ -107,26 +107,28 @@ int main(int argc, char **argv) { auto data = read_vector_file(d_fname, n); auto queries = read_knn_queries(q_fname, 100, 1); - size_t buffer_size = 1000; - std::vector policies = {0, 1}; + std::vector buffer_sizes = {100, 500, 1000, 10000}; + std::vector policies = {0, 1, 2}; std::vector thread_counts = {8}; std::vector modifiers = {0}; - std::vector scale_factors = {2, 4, 6, 8, 16, 32, 128}; + std::vector scale_factors = {2, 8}; size_t insert_threads = 1; size_t query_threads = 1; reccnt = n; + for (auto bs : buffer_sizes) { for (auto pol : policies) { for (auto internal_thread_cnt : thread_counts) { for (auto mod : modifiers) { for (auto sf : scale_factors) { - auto policy = get_policy(sf, buffer_size, pol, n, mod); + auto policy = get_policy(sf, bs, pol, n, mod); auto config = Conf(std::move(policy)); config.recon_enable_maint_on_flush = true; config.recon_maint_disabled = false; + config.buffer_size = bs; config.buffer_flush_trigger = config.buffer_size; config.maximum_threads = internal_thread_cnt; @@ -212,6 +214,7 @@ int main(int argc, char **argv) { } } } + } fflush(stderr); } diff --git a/benchmarks/tail-latency/knn_selectivity_sweep.cpp b/benchmarks/tail-latency/knn_selectivity_sweep.cpp index cf14330..af03d17 100644 --- a/benchmarks/tail-latency/knn_selectivity_sweep.cpp +++ b/benchmarks/tail-latency/knn_selectivity_sweep.cpp @@ -73,7 +73,7 @@ int main(int argc, char **argv) { size_t buffer_size = 1000; std::vector policies = {0, 1}; - std::vector thread_counts = {8}; + std::vector thread_counts = {32}; std::vector modifiers = {0}; std::vector scale_factors = {2, 4, 8}; std::vector knn_sizes = {1, 10, 100, 1000}; @@ -93,6 +93,7 @@ int main(int argc, char **argv) { auto config = Conf(std::move(policy)); config.recon_enable_maint_on_flush = true; config.recon_maint_disabled = false; + config.buffer_size = buffer_size; config.buffer_flush_trigger = config.buffer_size; config.maximum_threads = internal_thread_cnt; @@ -116,6 +117,9 @@ int main(int argc, char **argv) { idx.store(warmup); + fprintf(stderr, "Inserts done\n"); + fflush(stderr); + extension->await_version(); @@ -123,14 +127,15 @@ int main(int argc, char **argv) { size_t total = 0; for (size_t l=0; lquery(std::move(q)); total += res.get().size(); } TIMER_STOP(); - auto query_latency = (TIMER_RESULT()) / (10*query_sets[l].size()); + auto query_latency = (TIMER_RESULT()) / (query_sets[l].size()); fprintf(stdout, "%ld\t%ld\t", knn_sizes[l], query_latency); + fflush(stdout); } fprintf(stdout, "\n"); diff --git a/benchmarks/tail-latency/query_parm_sweep.cpp b/benchmarks/tail-latency/query_parm_sweep.cpp index f2453aa..a53cecf 100644 --- a/benchmarks/tail-latency/query_parm_sweep.cpp +++ b/benchmarks/tail-latency/query_parm_sweep.cpp @@ -102,12 +102,12 @@ int main(int argc, char **argv) { //auto queries = read_range_queries(q_fname, .0001); auto queries =read_sosd_point_lookups(q_fname, 1); - size_t buffer_size = 8000; - std::vector policies = {0, 1}; + std::vector buffer_sizes = {100, 1000, 5000, 8000, 10000, 20000, 100000}; + std::vector policies = {0, 1, 2}; std::vector thread_counts = {8}; std::vector modifiers = {0}; - std::vector scale_factors = {2, 4, 6, 8, 16, 32, 64}; + std::vector scale_factors = {2}; size_t insert_threads = 1; size_t query_threads = 1; @@ -118,10 +118,12 @@ int main(int argc, char **argv) { for (auto internal_thread_cnt : thread_counts) { for (auto mod : modifiers) { for (auto sf : scale_factors) { - auto policy = get_policy(sf, buffer_size, pol, n, mod); + for (auto bs: buffer_sizes) { + auto policy = get_policy(sf, bs, pol, n, mod); auto config = Conf(std::move(policy)); config.recon_enable_maint_on_flush = true; config.recon_maint_disabled = false; + config.buffer_size = bs; config.buffer_flush_trigger = config.buffer_size; config.maximum_threads = internal_thread_cnt; @@ -173,7 +175,7 @@ int main(int argc, char **argv) { total_res.fetch_add(res.size()); } - total_query_count.store(100000); + total_query_count.store(10000); TIMER_INIT(); TIMER_START(); for (size_t i=0; iget_height(), extension->get_shard_count(), insert_tput, query_lat); //extension->print_scheduler_statistics(); @@ -204,6 +206,7 @@ int main(int argc, char **argv) { inserts_done.store(false); delete extension; } + } } } } diff --git a/benchmarks/tail-latency/selectivity_sweep.cpp b/benchmarks/tail-latency/selectivity_sweep.cpp index 0fc0f42..55b499e 100644 --- a/benchmarks/tail-latency/selectivity_sweep.cpp +++ b/benchmarks/tail-latency/selectivity_sweep.cpp @@ -55,20 +55,24 @@ int main(int argc, char **argv) { query_sets.push_back(generate_uniform_range_queries(100, n, sel)); } - std::vector sfs = {2, 4, 8, 16, 32, 64, 128}; - size_t buffer_size = 8000; - std::vector policies = {0, 1}; + std::vector sfs = {8}; + std::vector stalls = {1}; + std::vector buffer_sizes = {100, 1000, 5000, 10000, 20000, 100000}; + std::vector policies = {0,1,2}; for (auto pol: policies) { for (size_t i=0; i(sfs[i], buffer_size, pol, n); + for (size_t j=0; j(sfs[i], bs, pol, n); auto config = Conf(std::move(policy)); config.recon_enable_maint_on_flush = false; config.recon_maint_disabled = true; - config.buffer_flush_trigger = 4000; + config.buffer_size = bs; + config.buffer_flush_trigger = config.buffer_size; config.maximum_threads = 8; - auto extension = new Ext(std::move(config)); + auto extension = new Ext(std::move(config), stalls[j]); /* warmup structure w/ 10% of records */ size_t warmup = .1 * n; @@ -88,7 +92,7 @@ int main(int argc, char **argv) { for (size_t j=warmup; jinsert(data[j])) { usleep(1); - fprintf(stderr, "insert blocked %ld\r", j); + //fprintf(stderr, "insert blocked %ld\r", j); } } TIMER_STOP(); @@ -105,7 +109,7 @@ int main(int argc, char **argv) { /* repeat the queries a bunch of times */ auto insert_throughput = (size_t) ((double) (n - warmup) / (double) total_insert_lat *1.0e9); - fprintf(stdout, "%ld\t%ld\t%ld\t%ld\t", pol, sfs[i], n, insert_throughput); + fprintf(stdout, "%ld\t%ld\t%ld\t%ld\t%lf\t%ld\t", pol, bs, sfs[i], extension->get_record_count(), stalls[j], insert_throughput); size_t total = 0; for (size_t l=0; l *records, size_t start_idx, } void usage(char *progname) { - fprintf(stderr, "%s reccnt datafile rate_limit policy\n", progname); + fprintf(stderr, "%s reccnt datafile rate_limit buffer_size policy\n", progname); } int main(int argc, char **argv) { - if (argc < 5) { + if (argc < 6) { usage(argv[0]); exit(EXIT_FAILURE); } @@ -81,12 +81,12 @@ int main(int argc, char **argv) { size_t n = atol(argv[1]); std::string d_fname = std::string(argv[2]); double rate_limit = std::atof(argv[3]); - size_t pol = std::atol(argv[4]); + size_t buffer_size = std::atol(argv[4]); + size_t pol = std::atol(argv[5]); assert(pol >= 0 && pol <= 6); auto data = read_sosd_file(d_fname, n); - size_t buffer_size = 8000; size_t scale_factor = 8; double modifier = 0; size_t insert_threads = 1; @@ -99,7 +99,8 @@ int main(int argc, char **argv) { auto config = Conf(std::move(policy)); config.recon_enable_maint_on_flush = true; config.recon_maint_disabled = false; - // config.buffer_flush_trigger = 4000; + config.buffer_size = buffer_size; + config.buffer_flush_trigger = buffer_size; config.maximum_threads = internal_thread_cnt; g_thrd_cnt = internal_thread_cnt; diff --git a/benchmarks/tail-latency/stall_benchmark_vptree.cpp b/benchmarks/tail-latency/stall_benchmark_vptree.cpp index 28680cc..26df573 100644 --- a/benchmarks/tail-latency/stall_benchmark_vptree.cpp +++ b/benchmarks/tail-latency/stall_benchmark_vptree.cpp @@ -99,6 +99,8 @@ int main(int argc, char **argv) { auto config = Conf(std::move(policy)); config.recon_enable_maint_on_flush = true; config.recon_maint_disabled = false; + config.buffer_size = buffer_size; + config.buffer_flush_trigger = buffer_size; // config.buffer_flush_trigger = 4000; config.maximum_threads = internal_thread_cnt; -- cgit v1.2.3