diff options
| -rw-r--r-- | CMakeLists.txt | 20 | ||||
| -rw-r--r-- | benchmarks/tail-latency/btree_insert_dist.cpp | 2 | ||||
| -rw-r--r-- | benchmarks/tail-latency/btree_tput.cpp | 78 | ||||
| -rw-r--r-- | benchmarks/tail-latency/fixed_shard_count.cpp | 19 | ||||
| -rw-r--r-- | benchmarks/tail-latency/fixed_shard_count_dist.cpp | 111 | ||||
| -rw-r--r-- | benchmarks/tail-latency/insert_query_threads.cpp | 6 | ||||
| -rw-r--r-- | benchmarks/tail-latency/isam_latency_dist.cpp (renamed from benchmarks/tail-latency/isam_insert_dist.cpp) | 33 | ||||
| -rw-r--r-- | benchmarks/tail-latency/knn_insert_query_threads.cpp | 206 | ||||
| -rw-r--r-- | benchmarks/tail-latency/knn_query_parm_sweep.cpp | 19 | ||||
| -rw-r--r-- | benchmarks/tail-latency/knn_selectivity_sweep.cpp | 13 | ||||
| -rw-r--r-- | benchmarks/tail-latency/query_parm_sweep.cpp | 19 | ||||
| -rw-r--r-- | benchmarks/tail-latency/selectivity_sweep.cpp | 24 | ||||
| -rw-r--r-- | benchmarks/tail-latency/stall_benchmark.cpp | 11 | ||||
| -rw-r--r-- | benchmarks/tail-latency/stall_benchmark_vptree.cpp | 2 | ||||
| -rw-r--r-- | benchmarks/tail-latency/standard_latency_dist.cpp | 18 | ||||
| m--------- | external/psudb-common | 0 |
16 files changed, 517 insertions, 64 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 31cdc50..6ad9040 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -297,6 +297,11 @@ if (tail_bench) target_include_directories(fixed_shard_count PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) target_link_options(fixed_shard_count PUBLIC -mcx16) + add_executable(fixed_shard_count_dist ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/fixed_shard_count_dist.cpp) + target_link_libraries(fixed_shard_count_dist PUBLIC gsl pthread atomic) + target_include_directories(fixed_shard_count_dist PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) + target_link_options(fixed_shard_count_dist PUBLIC -mcx16) + add_executable(standard_latency_dist ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/standard_latency_dist.cpp) target_link_libraries(standard_latency_dist PUBLIC gsl pthread atomic) target_include_directories(standard_latency_dist PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) @@ -307,6 +312,11 @@ if (tail_bench) target_include_directories(btree_insert_dist PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) target_link_options(btree_insert_dist PUBLIC -mcx16) + add_executable(btree_tput ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/btree_tput.cpp) + target_link_libraries(btree_tput PUBLIC gsl pthread atomic) + target_include_directories(btree_tput PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) + target_link_options(btree_tput PUBLIC -mcx16) + add_executable(mixed_workload ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/mixed_workload.cpp) target_link_libraries(mixed_workload PUBLIC gsl pthread atomic) target_include_directories(mixed_workload PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) @@ -323,6 +333,11 @@ if (tail_bench) target_include_directories(insert_query_threads PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) target_link_options(insert_query_threads PUBLIC -mcx16) + add_executable(knn_insert_query_threads ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/knn_insert_query_threads.cpp) + target_link_libraries(knn_insert_query_threads PUBLIC gsl pthread atomic) + target_include_directories(knn_insert_query_threads PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) + target_link_options(knn_insert_query_threads PUBLIC -mcx16) + add_executable(query_detail ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/query_detail.cpp) target_link_libraries(query_detail PUBLIC gsl pthread atomic) target_include_directories(query_detail PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) @@ -362,6 +377,11 @@ if (tail_bench) target_link_libraries(knn_selectivity_sweep PUBLIC gsl pthread atomic) target_include_directories(knn_selectivity_sweep PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) target_link_options(knn_selectivity_sweep PUBLIC -mcx16) + + add_executable(isam_latency_dist ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/isam_latency_dist.cpp) + target_link_libraries(isam_latency_dist PUBLIC gsl pthread atomic) + target_include_directories(isam_latency_dist PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) + target_link_options(isam_latency_dist PUBLIC -mcx16) endif() if (bench) diff --git a/benchmarks/tail-latency/btree_insert_dist.cpp b/benchmarks/tail-latency/btree_insert_dist.cpp index af60819..e1d171d 100644 --- a/benchmarks/tail-latency/btree_insert_dist.cpp +++ b/benchmarks/tail-latency/btree_insert_dist.cpp @@ -45,7 +45,7 @@ int main(int argc, char **argv) { auto queries = read_range_queries<QP>(q_fname, .0001); /* warmup structure w/ 10% of records */ - size_t warmup = .1 * n; + size_t warmup = .3 * n; for (size_t i=0; i<warmup; i++) { btree.insert(data[i]); } diff --git a/benchmarks/tail-latency/btree_tput.cpp b/benchmarks/tail-latency/btree_tput.cpp new file mode 100644 index 0000000..d091ff5 --- /dev/null +++ b/benchmarks/tail-latency/btree_tput.cpp @@ -0,0 +1,78 @@ +/* + * + */ + +#define ENABLE_TIMER + +#include "shard/ISAMTree.h" +#include "query/rangequery.h" +#include "framework/interface/Record.h" +#include "file_util.h" +#include "benchmark_types.h" + +#include <gsl/gsl_rng.h> + +#include "psu-util/timer.h" +#include "standard_benchmarks.h" +#include "psu-ds/BTree.h" + +typedef btree_record<int64_t, int64_t> Rec; + +typedef de::ISAMTree<Rec> Shard; +typedef de::irs::Query<Shard> Q; +typedef Q::Parameters QP; + +void usage(char *progname) { + fprintf(stderr, "%s reccnt datafile queryfile\n", progname); +} + +int main(int argc, char **argv) { + + if (argc < 4) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + std::string d_fname = std::string(argv[2]); + std::string q_fname = std::string(argv[3]); + + auto btree = BenchBTree(); + + auto data = read_sosd_file<Rec>(d_fname, n); + + /* read in the range queries and add sample size and rng for sampling */ + auto queries = read_range_queries<QP>(q_fname, .0001); + + /* warmup structure w/ 10% of records */ + size_t warmup = .3 * n; + for (size_t i=0; i<warmup; i++) { + btree.insert(data[i]); + } + + TIMER_INIT(); + /* insertion benchmark */ + TIMER_START(); + for (size_t i=warmup; i<data.size(); i++) { + btree.insert(data[i]); + } + TIMER_STOP(); + + size_t insert_tput = + ((double)(n - warmup) / (double)TIMER_RESULT()) * 1e9; + + fprintf(stdout, "%ld\n", insert_tput); + + // /* run queries */ + // TIMER_START(); + // size_t total = 0; + // for (size_t j=0; j<10; j++) { + // for (size_t i=0; i<queries.size(); i++) { + // total += btree.range_count(queries[i].lower_bound, queries[i].upper_bound); + // } + // } + // TIMER_STOP(); + + // fprintf(stderr, "%ld\n", total); +} + diff --git a/benchmarks/tail-latency/fixed_shard_count.cpp b/benchmarks/tail-latency/fixed_shard_count.cpp index e980bcf..45c62fb 100644 --- a/benchmarks/tail-latency/fixed_shard_count.cpp +++ b/benchmarks/tail-latency/fixed_shard_count.cpp @@ -2,6 +2,7 @@ * */ +#include "framework/scheduling/SerialScheduler.h" #define ENABLE_TIMER #define TS_TEST @@ -9,8 +10,9 @@ #include "framework/DynamicExtension.h" #include "framework/scheduling/FIFOScheduler.h" -#include "shard/TrieSpline.h" +#include "shard/ISAMTree.h" #include "query/rangecount.h" +#include "framework/util/Configuration.h" #include "framework/interface/Record.h" #include "file_util.h" #include "standard_benchmarks.h" @@ -23,10 +25,11 @@ typedef de::Record<uint64_t, uint64_t> Rec; -typedef de::TrieSpline<Rec> Shard; +typedef de::ISAMTree<Rec> Shard; typedef de::rc::Query<Shard> Q; -typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, de::FIFOScheduler> Ext; +typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, de::SerialScheduler> Ext; typedef Q::Parameters QP; +typedef de::DEConfiguration<Shard, Q, de::DeletePolicy::TOMBSTONE, de::SerialScheduler> Conf; void usage(char *progname) { fprintf(stderr, "%s reccnt datafile queryfile\n", progname); @@ -52,10 +55,16 @@ int main(int argc, char **argv) { for (size_t i=0; i<shard_counts.size(); i++) { auto policy = get_policy<Shard, Q>(shard_counts[i], buffer_size, 4, n); - auto extension = new Ext(std::move(policy)); + auto config = Conf(std::move(policy)); + config.recon_enable_maint_on_flush = false; + config.recon_maint_disabled = true; + config.buffer_flush_trigger = 4000; + config.maximum_threads = 8; + + auto extension = new Ext(std::move(config)); /* warmup structure w/ 10% of records */ - size_t warmup = .1 * n; + size_t warmup = .3 * n; for (size_t i=0; i<warmup; i++) { while (!extension->insert(data[i])) { usleep(1); diff --git a/benchmarks/tail-latency/fixed_shard_count_dist.cpp b/benchmarks/tail-latency/fixed_shard_count_dist.cpp new file mode 100644 index 0000000..34dd15d --- /dev/null +++ b/benchmarks/tail-latency/fixed_shard_count_dist.cpp @@ -0,0 +1,111 @@ +/* + * + */ + +#include "framework/scheduling/SerialScheduler.h" +#define ENABLE_TIMER +#define TS_TEST + +#include <thread> + +#include "framework/DynamicExtension.h" +#include "framework/scheduling/FIFOScheduler.h" +#include "shard/ISAMTree.h" +#include "query/rangecount.h" +#include "framework/util/Configuration.h" +#include "framework/interface/Record.h" +#include "file_util.h" +#include "standard_benchmarks.h" + +#include "framework/reconstruction/FixedShardCountPolicy.h" + +#include <gsl/gsl_rng.h> + +#include "psu-util/timer.h" + + +typedef de::Record<uint64_t, uint64_t> Rec; +typedef de::ISAMTree<Rec> Shard; +typedef de::rc::Query<Shard> Q; +typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, de::SerialScheduler> Ext; +typedef Q::Parameters QP; +typedef de::DEConfiguration<Shard, Q, de::DeletePolicy::TOMBSTONE, de::SerialScheduler> Conf; + +void usage(char *progname) { + fprintf(stderr, "%s reccnt datafile queryfile\n", progname); +} + +int main(int argc, char **argv) { + + if (argc < 4) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + std::string d_fname = std::string(argv[2]); + std::string q_fname = std::string(argv[3]); + + + auto data = read_sosd_file<Rec>(d_fname, n); + auto queries = read_range_queries<QP>(q_fname, .0001); + + std::vector<size_t> shard_counts = {4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 4096*2}; + size_t buffer_size = 8000; + + for (size_t i=0; i<shard_counts.size(); i++) { + auto policy = get_policy<Shard, Q>(shard_counts[i], buffer_size, 4, n); + auto config = Conf(std::move(policy)); + config.recon_enable_maint_on_flush = false; + config.recon_maint_disabled = true; + config.buffer_flush_trigger = 4000; + config.maximum_threads = 8; + + auto extension = new Ext(std::move(config)); + + /* warmup structure w/ 10% of records */ + size_t warmup = .3 * n; + for (size_t i=0; i<warmup; i++) { + while (!extension->insert(data[i])) { + usleep(1); + } + } + + extension->await_version(); + + TIMER_INIT(); + + for (size_t j=warmup; j<data.size(); j++) { + TIMER_START(); + while (!extension->insert(data[j])) { + usleep(1); + } + TIMER_STOP(); + + fprintf(stdout, "%ld\t%ld\n", shard_counts[i], TIMER_RESULT()); + } + + + extension->await_version(); + + // /* repeat the queries a bunch of times */ + // TIMER_START(); + // for (size_t l=0; l<10; l++) { + // for (size_t i=0; i<queries.size(); i++) { + // auto q = queries[i]; + // auto res = extension->query(std::move(q)); + // res.get(); + // } + // } + // TIMER_STOP(); + + // auto query_lat = TIMER_RESULT() / 10*queries.size(); + + // fprintf(stdout, "%ld\t%ld\t%ld\t%ld\n", shard_counts[i], extension->get_shard_count(), insert_tput, query_lat); + + delete extension; + } + + fflush(stderr); +} + diff --git a/benchmarks/tail-latency/insert_query_threads.cpp b/benchmarks/tail-latency/insert_query_threads.cpp index 3f78f16..a5a88f5 100644 --- a/benchmarks/tail-latency/insert_query_threads.cpp +++ b/benchmarks/tail-latency/insert_query_threads.cpp @@ -107,10 +107,10 @@ int main(int argc, char **argv) { size_t buffer_size = 8000; std::vector<size_t> policies = {6}; - std::vector<size_t> thread_counts = {32}; + std::vector<size_t> thread_counts = {1, 2, 4, 8, 32}; std::vector<size_t> modifiers = {0}; - std::vector<size_t> scale_factors = {6}; - std::vector<double> rate_limits = {1, 0.9999, 0.999, 0.99, 0.9, 0.85, 0.8}; + std::vector<size_t> scale_factors = {6}; + std::vector<double> rate_limits = {.95, .99, .993, .995, .998, .9985, .999, 1}; size_t insert_threads = 1; size_t query_threads = 1; diff --git a/benchmarks/tail-latency/isam_insert_dist.cpp b/benchmarks/tail-latency/isam_latency_dist.cpp index 88d37c5..c116413 100644 --- a/benchmarks/tail-latency/isam_insert_dist.cpp +++ b/benchmarks/tail-latency/isam_latency_dist.cpp @@ -3,6 +3,7 @@ */ #include "framework/scheduling/FIFOScheduler.h" +#include "framework/scheduling/SerialScheduler.h" #define ENABLE_TIMER #define TS_TEST @@ -14,6 +15,7 @@ #include "framework/interface/Record.h" #include "file_util.h" #include "standard_benchmarks.h" +#include "framework/util/Configuration.h" #include <gsl/gsl_rng.h> @@ -23,8 +25,11 @@ typedef de::Record<uint64_t, uint64_t> Rec; typedef de::ISAMTree<Rec>Shard; typedef de::rc::Query<Shard> Q; -typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, de::FIFOScheduler> Ext; +typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, de::SerialScheduler> Ext; typedef Q::Parameters QP; +typedef de::DEConfiguration<Shard, Q, de::DeletePolicy::TOMBSTONE, + de::SerialScheduler> + Conf; void usage(char *progname) { fprintf(stderr, "%s reccnt datafile queryfile\n", progname); @@ -45,25 +50,30 @@ int main(int argc, char **argv) { auto data = read_sosd_file<Rec>(d_fname, n); auto queries = read_range_queries<QP>(q_fname, .0001); - std::vector<int> policies = {3}; - std::vector<size_t> buffers = {8000, 16000, 32000}; - std::vector<size_t> sfs = {8}; - + std::vector<int> policies = {0, 1}; + std::vector<size_t> buffers = {1000, 8000, 16000}; + std::vector<size_t> sfs = {4}; + for (size_t l=0; l<policies.size(); l++) { for (size_t j=0; j<buffers.size(); j++) { for (size_t k=0; k<sfs.size(); k++) { auto policy = get_policy<Shard, Q>(sfs[k], buffers[j], policies[l]); - auto extension = new Ext(policy, buffers[j]/4, buffers[j]); + auto config = Conf(std::move(policy)); + + config.recon_enable_maint_on_flush = true; + config.recon_maint_disabled = false; + + auto extension = new Ext(std::move(config)); /* warmup structure w/ 10% of records */ - size_t warmup = .1 * n; + size_t warmup = .3 * n; for (size_t i=0; i<warmup; i++) { while (!extension->insert(data[i])) { usleep(1); } } - extension->await_next_epoch(); + extension->await_version(); TIMER_INIT(); @@ -74,10 +84,10 @@ int main(int argc, char **argv) { } TIMER_STOP(); - //fprintf(stdout, "%ld\t%ld\t%d\t%ld\n", sfs[k], buffers[j], policies[l], TIMER_RESULT()); + fprintf(stdout, "I\t%ld\t%ld\t%d\t%ld\n", sfs[k], buffers[j], policies[l], TIMER_RESULT()); } - extension->await_next_epoch(); + extension->await_version(); /* repeat the queries a bunch of times */ for (size_t l=0; l<10; l++) { @@ -88,7 +98,7 @@ int main(int argc, char **argv) { res.get(); TIMER_STOP(); - fprintf(stdout, "%ld\t%ld\t%d\t%ld\n", sfs[k], buffers[j], policies[l], TIMER_RESULT()); + fprintf(stdout, "Q\t%ld\t%ld\t%d\t%ld\n", sfs[k], buffers[j], policies[l], TIMER_RESULT()); } } @@ -97,7 +107,6 @@ int main(int argc, char **argv) { auto res =extension->query(std::move(p)); fprintf(stderr, "%ld\n", res.get()); - extension->await_next_epoch(); delete extension; }}} diff --git a/benchmarks/tail-latency/knn_insert_query_threads.cpp b/benchmarks/tail-latency/knn_insert_query_threads.cpp new file mode 100644 index 0000000..fb7d5f1 --- /dev/null +++ b/benchmarks/tail-latency/knn_insert_query_threads.cpp @@ -0,0 +1,206 @@ +/* + * + */ + +#include <cstdlib> +#define ENABLE_TIMER +#define TS_TEST + +#include <thread> + +#include "file_util.h" +#include "framework/DynamicExtension.h" +#include "framework/interface/Record.h" +#include "framework/scheduling/FIFOScheduler.h" +#include "framework/scheduling/SerialScheduler.h" +#include "framework/util/Configuration.h" +#include "query/knn.h" +#include "shard/VPTree.h" +#include "standard_benchmarks.h" +#include "util/types.h" + +#include "framework/reconstruction/FixedShardCountPolicy.h" + +#include <gsl/gsl_rng.h> + +#include "psu-util/timer.h" + + +typedef Word2VecRec Rec; +typedef de::VPTree<Rec> Shard; +typedef de::knn::Query<Shard> Q; +typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, + de::FIFOScheduler> + Ext; +typedef Q::Parameters QP; +typedef de::DEConfiguration<Shard, Q, de::DeletePolicy::TOMBSTONE, + de::FIFOScheduler> + Conf; + +std::atomic<size_t> idx; +std::atomic<bool> inserts_done = false; + +ssize_t query_ratio = 0; + +std::atomic<size_t> total_res = 0; +size_t reccnt = 0; + +size_t g_thrd_cnt = 0; + +std::atomic<size_t> total_insert_time = 0; +std::atomic<size_t> total_insert_count = 0; +std::atomic<size_t> total_query_time = 0; +std::atomic<size_t> total_query_count = 0; + +void query_thread(Ext *extension, std::vector<QP> *queries) { + TIMER_INIT(); + while (!inserts_done.load()) { + total_query_count.fetch_add(1); + auto q_idx = rand() % queries->size(); + + auto q = (*queries)[q_idx]; + + TIMER_START(); + auto res = extension->query(std::move(q)).get(); + TIMER_STOP(); + + usleep(100000); + + total_query_time.fetch_add(TIMER_RESULT()); + total_res.fetch_add(res.size()); + } +} + +void insert_thread(Ext *extension, std::vector<Rec> *records, size_t start_idx, + size_t stop_idx) { + gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937); + + TIMER_INIT(); + + TIMER_START(); + for (size_t i = start_idx; i < stop_idx; i++) { + while (!extension->insert((*records)[i], rng)) { + usleep(1); + } + } + + TIMER_STOP(); + total_insert_time.fetch_add(TIMER_RESULT()); + gsl_rng_free(rng); +} + +void usage(char *progname) { + fprintf(stderr, "%s reccnt datafile queryfile\n", progname); +} + +int main(int argc, char **argv) { + + if (argc < 4) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + std::string d_fname = std::string(argv[2]); + std::string q_fname = std::string(argv[3]); + + auto data = read_vector_file<Rec, W2V_SIZE>(d_fname, n); + auto queries = read_knn_queries<QP>(q_fname, 100, 1); + + size_t buffer_size = 1000; + std::vector<size_t> policies = {0}; + std::vector<size_t> thread_counts = {32}; + std::vector<size_t> modifiers = {0}; + std::vector<size_t> scale_factors = {8}; + std::vector<double> rate_limits = {1}; + + size_t insert_threads = 1; + size_t query_threads = 1; + + reccnt = n; + + for (auto pol : policies) { + for (auto internal_thread_cnt : thread_counts) { + for (auto mod : modifiers) { + for (auto sf : scale_factors) { + for (auto lim : rate_limits) { + + auto policy = get_policy<Shard, Q>(sf, buffer_size, pol, n, mod); + auto config = Conf(std::move(policy)); + config.recon_enable_maint_on_flush = true; + config.recon_maint_disabled = false; + config.buffer_size = buffer_size; + config.buffer_flush_trigger = buffer_size; + config.buffer_flush_query_preemption_trigger = 4e5; + + config.maximum_threads = internal_thread_cnt; + + g_thrd_cnt = internal_thread_cnt; + + total_insert_time.store(0); + total_query_time.store(0); + total_query_count.store(0); + + auto extension = new Ext(std::move(config), lim); + + /* warmup structure w/ 10% of records */ + size_t warmup = .3 * n; + for (size_t k = 0; k < warmup; k++) { + while (!extension->insert(data[k])) { + usleep(1); + } + } + + extension->await_version(); + + idx.store(warmup); + + std::thread i_thrds[insert_threads]; + std::thread q_thrds[query_threads]; + + size_t per_insert_thrd = (n - warmup) / insert_threads; + size_t start = warmup; + + for (size_t i = 0; i < insert_threads; i++) { + i_thrds[i] = std::thread(insert_thread, extension, &data, start, + start + per_insert_thrd); + start += per_insert_thrd; + } + + for (size_t i = 0; i < query_threads; i++) { + q_thrds[i] = std::thread(query_thread, extension, &queries); + } + + for (size_t i = 0; i < insert_threads; i++) { + i_thrds[i].join(); + } + + inserts_done.store(true); + + for (size_t i = 0; i < query_threads; i++) { + q_thrds[i].join(); + } + + fprintf(stderr, "%ld\n", total_res.load()); + + size_t insert_tput = + ((double)(n - warmup) / (double)total_insert_time) * 1e9; + size_t query_lat = (double)total_query_time.load() / + (double)total_query_count.load(); + + fprintf(stdout, "%ld\t%lf\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\n", + internal_thread_cnt, lim, pol, sf, mod, extension->get_height(), + extension->get_shard_count(), insert_tput, query_lat); + fflush(stdout); + + total_res.store(0); + inserts_done.store(false); + delete extension; + } + } + } + } + } + + fflush(stderr); +} diff --git a/benchmarks/tail-latency/knn_query_parm_sweep.cpp b/benchmarks/tail-latency/knn_query_parm_sweep.cpp index dfd713d..dcbe7cf 100644 --- a/benchmarks/tail-latency/knn_query_parm_sweep.cpp +++ b/benchmarks/tail-latency/knn_query_parm_sweep.cpp @@ -105,28 +105,30 @@ int main(int argc, char **argv) { //auto queries =read_sosd_point_lookups<QP>(q_fname, 1); auto data = read_vector_file<Rec, W2V_SIZE>(d_fname, n); - auto queries = read_knn_queries<QP>(q_fname, 15, 1); + auto queries = read_knn_queries<QP>(q_fname, 100, 1); - size_t buffer_size = 8000; - std::vector<size_t> policies = {0}; + std::vector<size_t> buffer_sizes = {100, 500, 1000, 10000}; + std::vector<size_t> policies = {0, 1, 2}; std::vector<size_t> thread_counts = {8}; std::vector<double> modifiers = {0}; - std::vector<size_t> scale_factors = {2, 4, 6, 8, 10}; + std::vector<size_t> scale_factors = {2, 8}; size_t insert_threads = 1; size_t query_threads = 1; reccnt = n; + for (auto bs : buffer_sizes) { for (auto pol : policies) { for (auto internal_thread_cnt : thread_counts) { for (auto mod : modifiers) { for (auto sf : scale_factors) { - auto policy = get_policy<Shard, Q>(sf, buffer_size, pol, n, mod); + auto policy = get_policy<Shard, Q>(sf, bs, pol, n, mod); auto config = Conf(std::move(policy)); config.recon_enable_maint_on_flush = true; config.recon_maint_disabled = false; + config.buffer_size = bs; config.buffer_flush_trigger = config.buffer_size; config.maximum_threads = internal_thread_cnt; @@ -171,14 +173,14 @@ int main(int argc, char **argv) { extension->await_version(); /* run some queries to "warm up" the cache */ - for (size_t i=0; i<queries.size()*2; i++) { + for (size_t i=0; i<queries.size()*5; i++) { auto q_idx = i % queries.size(); auto q = queries[q_idx]; auto res = extension->query(std::move(q)).get(); total_res.fetch_add(res.size()); } - total_query_count.store(100000); + total_query_count.store(5000); TIMER_INIT(); TIMER_START(); for (size_t i=0; i<total_query_count; i++) { @@ -200,7 +202,7 @@ int main(int argc, char **argv) { fprintf(stdout, "%ld\t%ld\t%ld\t%lf\t%ld\t%ld\t%ld\t%ld\n", internal_thread_cnt, pol, sf, mod, extension->get_height(), extension->get_shard_count(), insert_tput, query_lat); - extension->print_scheduler_statistics(); + //extension->print_scheduler_statistics(); //extension->print_scheduler_query_data(); //extension->print_structure(); fflush(stdout); @@ -212,6 +214,7 @@ int main(int argc, char **argv) { } } } + } fflush(stderr); } diff --git a/benchmarks/tail-latency/knn_selectivity_sweep.cpp b/benchmarks/tail-latency/knn_selectivity_sweep.cpp index cf14330..af03d17 100644 --- a/benchmarks/tail-latency/knn_selectivity_sweep.cpp +++ b/benchmarks/tail-latency/knn_selectivity_sweep.cpp @@ -73,7 +73,7 @@ int main(int argc, char **argv) { size_t buffer_size = 1000; std::vector<size_t> policies = {0, 1}; - std::vector<size_t> thread_counts = {8}; + std::vector<size_t> thread_counts = {32}; std::vector<double> modifiers = {0}; std::vector<size_t> scale_factors = {2, 4, 8}; std::vector<size_t> knn_sizes = {1, 10, 100, 1000}; @@ -93,6 +93,7 @@ int main(int argc, char **argv) { auto config = Conf(std::move(policy)); config.recon_enable_maint_on_flush = true; config.recon_maint_disabled = false; + config.buffer_size = buffer_size; config.buffer_flush_trigger = config.buffer_size; config.maximum_threads = internal_thread_cnt; @@ -116,6 +117,9 @@ int main(int argc, char **argv) { idx.store(warmup); + fprintf(stderr, "Inserts done\n"); + fflush(stderr); + extension->await_version(); @@ -123,14 +127,15 @@ int main(int argc, char **argv) { size_t total = 0; for (size_t l=0; l<query_sets.size(); l++) { TIMER_START(); - for (size_t f=0; f<query_sets[l].size()*10; f++) { - auto q = query_sets[l][f%10]; + for (size_t f=0; f<query_sets[l].size(); f++) { + auto q = query_sets[l][f]; auto res = extension->query(std::move(q)); total += res.get().size(); } TIMER_STOP(); - auto query_latency = (TIMER_RESULT()) / (10*query_sets[l].size()); + auto query_latency = (TIMER_RESULT()) / (query_sets[l].size()); fprintf(stdout, "%ld\t%ld\t", knn_sizes[l], query_latency); + fflush(stdout); } fprintf(stdout, "\n"); diff --git a/benchmarks/tail-latency/query_parm_sweep.cpp b/benchmarks/tail-latency/query_parm_sweep.cpp index 36665a6..a53cecf 100644 --- a/benchmarks/tail-latency/query_parm_sweep.cpp +++ b/benchmarks/tail-latency/query_parm_sweep.cpp @@ -102,12 +102,12 @@ int main(int argc, char **argv) { //auto queries = read_range_queries<QP>(q_fname, .0001); auto queries =read_sosd_point_lookups<QP>(q_fname, 1); - size_t buffer_size = 8000; - std::vector<size_t> policies = {1}; + std::vector<size_t> buffer_sizes = {100, 1000, 5000, 8000, 10000, 20000, 100000}; + std::vector<size_t> policies = {0, 1, 2}; std::vector<size_t> thread_counts = {8}; std::vector<double> modifiers = {0}; - std::vector<size_t> scale_factors = {4, 4, 4, 4}; + std::vector<size_t> scale_factors = {2}; size_t insert_threads = 1; size_t query_threads = 1; @@ -118,10 +118,12 @@ int main(int argc, char **argv) { for (auto internal_thread_cnt : thread_counts) { for (auto mod : modifiers) { for (auto sf : scale_factors) { - auto policy = get_policy<Shard, Q>(sf, buffer_size, pol, n, mod); + for (auto bs: buffer_sizes) { + auto policy = get_policy<Shard, Q>(sf, bs, pol, n, mod); auto config = Conf(std::move(policy)); config.recon_enable_maint_on_flush = true; config.recon_maint_disabled = false; + config.buffer_size = bs; config.buffer_flush_trigger = config.buffer_size; config.maximum_threads = internal_thread_cnt; @@ -173,7 +175,7 @@ int main(int argc, char **argv) { total_res.fetch_add(res.size()); } - total_query_count.store(100000); + total_query_count.store(10000); TIMER_INIT(); TIMER_START(); for (size_t i=0; i<total_query_count; i++) { @@ -192,11 +194,11 @@ int main(int argc, char **argv) { size_t query_lat = (double)total_query_time.load() / (double)total_query_count.load(); - fprintf(stdout, "%ld\t%ld\t%ld\t%lf\t%ld\t%ld\t%ld\t%ld\n", internal_thread_cnt, pol, sf, + fprintf(stdout, "%ld\t%ld\t%ld\t%ld\t%lf\t%ld\t%ld\t%ld\t%ld\n", internal_thread_cnt, bs, pol, sf, mod, extension->get_height(), extension->get_shard_count(), insert_tput, query_lat); - extension->print_scheduler_statistics(); - extension->print_scheduler_query_data(); + //extension->print_scheduler_statistics(); + //extension->print_scheduler_query_data(); //extension->print_structure(); fflush(stdout); @@ -204,6 +206,7 @@ int main(int argc, char **argv) { inserts_done.store(false); delete extension; } + } } } } diff --git a/benchmarks/tail-latency/selectivity_sweep.cpp b/benchmarks/tail-latency/selectivity_sweep.cpp index 77c088b..55b499e 100644 --- a/benchmarks/tail-latency/selectivity_sweep.cpp +++ b/benchmarks/tail-latency/selectivity_sweep.cpp @@ -49,26 +49,30 @@ int main(int argc, char **argv) { auto data = read_sosd_file<Rec>(d_fname, n); std::vector<std::vector<QP>> query_sets; - std::vector<double> selectivities = {0, 0.00000001, 0.0000001, 0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1}; + std::vector<double> selectivities = {0.0000001}; //, 0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, .25}; for (auto sel: selectivities) { query_sets.push_back(generate_uniform_range_queries<QP>(100, n, sel)); } - std::vector<size_t> sfs = {2, 4, 8}; //, 4, 8, 16, 32, 64, 128, 256, 512, 1024}; - size_t buffer_size = 8000; - std::vector<size_t> policies = {0, 1}; + std::vector<size_t> sfs = {8}; + std::vector<double> stalls = {1}; + std::vector<size_t> buffer_sizes = {100, 1000, 5000, 10000, 20000, 100000}; + std::vector<size_t> policies = {0,1,2}; for (auto pol: policies) { for (size_t i=0; i<sfs.size(); i++) { - auto policy = get_policy<Shard, Q>(sfs[i], buffer_size, pol, n); + for (size_t j=0; j<stalls.size(); j++) { + for (auto bs: buffer_sizes) { + auto policy = get_policy<Shard, Q>(sfs[i], bs, pol, n); auto config = Conf(std::move(policy)); config.recon_enable_maint_on_flush = false; config.recon_maint_disabled = true; - config.buffer_flush_trigger = 4000; + config.buffer_size = bs; + config.buffer_flush_trigger = config.buffer_size; config.maximum_threads = 8; - auto extension = new Ext(std::move(config)); + auto extension = new Ext(std::move(config), stalls[j]); /* warmup structure w/ 10% of records */ size_t warmup = .1 * n; @@ -88,7 +92,7 @@ int main(int argc, char **argv) { for (size_t j=warmup; j<data.size(); j++) { while (!extension->insert(data[j])) { usleep(1); - fprintf(stderr, "insert blocked %ld\r", j); + //fprintf(stderr, "insert blocked %ld\r", j); } } TIMER_STOP(); @@ -105,7 +109,7 @@ int main(int argc, char **argv) { /* repeat the queries a bunch of times */ auto insert_throughput = (size_t) ((double) (n - warmup) / (double) total_insert_lat *1.0e9); - fprintf(stdout, "%ld\t%ld\t%ld\t%ld\t", pol, sfs[i], n, insert_throughput); + fprintf(stdout, "%ld\t%ld\t%ld\t%ld\t%lf\t%ld\t", pol, bs, sfs[i], extension->get_record_count(), stalls[j], insert_throughput); size_t total = 0; for (size_t l=0; l<query_sets.size(); l++) { @@ -128,6 +132,8 @@ int main(int argc, char **argv) { delete extension; } } + } + } fflush(stderr); } diff --git a/benchmarks/tail-latency/stall_benchmark.cpp b/benchmarks/tail-latency/stall_benchmark.cpp index afb16b3..6734ccd 100644 --- a/benchmarks/tail-latency/stall_benchmark.cpp +++ b/benchmarks/tail-latency/stall_benchmark.cpp @@ -68,12 +68,12 @@ void insert_thread(Ext *extension, std::vector<Rec> *records, size_t start_idx, } void usage(char *progname) { - fprintf(stderr, "%s reccnt datafile rate_limit policy\n", progname); + fprintf(stderr, "%s reccnt datafile rate_limit buffer_size policy\n", progname); } int main(int argc, char **argv) { - if (argc < 5) { + if (argc < 6) { usage(argv[0]); exit(EXIT_FAILURE); } @@ -81,12 +81,12 @@ int main(int argc, char **argv) { size_t n = atol(argv[1]); std::string d_fname = std::string(argv[2]); double rate_limit = std::atof(argv[3]); - size_t pol = std::atol(argv[4]); + size_t buffer_size = std::atol(argv[4]); + size_t pol = std::atol(argv[5]); assert(pol >= 0 && pol <= 6); auto data = read_sosd_file<Rec>(d_fname, n); - size_t buffer_size = 8000; size_t scale_factor = 8; double modifier = 0; size_t insert_threads = 1; @@ -99,7 +99,8 @@ int main(int argc, char **argv) { auto config = Conf(std::move(policy)); config.recon_enable_maint_on_flush = true; config.recon_maint_disabled = false; - // config.buffer_flush_trigger = 4000; + config.buffer_size = buffer_size; + config.buffer_flush_trigger = buffer_size; config.maximum_threads = internal_thread_cnt; g_thrd_cnt = internal_thread_cnt; diff --git a/benchmarks/tail-latency/stall_benchmark_vptree.cpp b/benchmarks/tail-latency/stall_benchmark_vptree.cpp index 28680cc..26df573 100644 --- a/benchmarks/tail-latency/stall_benchmark_vptree.cpp +++ b/benchmarks/tail-latency/stall_benchmark_vptree.cpp @@ -99,6 +99,8 @@ int main(int argc, char **argv) { auto config = Conf(std::move(policy)); config.recon_enable_maint_on_flush = true; config.recon_maint_disabled = false; + config.buffer_size = buffer_size; + config.buffer_flush_trigger = buffer_size; // config.buffer_flush_trigger = 4000; config.maximum_threads = internal_thread_cnt; diff --git a/benchmarks/tail-latency/standard_latency_dist.cpp b/benchmarks/tail-latency/standard_latency_dist.cpp index 8f1594b..dca02bf 100644 --- a/benchmarks/tail-latency/standard_latency_dist.cpp +++ b/benchmarks/tail-latency/standard_latency_dist.cpp @@ -2,17 +2,18 @@ * */ +#define ENABLE_TIMER +#define TS_TEST + #include "framework/scheduling/SerialScheduler.h" #include "framework/util/Configuration.h" #include "util/types.h" -#define ENABLE_TIMER -#define TS_TEST #include <thread> #include "framework/DynamicExtension.h" #include "framework/scheduling/FIFOScheduler.h" -#include "shard/TrieSpline.h" +#include "shard/ISAMTree.h" #include "query/rangecount.h" #include "framework/interface/Record.h" #include "file_util.h" @@ -26,11 +27,11 @@ typedef de::Record<uint64_t, uint64_t> Rec; -typedef de::TrieSpline<Rec> Shard; +typedef de::ISAMTree<Rec> Shard; typedef de::rc::Query<Shard> Q; -typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, de::FIFOScheduler> Ext; +typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, de::SerialScheduler> Ext; typedef Q::Parameters QP; -typedef de::DEConfiguration<Shard, Q, de::DeletePolicy::TOMBSTONE, de::FIFOScheduler> Conf; +typedef de::DEConfiguration<Shard, Q, de::DeletePolicy::TOMBSTONE, de::SerialScheduler> Conf; void usage(char *progname) { fprintf(stderr, "%s reccnt datafile queryfile\n", progname); @@ -50,9 +51,9 @@ int main(int argc, char **argv) { auto data = read_sosd_file<Rec>(d_fname, n); auto queries = read_range_queries<QP>(q_fname, .0001); - std::vector<size_t> sfs = {2, 3, 4, 5, 6, 7, 8}; //, 4, 8, 16, 32, 64, 128, 256, 512, 1024}; + std::vector<size_t> sfs = {4, 8}; //, 4, 8, 16, 32, 64, 128, 256, 512, 1024}; size_t buffer_size = 8000; - std::vector<size_t> policies = {0, 1, 2}; + std::vector<size_t> policies = {0, 1}; for (auto pol: policies) { for (size_t i=0; i<sfs.size(); i++) { @@ -83,7 +84,6 @@ int main(int argc, char **argv) { for (size_t j=warmup; j<data.size(); j++) { while (!extension->insert(data[j])) { usleep(1); - fprintf(stderr, "insert blocked %ld\r", j); } } TIMER_STOP(); diff --git a/external/psudb-common b/external/psudb-common -Subproject 67ec5f53de89c0edd6c46cd85f4ad05d380c16b +Subproject ce3b373b75c28098df83ec95234a90cb4f4d364 |