diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-06-07 10:26:24 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-06-07 10:26:24 -0400 |
| commit | aa5832f5e1bf7fcbd0a9507677a650b5e749ef46 (patch) | |
| tree | 9416c3698579f0340da3643d14745f28102e7e62 | |
| parent | d47eeea719448f649e93b6a9ec7593b4cb2fb40e (diff) | |
| download | dynamic-extension-aa5832f5e1bf7fcbd0a9507677a650b5e749ef46.tar.gz | |
Sampling benchmark
| -rw-r--r-- | CMakeLists.txt | 7 | ||||
| -rw-r--r-- | benchmarks/bench.h | 206 | ||||
| -rw-r--r-- | benchmarks/sampling_tput.cpp | 157 |
3 files changed, 369 insertions, 1 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index fc41db3..2fd2107 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(namespace "de") project("Practical Dynamic Extension" VERSION 0.1.0) -set(debug true) +set(debug false) set(tests True) set(bench True) @@ -70,4 +70,9 @@ endif() if (bench) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/bin/benchmarks") + add_executable(sampling_tput ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/sampling_tput.cpp) + target_link_libraries(sampling_tput PUBLIC gsl pthread) + target_include_directories(sampling_tput PRIVATE include) + + endif() diff --git a/benchmarks/bench.h b/benchmarks/bench.h new file mode 100644 index 0000000..8af244a --- /dev/null +++ b/benchmarks/bench.h @@ -0,0 +1,206 @@ +#ifndef H_BENCH +#define H_BENCH +#include "framework/DynamicExtension.h" +#include "shard/WSS.h" + +#include <cstdlib> +#include <cstdio> +#include <chrono> +#include <algorithm> +#include <numeric> +#include <memory> +#include <iostream> +#include <fstream> +#include <sstream> +#include <unordered_set> +#include <set> +#include <string> +#include <random> + +typedef uint64_t key_type; +typedef uint32_t value_type; +typedef uint64_t weight_type; + +typedef de::WeightedRecord<key_type, value_type, weight_type> WRec; +typedef de::DynamicExtension<WRec, de::WSS<WRec>, de::WSSQuery<WRec>> ExtendedWSS; + +static gsl_rng *g_rng; +static std::set<WRec> *g_to_delete; +static bool g_osm_data; + +static key_type g_min_key = UINT64_MAX; +static key_type g_max_key = 0; + +static size_t g_max_record_cnt = 0; +static size_t g_reccnt = 0; + +static constexpr unsigned int DEFAULT_SEED = 0; + +static unsigned int get_random_seed() +{ + unsigned int seed = 0; + std::fstream urandom; + urandom.open("/dev/urandom", std::ios::in|std::ios::binary); + urandom.read((char *) &seed, sizeof(seed)); + urandom.close(); + + return seed; +} + +static key_type osm_to_key(const char *key_field) { + double tmp_key = (atof(key_field) + 180) * 10e6; + return (key_type) tmp_key; +} + +static void init_bench_rng(unsigned int seed, const gsl_rng_type *type) +{ + g_rng = gsl_rng_alloc(type); + gsl_rng_set(g_rng, seed); +} + +static void init_bench_env(size_t max_reccnt, bool random_seed, bool osm_correction=true) +{ + unsigned int seed = (random_seed) ? get_random_seed() : DEFAULT_SEED; + init_bench_rng(seed, gsl_rng_mt19937); + g_to_delete = new std::set<WRec>(); + g_osm_data = osm_correction; + g_max_record_cnt = max_reccnt; + g_reccnt = 0; +} + +static void delete_bench_env() +{ + gsl_rng_free(g_rng); + delete g_to_delete; +} + +static bool next_record(std::fstream *file, WRec *record) +{ + if (g_reccnt >= g_max_record_cnt) return false; + + std::string line; + if (std::getline(*file, line, '\n')) { + std::stringstream line_stream(line); + std::string key_field; + std::string value_field; + std::string weight_field; + + std::getline(line_stream, value_field, '\t'); + std::getline(line_stream, key_field, '\t'); + std::getline(line_stream, weight_field, '\t'); + + record->key = (g_osm_data) ? osm_to_key(key_field.c_str()) : atol(key_field.c_str()); + record->value = atol(value_field.c_str()); + record->weight = atof(weight_field.c_str()); + + if (record->key < g_min_key) g_min_key = record->key; + + if (record->key > g_max_key) g_max_key = record->key; + + g_reccnt++; + + return true; + } + + return false; +} + +static bool build_insert_vec(std::fstream *file, std::vector<WRec> &vec, size_t n) { + vec.clear(); + for (size_t i=0; i<n; i++) { + WRec rec; + if (!next_record(file, &rec)) { + if (i == 0) { + return false; + } + + break; + } + + vec.emplace_back(rec); + } + + return true; +} + +/* + * helper routines for displaying progress bars to stderr + */ +static const char *g_prog_bar = "======================================================================"; +static const size_t g_prog_width = 50; + +static void progress_update(double percentage, std::string prompt) { + int val = (int) (percentage * 100); + int lpad = (int) (percentage * g_prog_width); + int rpad = (int) (g_prog_width - lpad); + fprintf(stderr, "\r(%3d%%) %20s [%.*s%*s]", val, prompt.c_str(), lpad, g_prog_bar, rpad, ""); + fflush(stderr); + + if (percentage >= 1) fprintf(stderr, "\n"); +} + +static bool warmup(std::fstream *file, ExtendedWSS *extended_wss, size_t count, double delete_prop, bool progress=true) +{ + size_t del_buf_size = 10000; + size_t delete_idx = del_buf_size; + + std::vector<WRec> delbuf; + std::set<WRec> deleted_keys; + + de::wss_query_parms<WRec> parms; + parms.rng = g_rng; + parms.sample_size = del_buf_size; + + size_t inserted = 0; + + double last_percent = 0; + for (size_t i=0; i<count; i++) { + WRec rec; + if (!next_record(file, &rec)) { + return false; + } + + inserted++; + extended_wss->insert(rec); + + if (i > extended_wss->get_buffer_capacity() && delete_idx >= delbuf.size()) { + extended_wss->query(&parms); + delete_idx = 0; + deleted_keys.clear(); + } + + if (i > extended_wss->get_buffer_capacity() && gsl_rng_uniform(g_rng) < delete_prop) { + auto rec = delbuf[delete_idx]; + delete_idx++; + + if (deleted_keys.find(rec) == deleted_keys.end()) { + extended_wss->erase(rec); + deleted_keys.insert(rec); + } + } + + if (progress && ((double) i / (double) count) - last_percent > .01) { + progress_update((double) i / (double) count, "warming up:"); + last_percent = (double) i / (double) count; + } + } + + if (progress) { + progress_update(1, "warming up:"); + } + + return true; +} + + +static void reset_de_perf_metrics() { + + /* + * rejection counters are zeroed automatically by the + * sampling function itself. + */ + + RESET_IO_CNT(); +} + +#endif // H_BENCH diff --git a/benchmarks/sampling_tput.cpp b/benchmarks/sampling_tput.cpp new file mode 100644 index 0000000..3eb72dc --- /dev/null +++ b/benchmarks/sampling_tput.cpp @@ -0,0 +1,157 @@ +#include "bench.h" + +size_t g_insert_batch_size = 1000; + +static bool insert_benchmark(ExtendedWSS *tree, std::fstream *file, + size_t insert_cnt, double delete_prop) { + + size_t delete_cnt = insert_cnt * delete_prop; + size_t delete_batch_size = g_insert_batch_size * delete_prop * 15; + size_t delete_idx = delete_batch_size; + + std::vector<WRec> delbuf; + + std::set<WRec> deleted; + + size_t applied_deletes = 0; + size_t applied_inserts = 0; + + std::vector<WRec> insert_vec; + insert_vec.reserve(g_insert_batch_size); + bool continue_benchmark = true; + + size_t total_time = 0; + + de::wss_query_parms<WRec> parms; + parms.rng = g_rng; + parms.sample_size = delete_batch_size; + + while (applied_inserts < insert_cnt && continue_benchmark) { + continue_benchmark = build_insert_vec(file, insert_vec, g_insert_batch_size); + + if (insert_vec.size() == 0) { + break; + } + + // if we've fully processed the delete vector, sample a new + // set of records to delete. + if (delete_idx >= delbuf.size()) { + delbuf = tree->query(&parms); + deleted.clear(); + delete_idx = 0; + } + + progress_update((double) applied_inserts / (double) insert_cnt, "inserting:"); + size_t local_inserted = 0; + size_t local_deleted = 0; + + auto insert_start = std::chrono::high_resolution_clock::now(); + for (size_t i=0; i<insert_vec.size(); i++) { + // process a delete if necessary + if (applied_deletes < delete_cnt && delete_idx < delete_batch_size && gsl_rng_uniform(g_rng) < delete_prop) { + auto rec = delbuf[delete_idx]; + delete_idx++; + + if (deleted.find(rec) == deleted.end()) { + tree->erase(rec); + deleted.insert(rec); + local_deleted++; + } + } + + // insert the record; + tree->insert(insert_vec[i]); + local_inserted++; + } + auto insert_stop = std::chrono::high_resolution_clock::now(); + + applied_deletes += local_deleted; + applied_inserts += local_inserted; + total_time += std::chrono::duration_cast<std::chrono::nanoseconds>(insert_stop - insert_start).count(); + } + + progress_update(1.0, "inserting:"); + size_t throughput = (((double) (applied_inserts + applied_deletes) / (double) total_time) * 1e9); + + fprintf(stdout, "%ld\t", throughput); + reset_de_perf_metrics(); + + return continue_benchmark; +} + + +static void sample_benchmark(ExtendedWSS *tree, size_t k, size_t trial_cnt) +{ + char progbuf[25]; + sprintf(progbuf, "sampling (%ld):", k); + + size_t batch_size = 100; + size_t batches = trial_cnt / batch_size; + size_t total_time = 0; + + WRec sample_set[k]; + + de::wss_query_parms<WRec> parms; + parms.rng = g_rng; + parms.sample_size = k; + + for (int i=0; i<batches; i++) { + progress_update((double) (i * batch_size) / (double) trial_cnt, progbuf); + auto start = std::chrono::high_resolution_clock::now(); + for (int j=0; j < batch_size; j++) { + auto res = tree->query(&parms); + } + auto stop = std::chrono::high_resolution_clock::now(); + + total_time += std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start).count(); + } + + progress_update(1.0, progbuf); + + size_t throughput = (((double)(trial_cnt * k) / (double) total_time) * 1e9); + + fprintf(stdout, "%ld\n", throughput); + fflush(stdout); +} + + +int main(int argc, char **argv) +{ + if (argc < 7) { + fprintf(stderr, "Usage: sampling_tput <filename> <record_count> <buffer_cap> <scale_factor> <delete_proportion> <max_delete_proportion> [osm_data]\n"); + exit(EXIT_FAILURE); + } + + std::string filename = std::string(argv[1]); + size_t record_count = atol(argv[2]); + size_t buffer_cap = atol(argv[3]); + size_t scale_factor = atol(argv[4]); + double delete_prop = atof(argv[5]); + double max_delete_prop = atof(argv[6]); + bool use_osm = (argc == 8) ? atoi(argv[7]) : 0; + + double insert_batch = 0.1; + + init_bench_env(record_count, true, use_osm); + + auto sampling_lsm = ExtendedWSS(buffer_cap, scale_factor, max_delete_prop); + + std::fstream datafile; + datafile.open(filename, std::ios::in); + + // warm up the tree with initial_insertions number of initially inserted + // records + size_t warmup_cnt = insert_batch * record_count; + warmup(&datafile, &sampling_lsm, warmup_cnt, delete_prop); + + size_t insert_cnt = record_count - warmup_cnt; + + insert_benchmark(&sampling_lsm, &datafile, insert_cnt, delete_prop); + sample_benchmark(&sampling_lsm, 1000, 10000); + + delete_bench_env(); + fflush(stdout); + fflush(stderr); + + exit(EXIT_SUCCESS); +} |