diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-04-07 14:37:19 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-04-07 14:37:19 -0400 |
| commit | a9ee3f2f7f557adbe5448e4f4fbf71c23fe808b1 (patch) | |
| tree | c3c404223d852b64a57d5265221c4a2d05d0af73 /benchmarks/tail-latency/insert_query_threads.cpp | |
| parent | 6bdcf74ad91e0efaa8c2e4339f5085fde8a7982b (diff) | |
| download | dynamic-extension-a9ee3f2f7f557adbe5448e4f4fbf71c23fe808b1.tar.gz | |
Implemented the legacy policies and fixed a few bugs
Diffstat (limited to 'benchmarks/tail-latency/insert_query_threads.cpp')
| -rw-r--r-- | benchmarks/tail-latency/insert_query_threads.cpp | 144 |
1 files changed, 73 insertions, 71 deletions
diff --git a/benchmarks/tail-latency/insert_query_threads.cpp b/benchmarks/tail-latency/insert_query_threads.cpp index 1188ce0..fe001ea 100644 --- a/benchmarks/tail-latency/insert_query_threads.cpp +++ b/benchmarks/tail-latency/insert_query_threads.cpp @@ -8,16 +8,16 @@ #include <thread> -#include "framework/scheduling/SerialScheduler.h" -#include "framework/util/Configuration.h" -#include "util/types.h" #include "file_util.h" #include "framework/DynamicExtension.h" #include "framework/interface/Record.h" #include "framework/scheduling/FIFOScheduler.h" +#include "framework/scheduling/SerialScheduler.h" +#include "framework/util/Configuration.h" #include "query/rangecount.h" #include "shard/TrieSpline.h" #include "standard_benchmarks.h" +#include "util/types.h" #include "framework/reconstruction/FixedShardCountPolicy.h" @@ -54,25 +54,26 @@ std::atomic<size_t> total_query_count = 0; void query_thread(Ext *extension, std::vector<QP> *queries) { TIMER_INIT(); while (!inserts_done.load()) { - total_query_count.fetch_add(1); - auto q_idx = rand() % queries->size(); + total_query_count.fetch_add(1); + auto q_idx = rand() % queries->size(); - auto q = (*queries)[q_idx]; + auto q = (*queries)[q_idx]; - TIMER_START(); - auto res = extension->query(std::move(q)).get(); - TIMER_STOP(); + TIMER_START(); + auto res = extension->query(std::move(q)).get(); + TIMER_STOP(); - total_query_time.fetch_add(TIMER_RESULT()); - total_res.fetch_add(res); + total_query_time.fetch_add(TIMER_RESULT()); + total_res.fetch_add(res); } } -void insert_thread(Ext *extension, std::vector<Rec> *records, size_t start_idx, size_t stop_idx) { +void insert_thread(Ext *extension, std::vector<Rec> *records, size_t start_idx, + size_t stop_idx) { TIMER_INIT(); TIMER_START(); - for (size_t i=start_idx; i<stop_idx; i++) { + for (size_t i = start_idx; i < stop_idx; i++) { while (!extension->insert((*records)[i])) { usleep(1); } @@ -102,88 +103,89 @@ int main(int argc, char **argv) { std::vector<size_t> sfs = {8}; //, 4, 8, 16, 32, 64, 128, 256, 512, 1024}; size_t buffer_size = 8000; - std::vector<size_t> policies = { - 0 - }; + std::vector<size_t> policies = {1, 2, 0}; - std::vector<size_t> thread_counts = {1}; + std::vector<size_t> thread_counts = {8}; std::vector<size_t> modifiers = {0, 1, 2, 3}; + std::vector<size_t> scale_factors = {2, 3, 4, 5, 6, 7, 8}; size_t insert_threads = 1; - size_t query_threads = 0; + size_t query_threads = 1; reccnt = n; for (auto pol : policies) { for (auto internal_thread_cnt : thread_counts) { for (auto mod : modifiers) { - auto policy = get_policy<Shard, Q>(sfs[0], buffer_size, pol, n, mod); - auto config = Conf(std::move(policy)); - config.recon_enable_maint_on_flush = true; - config.recon_maint_disabled = false; - // config.buffer_flush_trigger = 4000; - config.maximum_threads = internal_thread_cnt; - - g_thrd_cnt = internal_thread_cnt; - - total_insert_time.store(0); - total_query_time.store(0); - total_query_count.store(0); - - auto extension = new Ext(std::move(config)); - - /* warmup structure w/ 10% of records */ - size_t warmup = 0 * n; - for (size_t k = 0; k < warmup; k++) { - while (!extension->insert(data[k])) { - usleep(1); + for (auto sf : scale_factors) { + auto policy = get_policy<Shard, Q>(sf, buffer_size, pol, n, mod); + auto config = Conf(std::move(policy)); + config.recon_enable_maint_on_flush = true; + config.recon_maint_disabled = false; + // config.buffer_flush_trigger = 4000; + config.maximum_threads = internal_thread_cnt; + + g_thrd_cnt = internal_thread_cnt; + + total_insert_time.store(0); + total_query_time.store(0); + total_query_count.store(0); + + auto extension = new Ext(std::move(config)); + + /* warmup structure w/ 10% of records */ + size_t warmup = .3 * n; + for (size_t k = 0; k < warmup; k++) { + while (!extension->insert(data[k])) { + usleep(1); + } } - } - extension->await_version(); + extension->await_version(); - idx.store(warmup); + idx.store(warmup); - std::thread i_thrds[insert_threads]; - std::thread q_thrds[query_threads]; + std::thread i_thrds[insert_threads]; + std::thread q_thrds[query_threads]; - size_t per_insert_thrd = (n - warmup) / insert_threads; - size_t start = warmup; + size_t per_insert_thrd = (n - warmup) / insert_threads; + size_t start = warmup; - for (size_t i = 0; i < insert_threads; i++) { - i_thrds[i] = std::thread(insert_thread, extension, &data, start, - start + per_insert_thrd); - start += per_insert_thrd; - } + for (size_t i = 0; i < insert_threads; i++) { + i_thrds[i] = std::thread(insert_thread, extension, &data, start, + start + per_insert_thrd); + start += per_insert_thrd; + } - for (size_t i = 0; i < query_threads; i++) { - q_thrds[i] = std::thread(query_thread, extension, &queries); - } + for (size_t i = 0; i < query_threads; i++) { + q_thrds[i] = std::thread(query_thread, extension, &queries); + } - for (size_t i = 0; i < insert_threads; i++) { - i_thrds[i].join(); - } + for (size_t i = 0; i < insert_threads; i++) { + i_thrds[i].join(); + } - inserts_done.store(true); + inserts_done.store(true); - for (size_t i = 0; i < query_threads; i++) { - q_thrds[i].join(); - } + for (size_t i = 0; i < query_threads; i++) { + q_thrds[i].join(); + } - fprintf(stderr, "%ld\n", total_res.load()); + fprintf(stderr, "%ld\n", total_res.load()); - size_t insert_tput = - ((double)(n - warmup) / (double)total_insert_time) * 1e9; - size_t query_lat = - (double)total_query_time.load() / (double)total_query_count.load(); + size_t insert_tput = + ((double)(n - warmup) / (double)total_insert_time) * 1e9; + size_t query_lat = (double)total_query_time.load() / + (double)total_query_count.load(); - fprintf(stdout, "%ld\t%ld\t%ld\t%ld\n", internal_thread_cnt, mod, - insert_tput, query_lat); - fflush(stdout); + fprintf(stdout, "%ld\t%ld\t%ld\t%ld\t%ld\t%ld\n", internal_thread_cnt, pol, sf, mod, + insert_tput, query_lat); + fflush(stdout); - total_res.store(0); - inserts_done.store(false); - delete extension; + total_res.store(0); + inserts_done.store(false); + delete extension; + } } } } |