From a9ee3f2f7f557adbe5448e4f4fbf71c23fe808b1 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 7 Apr 2025 14:37:19 -0400 Subject: Implemented the legacy policies and fixed a few bugs --- benchmarks/tail-latency/insert_query_threads.cpp | 144 +++++++++++----------- benchmarks/tail-latency/standard_latency_dist.cpp | 18 +-- 2 files changed, 83 insertions(+), 79 deletions(-) (limited to 'benchmarks/tail-latency') diff --git a/benchmarks/tail-latency/insert_query_threads.cpp b/benchmarks/tail-latency/insert_query_threads.cpp index 1188ce0..fe001ea 100644 --- a/benchmarks/tail-latency/insert_query_threads.cpp +++ b/benchmarks/tail-latency/insert_query_threads.cpp @@ -8,16 +8,16 @@ #include -#include "framework/scheduling/SerialScheduler.h" -#include "framework/util/Configuration.h" -#include "util/types.h" #include "file_util.h" #include "framework/DynamicExtension.h" #include "framework/interface/Record.h" #include "framework/scheduling/FIFOScheduler.h" +#include "framework/scheduling/SerialScheduler.h" +#include "framework/util/Configuration.h" #include "query/rangecount.h" #include "shard/TrieSpline.h" #include "standard_benchmarks.h" +#include "util/types.h" #include "framework/reconstruction/FixedShardCountPolicy.h" @@ -54,25 +54,26 @@ std::atomic total_query_count = 0; void query_thread(Ext *extension, std::vector *queries) { TIMER_INIT(); while (!inserts_done.load()) { - total_query_count.fetch_add(1); - auto q_idx = rand() % queries->size(); + total_query_count.fetch_add(1); + auto q_idx = rand() % queries->size(); - auto q = (*queries)[q_idx]; + auto q = (*queries)[q_idx]; - TIMER_START(); - auto res = extension->query(std::move(q)).get(); - TIMER_STOP(); + TIMER_START(); + auto res = extension->query(std::move(q)).get(); + TIMER_STOP(); - total_query_time.fetch_add(TIMER_RESULT()); - total_res.fetch_add(res); + total_query_time.fetch_add(TIMER_RESULT()); + total_res.fetch_add(res); } } -void insert_thread(Ext *extension, std::vector *records, size_t start_idx, size_t stop_idx) { +void insert_thread(Ext *extension, std::vector *records, size_t start_idx, + size_t stop_idx) { TIMER_INIT(); TIMER_START(); - for (size_t i=start_idx; iinsert((*records)[i])) { usleep(1); } @@ -102,88 +103,89 @@ int main(int argc, char **argv) { std::vector sfs = {8}; //, 4, 8, 16, 32, 64, 128, 256, 512, 1024}; size_t buffer_size = 8000; - std::vector policies = { - 0 - }; + std::vector policies = {1, 2, 0}; - std::vector thread_counts = {1}; + std::vector thread_counts = {8}; std::vector modifiers = {0, 1, 2, 3}; + std::vector scale_factors = {2, 3, 4, 5, 6, 7, 8}; size_t insert_threads = 1; - size_t query_threads = 0; + size_t query_threads = 1; reccnt = n; for (auto pol : policies) { for (auto internal_thread_cnt : thread_counts) { for (auto mod : modifiers) { - auto policy = get_policy(sfs[0], buffer_size, pol, n, mod); - auto config = Conf(std::move(policy)); - config.recon_enable_maint_on_flush = true; - config.recon_maint_disabled = false; - // config.buffer_flush_trigger = 4000; - config.maximum_threads = internal_thread_cnt; - - g_thrd_cnt = internal_thread_cnt; - - total_insert_time.store(0); - total_query_time.store(0); - total_query_count.store(0); - - auto extension = new Ext(std::move(config)); - - /* warmup structure w/ 10% of records */ - size_t warmup = 0 * n; - for (size_t k = 0; k < warmup; k++) { - while (!extension->insert(data[k])) { - usleep(1); + for (auto sf : scale_factors) { + auto policy = get_policy(sf, buffer_size, pol, n, mod); + auto config = Conf(std::move(policy)); + config.recon_enable_maint_on_flush = true; + config.recon_maint_disabled = false; + // config.buffer_flush_trigger = 4000; + config.maximum_threads = internal_thread_cnt; + + g_thrd_cnt = internal_thread_cnt; + + total_insert_time.store(0); + total_query_time.store(0); + total_query_count.store(0); + + auto extension = new Ext(std::move(config)); + + /* warmup structure w/ 10% of records */ + size_t warmup = .3 * n; + for (size_t k = 0; k < warmup; k++) { + while (!extension->insert(data[k])) { + usleep(1); + } } - } - extension->await_version(); + extension->await_version(); - idx.store(warmup); + idx.store(warmup); - std::thread i_thrds[insert_threads]; - std::thread q_thrds[query_threads]; + std::thread i_thrds[insert_threads]; + std::thread q_thrds[query_threads]; - size_t per_insert_thrd = (n - warmup) / insert_threads; - size_t start = warmup; + size_t per_insert_thrd = (n - warmup) / insert_threads; + size_t start = warmup; - for (size_t i = 0; i < insert_threads; i++) { - i_thrds[i] = std::thread(insert_thread, extension, &data, start, - start + per_insert_thrd); - start += per_insert_thrd; - } + for (size_t i = 0; i < insert_threads; i++) { + i_thrds[i] = std::thread(insert_thread, extension, &data, start, + start + per_insert_thrd); + start += per_insert_thrd; + } - for (size_t i = 0; i < query_threads; i++) { - q_thrds[i] = std::thread(query_thread, extension, &queries); - } + for (size_t i = 0; i < query_threads; i++) { + q_thrds[i] = std::thread(query_thread, extension, &queries); + } - for (size_t i = 0; i < insert_threads; i++) { - i_thrds[i].join(); - } + for (size_t i = 0; i < insert_threads; i++) { + i_thrds[i].join(); + } - inserts_done.store(true); + inserts_done.store(true); - for (size_t i = 0; i < query_threads; i++) { - q_thrds[i].join(); - } + for (size_t i = 0; i < query_threads; i++) { + q_thrds[i].join(); + } - fprintf(stderr, "%ld\n", total_res.load()); + fprintf(stderr, "%ld\n", total_res.load()); - size_t insert_tput = - ((double)(n - warmup) / (double)total_insert_time) * 1e9; - size_t query_lat = - (double)total_query_time.load() / (double)total_query_count.load(); + size_t insert_tput = + ((double)(n - warmup) / (double)total_insert_time) * 1e9; + size_t query_lat = (double)total_query_time.load() / + (double)total_query_count.load(); - fprintf(stdout, "%ld\t%ld\t%ld\t%ld\n", internal_thread_cnt, mod, - insert_tput, query_lat); - fflush(stdout); + fprintf(stdout, "%ld\t%ld\t%ld\t%ld\t%ld\t%ld\n", internal_thread_cnt, pol, sf, mod, + insert_tput, query_lat); + fflush(stdout); - total_res.store(0); - inserts_done.store(false); - delete extension; + total_res.store(0); + inserts_done.store(false); + delete extension; + } } } } diff --git a/benchmarks/tail-latency/standard_latency_dist.cpp b/benchmarks/tail-latency/standard_latency_dist.cpp index e68b646..8f1594b 100644 --- a/benchmarks/tail-latency/standard_latency_dist.cpp +++ b/benchmarks/tail-latency/standard_latency_dist.cpp @@ -52,7 +52,7 @@ int main(int argc, char **argv) { std::vector sfs = {2, 3, 4, 5, 6, 7, 8}; //, 4, 8, 16, 32, 64, 128, 256, 512, 1024}; size_t buffer_size = 8000; - std::vector policies = {1}; + std::vector policies = {0, 1, 2}; for (auto pol: policies) { for (size_t i=0; iinsert(data[j])) { usleep(1); } - - //fprintf(stderr, "%ld\r", j); } extension->await_version(); - fprintf(stderr, "\n[I] Running Insertion Benchmark\n"); + // fprintf(stderr, "\n[I] Running Insertion Benchmark\n"); TIMER_INIT(); @@ -90,10 +89,13 @@ int main(int argc, char **argv) { TIMER_STOP(); auto total_insert_lat = TIMER_RESULT(); - fprintf(stderr, "\n[I] Finished running insertion benchmark\n"); + // extension->print_structure(); + // fflush(stdout); + + // fprintf(stderr, "\n[I] Finished running insertion benchmark\n"); extension->await_version(); - fprintf(stderr, "[I] Running query benchmark\n"); + // fprintf(stderr, "[I] Running query benchmark\n"); size_t total = 0; /* repeat the queries a bunch of times */ @@ -107,7 +109,7 @@ int main(int argc, char **argv) { } TIMER_STOP(); auto total_query_lat = TIMER_RESULT(); - fprintf(stderr, "[I] Finished running query benchmark\n"); + // fprintf(stderr, "[I] Finished running query benchmark\n"); auto query_latency = total_query_lat / (10*queries.size()); auto insert_throughput = (size_t) ((double) (n - warmup) / (double) total_insert_lat *1.0e9); @@ -116,7 +118,7 @@ int main(int argc, char **argv) { fprintf(stderr, "%ld\n", total); fflush(stdout); - extension->print_structure(); + // extension->print_structure(); delete extension; } } -- cgit v1.2.3