diff options
| author | Douglas B. Rumbaugh <dbr4@psu.edu> | 2024-02-09 14:06:59 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-02-09 14:06:59 -0500 |
| commit | bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30 (patch) | |
| tree | 66333c55feb0ea8875a50e6dc07c8535d241bf1c /benchmarks | |
| parent | 076e104b8672924c3d80cd1da2fdb5ebee1766ac (diff) | |
| parent | 46885246313358a3b606eca139b20280e96db10e (diff) | |
| download | dynamic-extension-bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30.tar.gz | |
Merge pull request #1 from dbrumbaugh/new-buffer
Initial Concurrency Implementation
Diffstat (limited to 'benchmarks')
| -rw-r--r-- | benchmarks/btree_insert_query_tput.cpp | 120 | ||||
| -rw-r--r-- | benchmarks/include/btree-util.h | 27 | ||||
| -rw-r--r-- | benchmarks/include/data-proc.h | 258 | ||||
| -rw-r--r-- | benchmarks/insert_query_tput.cpp | 121 | ||||
| -rw-r--r-- | benchmarks/insert_tail_latency.cpp | 81 | ||||
| -rw-r--r-- | benchmarks/insertion_tput.cpp | 72 | ||||
| -rw-r--r-- | benchmarks/irs_bench.cpp | 125 | ||||
| -rw-r--r-- | benchmarks/old-bench/alex_rq_bench.cpp (renamed from benchmarks/alex_rq_bench.cpp) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/alias_wss_bench.cpp (renamed from benchmarks/alias_wss_bench.cpp) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/btree_irs_bench.cpp (renamed from benchmarks/btree_irs_bench.cpp) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/btree_rq_bench.cpp (renamed from benchmarks/btree_rq_bench.cpp) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/include/bench.h (renamed from benchmarks/include/bench.h) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/include/bench_utility.h (renamed from benchmarks/include/bench_utility.h) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/include/standalone_utility.h (renamed from benchmarks/include/standalone_utility.h) | 24 | ||||
| -rw-r--r-- | benchmarks/old-bench/isam_irs_bench.cpp (renamed from benchmarks/isam_irs_bench.cpp) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/isam_rq_bench.cpp (renamed from benchmarks/isam_rq_bench.cpp) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/mtree_knn_bench.cpp (renamed from benchmarks/mtree_knn_bench.cpp) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/pgm_pl_bench.cpp (renamed from benchmarks/pgm_pl_bench.cpp) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/pgm_rq_bench.cpp (renamed from benchmarks/pgm_rq_bench.cpp) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/test.cpp (renamed from benchmarks/test.cpp) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/triespline_rq_bench.cpp (renamed from benchmarks/triespline_rq_bench.cpp) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/upgm_pl_bench.cpp (renamed from benchmarks/upgm_pl_bench.cpp) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/upgm_rq_bench.cpp (renamed from benchmarks/upgm_rq_bench.cpp) | 0 | ||||
| -rw-r--r-- | benchmarks/old-bench/vptree_knn_bench.cpp (renamed from benchmarks/vptree_knn_bench.cpp) | 0 | ||||
| -rw-r--r-- | benchmarks/query_workload_bench.cpp | 170 | ||||
| -rw-r--r-- | benchmarks/reconstruction_interference.cpp | 124 | ||||
| -rw-r--r-- | benchmarks/static_dynamic_comp.cpp | 117 | ||||
| -rw-r--r-- | benchmarks/watermark_testing.cpp | 55 |
28 files changed, 1271 insertions, 23 deletions
diff --git a/benchmarks/btree_insert_query_tput.cpp b/benchmarks/btree_insert_query_tput.cpp new file mode 100644 index 0000000..f838f80 --- /dev/null +++ b/benchmarks/btree_insert_query_tput.cpp @@ -0,0 +1,120 @@ +/* + * + */ + +#define ENABLE_TIMER + +#include <thread> + +#include "query/irs.h" +#include "include/data-proc.h" +#include "psu-ds/BTree.h" +#include <mutex> + +#include <gsl/gsl_rng.h> + +#include "psu-util/timer.h" + + +typedef de::Record<int64_t, int64_t> Rec; +typedef de::irs::Parms<Rec> QP; + +std::atomic<bool> inserts_done = false; + +std::mutex g_btree_lock; + +void query_thread(BenchBTree *tree, std::vector<QP> *queries) { + gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937); + size_t total = 0; + + while (!inserts_done.load()) { + auto q_idx = gsl_rng_uniform_int(rng, queries->size()); + + auto q = (*queries)[q_idx]; + + std::vector<int64_t> result; + g_btree_lock.lock(); + tree->range_sample(q.lower_bound, q.upper_bound, 1000, result, rng); + g_btree_lock.unlock(); + + total += result.size(); + usleep(1); + } + + fprintf(stderr, "%ld\n", total); + + gsl_rng_free(rng); +} + +void insert_thread(BenchBTree *tree, size_t start, std::vector<int64_t> *records) { + size_t reccnt = 0; + for (size_t i=start; i<records->size(); i++) { + btree_record r; + r.key = (*records)[i]; + r.value = i; + + g_btree_lock.lock(); + tree->insert(r); + g_btree_lock.unlock(); + + if (i % 100000 == 0) { + fprintf(stderr, "Inserted %ld records\n", i); + } + } + + inserts_done.store(true); +} + +int main(int argc, char **argv) { + + if (argc < 5) { + fprintf(stderr, "btree_insert_query_tput reccnt query_threads datafile queryfile\n"); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + size_t qthread_cnt = atol(argv[2]); + std::string d_fname = std::string(argv[3]); + std::string q_fname = std::string(argv[4]); + + auto tree = new BenchBTree(); + gsl_rng * rng = gsl_rng_alloc(gsl_rng_mt19937); + + auto data = read_sosd_file(d_fname, n); + auto queries = read_range_queries<QP>(q_fname, .001); + + /* warmup structure w/ 10% of records */ + size_t warmup = .1 * n; + for (size_t i=0; i<warmup; i++) { + btree_record r; + r.key = data[i]; + r.value = i; + + tree->insert(r); + } + + TIMER_INIT(); + + std::vector<std::thread> qthreads(qthread_cnt); + + TIMER_START(); + std::thread i_thrd(insert_thread, tree, warmup, &data); + for (size_t i=0; i<qthread_cnt; i++) { + qthreads[i] = std::thread(query_thread, tree, &queries); + } + i_thrd.join(); + TIMER_STOP(); + + for (size_t i=0; i<qthread_cnt; i++) { + qthreads[i].join(); + } + + auto total_latency = TIMER_RESULT(); + size_t throughput = (size_t) ((double) (n - warmup) / (double) total_latency * 1e9); + fprintf(stdout, "T\t%ld\t%ld\n", total_latency, throughput); + + gsl_rng_free(rng); + delete tree; + fflush(stderr); +} + diff --git a/benchmarks/include/btree-util.h b/benchmarks/include/btree-util.h new file mode 100644 index 0000000..571c073 --- /dev/null +++ b/benchmarks/include/btree-util.h @@ -0,0 +1,27 @@ +#pragma once + +#include <cstdlib> +#include "psu-ds/BTree.h" + +struct btree_record { + int64_t key; + int64_t value; + + inline bool operator<(const btree_record& other) const { + return key < other.key || (key == other.key && value < other.value); + } + + inline bool operator==(const btree_record& other) const { + return key == other.key && value == other.value; + } +}; + +struct btree_key_extract { + static const int64_t &get(const btree_record &v) { + return v.key; + } +}; + +typedef psudb::BTree<int64_t, btree_record, btree_key_extract> BenchBTree; + + diff --git a/benchmarks/include/data-proc.h b/benchmarks/include/data-proc.h new file mode 100644 index 0000000..444cb94 --- /dev/null +++ b/benchmarks/include/data-proc.h @@ -0,0 +1,258 @@ +#include <cstdlib> +#include <cstdio> +#include <iostream> +#include <fstream> +#include <sstream> +#include <string> +#include <gsl/gsl_rng.h> +#include <cstring> +#include <vector> + +#include "psu-ds/BTree.h" + +#pragma once + +typedef int64_t key_type; +typedef int64_t value_type; +typedef uint64_t weight_type; + +static gsl_rng *g_rng; +static bool g_osm_data; + +struct btree_record { + key_type key; + value_type value; + + inline bool operator<(const btree_record& other) const { + return key < other.key || (key == other.key && value < other.value); + } + + inline bool operator==(const btree_record& other) const { + return key == other.key && value == other.value; + } +}; + +struct btree_key_extract { + static const key_type &get(const btree_record &v) { + return v.key; + } +}; + +typedef psudb::BTree<int64_t, btree_record, btree_key_extract> BenchBTree; + +static key_type g_min_key = UINT64_MAX; +static key_type g_max_key = 0; + +static size_t g_max_record_cnt = 0; +static size_t g_reccnt = 0; + +static constexpr unsigned int DEFAULT_SEED = 0; + +static unsigned int get_random_seed() +{ + unsigned int seed = 0; + std::fstream urandom; + urandom.open("/dev/urandom", std::ios::in|std::ios::binary); + urandom.read((char *) &seed, sizeof(seed)); + urandom.close(); + + return seed; +} + +static key_type osm_to_key(const char *key_field) { + double tmp_key = (atof(key_field) + 180) * 10e6; + return (key_type) tmp_key; +} + +static void init_bench_rng(unsigned int seed, const gsl_rng_type *type) +{ + g_rng = gsl_rng_alloc(type); + gsl_rng_set(g_rng, seed); +} + +static void init_bench_env(size_t max_reccnt, bool random_seed, bool osm_correction=true) +{ + unsigned int seed = (random_seed) ? get_random_seed() : DEFAULT_SEED; + init_bench_rng(seed, gsl_rng_mt19937); + g_osm_data = osm_correction; + g_max_record_cnt = max_reccnt; + g_reccnt = 0; +} + +static void delete_bench_env() +{ + gsl_rng_free(g_rng); +} + + +template <typename QP> +static std::vector<QP> read_lookup_queries(std::string fname, double selectivity) { + std::vector<QP> queries; + + FILE *qf = fopen(fname.c_str(), "r"); + size_t start, stop; + double sel; + while (fscanf(qf, "%zu%zu%lf\n", &start, &stop, &sel) != EOF) { + if (start < stop && std::abs(sel - selectivity) < 0.1) { + QP q; + q.target_key = start; + queries.push_back(q); + } + } + fclose(qf); + + return queries; +} + +template <typename QP> +static std::vector<QP> read_range_queries(std::string &fname, double selectivity) { + std::vector<QP> queries; + + FILE *qf = fopen(fname.c_str(), "r"); + size_t start, stop; + double sel; + while (fscanf(qf, "%zu%zu%lf\n", &start, &stop, &sel) != EOF) { + if (start < stop && std::abs(sel - selectivity) < 0.1) { + QP q; + q.lower_bound = start; + q.upper_bound = stop; + queries.push_back(q); + } + } + fclose(qf); + + return queries; +} + +template <typename QP> +static std::vector<QP> read_knn_queries(std::string fname, size_t k) { + std::vector<QP> queries; + + FILE *qf = fopen(fname.c_str(), "r"); + char *line = NULL; + size_t len = 0; + + while (getline(&line, &len, qf) > 0) { + char *token; + QP query; + size_t idx = 0; + + token = strtok(line, " "); + do { + query.point.data[idx++] = atof(token); + } while ((token = strtok(NULL, " "))); + + query.k = k; + queries.emplace_back(query); + } + + free(line); + fclose(qf); + + return queries; +} + +/* + * NOTE: The QP type must have lower_bound and upper_bound attributes, which + * this function will initialize. Any other query parameter attributes must + * be manually initialized after the call. + */ +template <typename R> +static bool next_vector_record(std::fstream &file, R &record, bool binary=false) { + std::string line; + if (std::getline(file, line, '\n')) { + std::stringstream line_stream(line); + for (size_t i=0; i<300; i++) { + std::string dimension; + + std::getline(line_stream, dimension, ' '); + record.data[i] = atof(dimension.c_str()); + } + + g_reccnt++; + + return true; + } + + return false; + +} + +template <typename R> +static bool next_record(std::fstream &file, R &record, bool binary=false) +{ + static value_type value = 1; + if (g_reccnt >= g_max_record_cnt) return false; + + if (binary) { + if (file.good()) { + decltype(R::key) key; + + file.read((char*) &key, sizeof(key)); + record.key = key; + record.value = value; + value++; + + if (record.key < g_min_key) g_min_key = record.key; + if (record.key > g_max_key) g_max_key = record.key; + + return true; + } + + return false; + } + + std::string line; + if (std::getline(file, line, '\n')) { + std::stringstream line_stream(line); + std::string key_field; + std::string value_field; + std::string weight_field; + + std::getline(line_stream, value_field, '\t'); + std::getline(line_stream, key_field, '\t'); + std::getline(line_stream, weight_field, '\t'); + + record.key = (g_osm_data) ? osm_to_key(key_field.c_str()) : atol(key_field.c_str()); + record.value = atol(value_field.c_str()); + + if (record.key < g_min_key) g_min_key = record.key; + if (record.key > g_max_key) g_max_key = record.key; + + g_reccnt++; + + return true; + } + + return false; +} + +template <typename R> +static bool build_delete_vec(std::vector<R> &to_delete, std::vector<R> &vec, size_t n) { + vec.clear(); + + size_t cnt = 0; + while (cnt < n) { + if (to_delete.size() == 0) { + return false; + } + + auto i = gsl_rng_uniform_int(g_rng, to_delete.size()); + vec.emplace_back(to_delete[i]); + to_delete.erase(to_delete.begin() + i); + } +td: + return true; +} + +static std::vector<int64_t> read_sosd_file(std::string &fname, size_t n) { + std::fstream file; + file.open(fname, std::ios::in | std::ios::binary); + + std::vector<int64_t> records(n); + for (size_t i=0; i<n; i++) { + file.read((char*) &(records[i]), sizeof(int64_t)); + } + + return records; +} diff --git a/benchmarks/insert_query_tput.cpp b/benchmarks/insert_query_tput.cpp new file mode 100644 index 0000000..ce05264 --- /dev/null +++ b/benchmarks/insert_query_tput.cpp @@ -0,0 +1,121 @@ +/* + * + */ + +#define ENABLE_TIMER + +#include <thread> + +#include "framework/DynamicExtension.h" +#include "shard/ISAMTree.h" +#include "query/irs.h" +#include "framework/interface/Record.h" +#include "include/data-proc.h" + +#include <gsl/gsl_rng.h> + +#include "psu-util/timer.h" + + +typedef de::Record<int64_t, int64_t> Rec; +typedef de::ISAMTree<Rec> ISAM; +typedef de::irs::Query<Rec, ISAM> Q; +typedef de::DynamicExtension<Rec, ISAM, Q> Ext; +typedef de::irs::Parms<Rec> QP; + +std::atomic<bool> inserts_done = false; + +void query_thread(Ext *extension, std::vector<QP> *queries) { + gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937); + size_t total = 0; + + while (!inserts_done.load()) { + auto q_idx = gsl_rng_uniform_int(rng, queries->size()); + + auto q = (*queries)[q_idx]; + q.rng = rng; + q.sample_size = 1000; + + auto res = extension->query(&q); + auto r = res.get(); + total += r.size(); + usleep(1); + } + + fprintf(stderr, "%ld\n", total); + + gsl_rng_free(rng); +} + +void insert_thread(Ext *extension, size_t start, std::vector<int64_t> *records) { + size_t reccnt = 0; + Rec r; + for (size_t i=start; i<records->size(); i++) { + r.key = (*records)[i]; + r.value = i; + + while (!extension->insert(r)) { + usleep(1); + } + } + + inserts_done.store(true); +} + +int main(int argc, char **argv) { + + if (argc < 5) { + fprintf(stderr, "insert_query_tput reccnt query_threads datafile queryfile\n"); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + size_t qthread_cnt = atol(argv[2]); + std::string d_fname = std::string(argv[3]); + std::string q_fname = std::string(argv[4]); + + auto extension = new Ext(1000, 12000, 8, 0, 64); + gsl_rng * rng = gsl_rng_alloc(gsl_rng_mt19937); + + auto data = read_sosd_file(d_fname, n); + auto queries = read_range_queries<QP>(q_fname, .001); + + /* warmup structure w/ 10% of records */ + size_t warmup = .1 * n; + Rec r; + for (size_t i=0; i<warmup; i++) { + r.key = data[i]; + r.value = gsl_rng_uniform_int(rng, n); + + while (!extension->insert(r)) { + usleep(1); + } + } + + extension->await_next_epoch(); + + TIMER_INIT(); + + std::vector<std::thread> qthreads(qthread_cnt); + + TIMER_START(); + std::thread i_thrd(insert_thread, extension, warmup, &data); + for (size_t i=0; i<qthread_cnt; i++) { + qthreads[i] = std::thread(query_thread, extension, &queries); + } + i_thrd.join(); + TIMER_STOP(); + + for (size_t i=0; i<qthread_cnt; i++) { + qthreads[i].join(); + } + + auto total_latency = TIMER_RESULT(); + size_t throughput = (size_t) ((double) (n - warmup) / (double) total_latency * 1e9); + fprintf(stdout, "T\t%ld\t%ld\n", total_latency, throughput); + + gsl_rng_free(rng); + delete extension; + fflush(stderr); +} + diff --git a/benchmarks/insert_tail_latency.cpp b/benchmarks/insert_tail_latency.cpp new file mode 100644 index 0000000..bdc4536 --- /dev/null +++ b/benchmarks/insert_tail_latency.cpp @@ -0,0 +1,81 @@ +/* + * + */ + +#define ENABLE_TIMER + +#include <thread> + +#include "framework/DynamicExtension.h" +#include "shard/ISAMTree.h" +#include "query/rangecount.h" +#include "framework/interface/Record.h" +#include <unistd.h> +#include <gsl/gsl_rng.h> + +#include "psu-util/timer.h" + + +typedef de::Record<int64_t, int64_t> Rec; +typedef de::ISAMTree<Rec> ISAM; +typedef de::rc::Query<Rec, ISAM> Q; +typedef de::DynamicExtension<Rec, ISAM, Q, de::LayoutPolicy::TEIRING, de::DeletePolicy::TAGGING, de::SerialScheduler> Ext; + +std::atomic<size_t> total_latency = 0; + +void insert_thread(Ext *extension, size_t n, size_t k, size_t rate) { + int64_t delay = (1.0 / (double) rate) * 10e6; // delay in us + TIMER_INIT(); + for (int64_t i=0; i<n; i+=k) { + TIMER_START(); + for (int64_t j=0; j<k; j++) { + Rec r = {i+j, i+j}; + while (!extension->insert(r)) { + _mm_pause(); + } + + //usleep(delay); + /* + for (size_t i=0; i<10000; i++) { + __asm__ __volatile__ ("":::"memory"); + } + */ + } + TIMER_STOP(); + + auto insert_lat = TIMER_RESULT(); + + total_latency.fetch_add(insert_lat); + fprintf(stdout, "I\t%ld\t%ld\t%ld\n", i+k, insert_lat, k); + } +} + +int main(int argc, char **argv) { + + /* the closeout routine takes _forever_ ... so we'll just leak the memory */ + auto extension = new Ext(12000, 12001, 3); + size_t n = 10000000; + size_t per_trial = 1000; + double selectivity = .001; + size_t rate = 1000000; + + total_latency.store(0); + + gsl_rng * rng = gsl_rng_alloc(gsl_rng_mt19937); + + std::thread i_thrd1(insert_thread, extension, n, per_trial, rate); + //std::thread i_thrd2(insert_thread, extension, n/2, per_trial, rate); + + + i_thrd1.join(); + //i_thrd2.join(); + + auto avg_latency = total_latency.load() / n; + auto throughput = (int64_t) ((double) n / (double) total_latency * 1e9); + + fprintf(stdout, "AVG LAT: %ld\nThroughput: %ld\n", avg_latency, throughput); + + gsl_rng_free(rng); + fflush(stderr); +} + diff --git a/benchmarks/insertion_tput.cpp b/benchmarks/insertion_tput.cpp new file mode 100644 index 0000000..b4428f6 --- /dev/null +++ b/benchmarks/insertion_tput.cpp @@ -0,0 +1,72 @@ +/* + * + */ + +#define ENABLE_TIMER + +#include "framework/DynamicExtension.h" +#include "shard/ISAMTree.h" +#include "query/rangequery.h" +#include "framework/interface/Record.h" + +#include "psu-util/timer.h" + + +typedef de::Record<int64_t, int64_t> Rec; +typedef de::ISAMTree<Rec> ISAM; +typedef de::rq::Query<Rec, ISAM> Q; +typedef de::DynamicExtension<Rec, ISAM, Q> Ext; + + +void insert_thread(int64_t start, int64_t end, Ext *extension) { + for (int64_t i=start; i<end; i++) { + Rec r = {i, i}; + while (!extension->insert(r)) { + _mm_pause(); + } + } +} + + +int main(int argc, char **argv) { + + + size_t n = 1000000000; + + std::vector<int> counts = {1, 2, 4, 8}; //, 16, 32, 64}; + + + for (auto thread_count : counts) { + + auto extension = new Ext(1000, 12000, 8); + + size_t per_thread = n / thread_count; + + std::thread threads[thread_count]; + + TIMER_INIT(); + TIMER_START(); + for (size_t i=0; i<thread_count; i++) { + threads[i] = std::thread(insert_thread, i*per_thread, + i*per_thread+per_thread, extension); + } + + for (size_t i=0; i<thread_count; i++) { + threads[i].join(); + } + + TIMER_STOP(); + + auto total_time = TIMER_RESULT(); + + double tput = (double) n / (double) total_time * 1e9; + + fprintf(stdout, "%ld\t%d\t%lf\n", extension->get_record_count(), + thread_count, tput); + + delete extension; + } + + fflush(stderr); +} + diff --git a/benchmarks/irs_bench.cpp b/benchmarks/irs_bench.cpp new file mode 100644 index 0000000..ddb4220 --- /dev/null +++ b/benchmarks/irs_bench.cpp @@ -0,0 +1,125 @@ +/* + * + */ + +#define ENABLE_TIMER + +#include <thread> + +#include "framework/DynamicExtension.h" +#include "shard/ISAMTree.h" +#include "query/irs.h" +#include "framework/interface/Record.h" +#include "include/data-proc.h" + +#include <gsl/gsl_rng.h> + +#include "psu-util/timer.h" + + +typedef de::Record<int64_t, int64_t> Rec; +typedef de::ISAMTree<Rec> ISAM; +typedef de::irs::Query<Rec, ISAM> Q; +typedef de::DynamicExtension<Rec, ISAM, Q, de::LayoutPolicy::TEIRING, de::DeletePolicy::TOMBSTONE, de::SerialScheduler> Ext; +typedef de::irs::Parms<Rec> QP; + +void run_queries(Ext *extension, std::vector<QP> &queries, gsl_rng *rng) { + size_t total; + for (size_t i=0; i<queries.size(); i++) { + auto q = &queries[i]; + q->rng = rng; + q->sample_size = 1000; + + auto res = extension->query(q); + auto r = res.get(); + total += r.size(); + } + + fprintf(stderr, "%ld\n", total); +} + +size_t g_deleted_records = 0; +double delete_proportion = 0.05; + +void insert_records(Ext *extension, size_t start, + size_t stop, + std::vector<int64_t> &records, + std::vector<size_t> &to_delete, + size_t &delete_idx, + bool delete_records, + gsl_rng *rng) { + size_t reccnt = 0; + Rec r; + for (size_t i=start; i<stop; i++) { + r.key = records[i]; + r.value = i; + + while (!extension->insert(r)) { + usleep(1); + } + + if (delete_records && gsl_rng_uniform(rng) <= delete_proportion && to_delete[delete_idx] <= i) { + r.key = records[to_delete[delete_idx]]; + r.value = (int64_t) (to_delete[delete_idx]); + while (!extension->erase(r)) { + usleep(1); + } + delete_idx++; + g_deleted_records++; + } + } +} + +int main(int argc, char **argv) { + + if (argc < 4) { + fprintf(stderr, "insert_query_tput reccnt datafile queryfile\n"); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + std::string d_fname = std::string(argv[2]); + std::string q_fname = std::string(argv[3]); + + auto extension = new Ext(12000, 12001, 8, 0, 64); + gsl_rng * rng = gsl_rng_alloc(gsl_rng_mt19937); + + auto data = read_sosd_file(d_fname, n); + std::vector<size_t> to_delete(n * delete_proportion); + size_t j=0; + for (size_t i=0; i<data.size() && j<to_delete.size(); i++) { + if (gsl_rng_uniform(rng) <= delete_proportion) { + to_delete[j++] = i; + } + } + auto queries = read_range_queries<QP>(q_fname, .001); + + /* warmup structure w/ 10% of records */ + size_t warmup = .3 * n; + size_t delete_idx = 0; + insert_records(extension, 0, warmup, data, to_delete, delete_idx, false, rng); + + extension->await_next_epoch(); + + TIMER_INIT(); + + TIMER_START(); + insert_records(extension, warmup, data.size(), data, to_delete, delete_idx, true, rng); + TIMER_STOP(); + + auto insert_latency = TIMER_RESULT(); + size_t insert_throughput = (size_t) ((double) (n - warmup) / (double) insert_latency * 1e9); + + TIMER_START(); + run_queries(extension, queries, rng); + TIMER_STOP(); + + auto query_latency = TIMER_RESULT() / queries.size(); + + fprintf(stdout, "T\t%ld\t%ld\t%ld\n", insert_throughput, query_latency, g_deleted_records); + + gsl_rng_free(rng); + delete extension; + fflush(stderr); +} + diff --git a/benchmarks/alex_rq_bench.cpp b/benchmarks/old-bench/alex_rq_bench.cpp index f75afa6..f75afa6 100644 --- a/benchmarks/alex_rq_bench.cpp +++ b/benchmarks/old-bench/alex_rq_bench.cpp diff --git a/benchmarks/alias_wss_bench.cpp b/benchmarks/old-bench/alias_wss_bench.cpp index a3a43f2..a3a43f2 100644 --- a/benchmarks/alias_wss_bench.cpp +++ b/benchmarks/old-bench/alias_wss_bench.cpp diff --git a/benchmarks/btree_irs_bench.cpp b/benchmarks/old-bench/btree_irs_bench.cpp index 862fc6b..862fc6b 100644 --- a/benchmarks/btree_irs_bench.cpp +++ b/benchmarks/old-bench/btree_irs_bench.cpp diff --git a/benchmarks/btree_rq_bench.cpp b/benchmarks/old-bench/btree_rq_bench.cpp index d92b45d..d92b45d 100644 --- a/benchmarks/btree_rq_bench.cpp +++ b/benchmarks/old-bench/btree_rq_bench.cpp diff --git a/benchmarks/include/bench.h b/benchmarks/old-bench/include/bench.h index 586ff12..586ff12 100644 --- a/benchmarks/include/bench.h +++ b/benchmarks/old-bench/include/bench.h diff --git a/benchmarks/include/bench_utility.h b/benchmarks/old-bench/include/bench_utility.h index e33b93d..e33b93d 100644 --- a/benchmarks/include/bench_utility.h +++ b/benchmarks/old-bench/include/bench_utility.h diff --git a/benchmarks/include/standalone_utility.h b/benchmarks/old-bench/include/standalone_utility.h index 9876e84..727daa5 100644 --- a/benchmarks/include/standalone_utility.h +++ b/benchmarks/old-bench/include/standalone_utility.h @@ -1,18 +1,12 @@ #include <cstdlib> #include <cstdio> -#include <chrono> -#include <algorithm> -#include <numeric> -#include <memory> #include <iostream> #include <fstream> #include <sstream> -#include <unordered_set> -#include <set> #include <string> -#include <random> #include <gsl/gsl_rng.h> #include <cstring> +#include <vector> typedef uint64_t key_type; typedef uint64_t value_type; @@ -244,19 +238,3 @@ static bool build_delete_vec(std::vector<R> &to_delete, std::vector<R> &vec, siz td: return true; } - -/* - * helper routines for displaying progress bars to stderr - */ -static const char *g_prog_bar = "======================================================================"; -static const size_t g_prog_width = 50; - -static void progress_update(double percentage, std::string prompt) { - int val = (int) (percentage * 100); - int lpad = (int) (percentage * g_prog_width); - int rpad = (int) (g_prog_width - lpad); - fprintf(stderr, "\r(%3d%%) %20s [%.*s%*s]", val, prompt.c_str(), lpad, g_prog_bar, rpad, ""); - fflush(stderr); - - if (percentage >= 1) fprintf(stderr, "\n"); -} diff --git a/benchmarks/isam_irs_bench.cpp b/benchmarks/old-bench/isam_irs_bench.cpp index 96525f0..96525f0 100644 --- a/benchmarks/isam_irs_bench.cpp +++ b/benchmarks/old-bench/isam_irs_bench.cpp diff --git a/benchmarks/isam_rq_bench.cpp b/benchmarks/old-bench/isam_rq_bench.cpp index bb5626e..bb5626e 100644 --- a/benchmarks/isam_rq_bench.cpp +++ b/benchmarks/old-bench/isam_rq_bench.cpp diff --git a/benchmarks/mtree_knn_bench.cpp b/benchmarks/old-bench/mtree_knn_bench.cpp index 9d4cc57..9d4cc57 100644 --- a/benchmarks/mtree_knn_bench.cpp +++ b/benchmarks/old-bench/mtree_knn_bench.cpp diff --git a/benchmarks/pgm_pl_bench.cpp b/benchmarks/old-bench/pgm_pl_bench.cpp index f798861..f798861 100644 --- a/benchmarks/pgm_pl_bench.cpp +++ b/benchmarks/old-bench/pgm_pl_bench.cpp diff --git a/benchmarks/pgm_rq_bench.cpp b/benchmarks/old-bench/pgm_rq_bench.cpp index e25d29f..e25d29f 100644 --- a/benchmarks/pgm_rq_bench.cpp +++ b/benchmarks/old-bench/pgm_rq_bench.cpp diff --git a/benchmarks/test.cpp b/benchmarks/old-bench/test.cpp index 75bffe3..75bffe3 100644 --- a/benchmarks/test.cpp +++ b/benchmarks/old-bench/test.cpp diff --git a/benchmarks/triespline_rq_bench.cpp b/benchmarks/old-bench/triespline_rq_bench.cpp index 967c3b0..967c3b0 100644 --- a/benchmarks/triespline_rq_bench.cpp +++ b/benchmarks/old-bench/triespline_rq_bench.cpp diff --git a/benchmarks/upgm_pl_bench.cpp b/benchmarks/old-bench/upgm_pl_bench.cpp index e0445b2..e0445b2 100644 --- a/benchmarks/upgm_pl_bench.cpp +++ b/benchmarks/old-bench/upgm_pl_bench.cpp diff --git a/benchmarks/upgm_rq_bench.cpp b/benchmarks/old-bench/upgm_rq_bench.cpp index 940a9e6..940a9e6 100644 --- a/benchmarks/upgm_rq_bench.cpp +++ b/benchmarks/old-bench/upgm_rq_bench.cpp diff --git a/benchmarks/vptree_knn_bench.cpp b/benchmarks/old-bench/vptree_knn_bench.cpp index d8247e4..d8247e4 100644 --- a/benchmarks/vptree_knn_bench.cpp +++ b/benchmarks/old-bench/vptree_knn_bench.cpp diff --git a/benchmarks/query_workload_bench.cpp b/benchmarks/query_workload_bench.cpp new file mode 100644 index 0000000..d79daf2 --- /dev/null +++ b/benchmarks/query_workload_bench.cpp @@ -0,0 +1,170 @@ +/* + * + */ + +#define ENABLE_TIMER + +#include <thread> + +#include "framework/DynamicExtension.h" +#include "shard/ISAMTree.h" +#include "query/rangecount.h" +#include "framework/interface/Record.h" + +#include <gsl/gsl_rng.h> + +#include "psu-util/timer.h" + + +typedef de::Record<int64_t, int64_t> Rec; +typedef de::ISAMTree<Rec> ISAM; +typedef de::rc::Query<Rec, ISAM> Q; +typedef de::DynamicExtension<Rec, ISAM, Q> Ext; + +size_t g_insert_size = 50000; +size_t g_insert_frequency = 1000; +size_t g_query_count = 5000; + +void query_thread(Ext *extension, gsl_rng *rng, size_t n, bool parallel=true) { + TIMER_INIT(); + double selectivity = .001; + size_t k = 100; + size_t range = n * selectivity; + + size_t total_result = 0; + + auto q = new de::rc::Parms<Rec>(); + + std::vector<std::future<std::vector<Rec>>> results(k); + + TIMER_START(); + for (int64_t i=0; i<k; i++) { + size_t start = gsl_rng_uniform_int(rng, n - range); + + q->lower_bound = start; + q->upper_bound = start + range; + results[i] = extension->query(q); + if (!parallel) { + auto x = results[i].get(); + total_result += x[0].key; + } + } + + if (parallel) { + for (size_t i=0; i<k; i++) { + auto x = results[i].get(); + total_result += x[0].key; + } + } + + TIMER_STOP(); + auto query_lat = TIMER_RESULT(); + fprintf(stdout, "Q\t%ld\t%ld\t%ld\n", extension->get_record_count(), query_lat, k); + fprintf(stderr, "Q Total: %ld\n", total_result); + delete q; +} + +void insert_thread(Ext *extension, size_t n) { + gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937); + + TIMER_INIT(); + size_t k=1000; + + Rec r; + for (size_t i=0; i<g_insert_size; i+=k) { + TIMER_START(); + for (size_t j=0; j<k; j++) { + r.key = gsl_rng_uniform_int(rng, n); + r.value = gsl_rng_uniform_int(rng, n); + + while (!extension->insert(r)) { + _mm_pause(); + } + } + TIMER_STOP(); + + auto insert_lat = TIMER_RESULT(); + fprintf(stdout, "I\t%ld\t%ld\t%ld\n", extension->get_record_count(), insert_lat, k); + } + + gsl_rng_free(rng); +} + +void parallel_bench(Ext *extension, gsl_rng *rng, size_t n) { + TIMER_INIT(); + + TIMER_START(); + for (size_t i=0; i < g_query_count; i+=100) { + query_thread(extension, rng, n); + if (i % g_insert_frequency == 0) { + auto x = std::thread(insert_thread, extension, n); + x.detach(); + } + } + TIMER_STOP(); + + auto workload_duration = TIMER_RESULT(); + fprintf(stdout, "W\t%ld\n", workload_duration); +} + + +void serial_bench(Ext *extension, gsl_rng *rng, size_t n) { + TIMER_INIT(); + TIMER_START(); + for (size_t i=0; i < g_query_count; i+=100) { + query_thread(extension, rng, n, false); + if (i % g_insert_frequency == 0) { + auto x = std::thread(insert_thread, extension, n); + x.join(); + } + } + TIMER_STOP(); + + auto workload_duration = TIMER_RESULT(); + fprintf(stdout, "W\t%ld\n", workload_duration); +} + +int main(int argc, char **argv) { + + if (argc < 5) { + fprintf(stderr, "query_workload_bench reccnt lwm hwm parallel\n"); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + size_t lwm = atol(argv[2]); + size_t hwm = atol(argv[3]); + bool parallel = atoi(argv[4]); + + size_t scale_factor = 8; + + auto extension = new Ext(lwm, hwm, scale_factor); + size_t per_trial = 1000; + double selectivity = .001; + + gsl_rng * rng = gsl_rng_alloc(gsl_rng_mt19937); + + /* build initial structure */ + size_t reccnt = 0; + Rec r; + for (size_t i=0; i<n; i++) { + r.key = gsl_rng_uniform_int(rng, n); + r.value = gsl_rng_uniform_int(rng, n); + + while (!extension->insert(r)) { + _mm_pause(); + } + } + + if (parallel) { + parallel_bench(extension, rng, n); + } else { + serial_bench(extension, rng, n); + } + + gsl_rng_free(rng); + delete extension; + fflush(stderr); + fflush(stdout); +} + diff --git a/benchmarks/reconstruction_interference.cpp b/benchmarks/reconstruction_interference.cpp new file mode 100644 index 0000000..57eb923 --- /dev/null +++ b/benchmarks/reconstruction_interference.cpp @@ -0,0 +1,124 @@ +/* + * + */ + +#define ENABLE_TIMER + +#include <thread> + +#include "framework/DynamicExtension.h" +#include "shard/ISAMTree.h" +#include "query/rangecount.h" +#include "framework/interface/Record.h" + +#include "psu-util/timer.h" + + +typedef de::Record<int64_t, int64_t> Rec; +typedef de::ISAMTree<Rec> ISAM; +typedef de::rc::Query<Rec, ISAM> Q; +typedef de::DynamicExtension<Rec, ISAM, Q> Ext; + +volatile std::atomic<bool> queries_done; + +void query_thread(Ext *extension, double selectivity, size_t k) { + TIMER_INIT(); + + size_t reccnt = extension->get_record_count(); + size_t range = reccnt * selectivity; + + auto q = new de::rc::Parms<Rec>(); + + TIMER_START(); + for (int64_t i=0; i<k; i++) { + size_t start = rand() % (reccnt - range); + q->lower_bound = start; + q->upper_bound = start + range; + auto res = extension->query(q); + auto r = res.get(); + } + TIMER_STOP(); + auto query_lat = TIMER_RESULT(); + fprintf(stdout, "Q\t%ld\t%ld\t%ld\n", reccnt, query_lat, k); + delete q; +} + +Ext *build_structure(size_t n) { + auto extension = new Ext(1000, 10000, 2); + + size_t i=0; + Rec r; + do { + r.key = rand() % n; + r.value = i; + if (extension->insert(r)) { + i++; + } else { + _mm_pause(); + } + } while (i < n); + + extension->await_next_epoch(); + return extension; +} + +void query_benchmark(double selectivity, size_t k, Ext *extension, size_t query_thrd_cnt) { + TIMER_INIT(); + + std::vector<std::thread> thrds(query_thrd_cnt); + + TIMER_START(); + for (size_t i=0; i<query_thrd_cnt; i++) { + thrds[i] = std::thread(query_thread, extension, selectivity, k); + } + + for (size_t i=0; i<query_thrd_cnt; i++) { + thrds[i].join(); + } + TIMER_STOP(); + + auto query_lat = TIMER_RESULT(); + fprintf(stdout, "Q\t%ld\t%ld\t%ld\t%ld\n", extension->get_record_count(), query_lat, k, query_thrd_cnt); + + queries_done.store(true); +} + +int main(int argc, char **argv) { + + /* the closeout routine takes _forever_ ... so we'll just leak the memory */ + size_t n = 10000000; + + size_t per_trial = 1000; + double selectivity = .001; + + /* build initial structure */ + auto extension = build_structure(n); + + std::vector<size_t> thread_counts = {8, 16, 32, 64, 128}; + + for (auto &threads : thread_counts) { + /* benchmark queries w/o any interference from reconstructions */ + query_benchmark(selectivity, per_trial, extension, threads); + + fprintf(stderr, "Running interference test...\n"); + + queries_done.store(false); + /* trigger a worst-case reconstruction and benchmark the queries */ + + std::thread q_thrd(query_benchmark, selectivity, per_trial, extension, threads); + + while (!queries_done.load()) { + auto s = extension->create_static_structure(); + delete s; + } + + fprintf(stderr, "Construction complete\n"); + q_thrd.join(); + } + + extension->print_scheduler_statistics(); + delete extension; + + fflush(stderr); +} + diff --git a/benchmarks/static_dynamic_comp.cpp b/benchmarks/static_dynamic_comp.cpp new file mode 100644 index 0000000..5a89d88 --- /dev/null +++ b/benchmarks/static_dynamic_comp.cpp @@ -0,0 +1,117 @@ +/* + * + */ + +#define ENABLE_TIMER + +#include "framework/DynamicExtension.h" +#include "query/rangecount.h" +#include "shard/TrieSpline.h" +#include "shard/ISAMTree.h" + + +#include "framework/interface/Record.h" +#include "framework/interface/Query.h" +#include "include/data-proc.h" + +#include "psu-util/timer.h" + + +typedef de::Record<key_type, value_type> Rec; +typedef de::ISAMTree<Rec> ISAM; +typedef de::TrieSpline<Rec> TS; + +typedef de::rc::Query<Rec, ISAM> Q; +typedef de::DynamicExtension<Rec, ISAM, Q> Ext; + +typedef de::MutableBuffer<Rec> Buffer; + +typedef de::rc::Parms<Rec> query; + +Buffer *file_to_mbuffer(std::string &fname, size_t n) { + std::fstream file; + file.open(fname, std::ios::in); + + auto buff = new Buffer(n, n+1); + + Rec rec; + while (next_record(file, rec) && buff->get_record_count() < n) { + buff->append(rec); + } + + return buff; +} + +BenchBTree *file_to_btree(std::string &fname, size_t n) { + std::fstream file; + file.open(fname, std::ios::in); + + auto btree = new BenchBTree(); + Rec rec; + while (next_record(file, rec) && btree->size() < n) { + btree->insert({rec.key, rec.value}); + } + + return btree; +} + +template<de::ShardInterface S> +void benchmark_shard(S *shard, std::vector<query> &queries) { + TIMER_INIT(); + + TIMER_START(); + for (auto & q : queries) { + auto state = de::rc::Query<S, Rec>::get_query_state(shard, &q); + auto res = de::rc::Query<S, Rec>::query(shard, state, &q); + } + TIMER_STOP(); + + auto latency = TIMER_RESULT() / queries.size(); + fprintf(stdout, "%ld %ld\n", latency, shard->get_memory_usage() - shard->get_record_count() * sizeof(de::Wrapped<Rec>)); +} + +void benchmark_btree(BenchBTree *btree, std::vector<query> &queries) { + TIMER_INIT(); + + TIMER_START(); + for (auto & q : queries) { + size_t c = 0; + auto ptr = btree->find(q.lower_bound); + while(ptr != btree->end() && ptr->key <= q.upper_bound) { + c++; + } + } + TIMER_STOP(); + + auto latency = TIMER_RESULT() / queries.size(); + auto mem = btree->get_stats().inner_nodes * psudb::btree_default_traits<key_type, btree_record>::inner_slots * (sizeof(key_type) + sizeof(void*)); + fprintf(stdout, "%ld %ld\n", latency, mem); +} + +int main(int argc, char **argv) { + if (argc < 4) { + fprintf(stderr, "Usage: static_dynamic_comp <filename> <record_count> <query_file>\n"); + exit(EXIT_FAILURE); + } + + std::string d_fname = std::string(argv[1]); + size_t reccnt = atol(argv[2]); + std::string q_fname = std::string(argv[3]); + + init_bench_env(reccnt, true, false); + auto queries = read_range_queries<query>(q_fname, .001); + + auto buff = file_to_mbuffer(d_fname, reccnt); + + TS *ts = new TS(buff->get_buffer_view()); + benchmark_shard<TS>(ts, queries); + delete ts; + + ISAM *isam = new ISAM(buff->get_buffer_view()); + benchmark_shard<ISAM>(isam, queries); + delete isam; + + auto btree = file_to_btree(d_fname, reccnt); + +} + diff --git a/benchmarks/watermark_testing.cpp b/benchmarks/watermark_testing.cpp new file mode 100644 index 0000000..caba8ff --- /dev/null +++ b/benchmarks/watermark_testing.cpp @@ -0,0 +1,55 @@ +/* + * + */ + +#define ENABLE_TIMER + +#include "framework/DynamicExtension.h" +#include "shard/ISAMTree.h" +#include "query/rangequery.h" +#include "framework/interface/Record.h" + +#include "psu-util/timer.h" + + +typedef de::Record<int64_t, int64_t> Rec; +typedef de::ISAMTree<Rec> ISAM; +typedef de::rq::Query<Rec, ISAM> Q; +typedef de::DynamicExtension<Rec, ISAM, Q> Ext; + + + +int main(int argc, char **argv) { + std::vector hwms = {5000l, 10000l, 20000l, 50000l}; + std::vector lwms = {.1, .2, .3, .4, .5, .6, .7, .8, .9}; + + size_t n = 1000000; + + TIMER_INIT(); + + for (auto &hwm : hwms) { + for (size_t i=0; i<lwms.size(); i++) { + size_t lwm = hwm * lwms[i]; + + auto extension = new Ext(lwm, hwm, 8); + TIMER_START(); + for (int64_t i=0; i<n; i++) { + Rec r = {i, i}; + while (!extension->insert(r)) { + _mm_pause(); + } + } + TIMER_STOP(); + + auto insert_time = TIMER_RESULT(); + double insert_throughput = (double) n / (double) insert_time * 1e9; + + fprintf(stdout, "%ld\t%ld\t%lf\n", lwm, hwm, insert_throughput); + + extension->print_scheduler_statistics(); + + delete extension; + } + } +} + |