diff options
| author | Douglas B. Rumbaugh <dbr4@psu.edu> | 2024-02-09 14:06:59 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-02-09 14:06:59 -0500 |
| commit | bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30 (patch) | |
| tree | 66333c55feb0ea8875a50e6dc07c8535d241bf1c /benchmarks/reconstruction_interference.cpp | |
| parent | 076e104b8672924c3d80cd1da2fdb5ebee1766ac (diff) | |
| parent | 46885246313358a3b606eca139b20280e96db10e (diff) | |
| download | dynamic-extension-bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30.tar.gz | |
Merge pull request #1 from dbrumbaugh/new-buffer
Initial Concurrency Implementation
Diffstat (limited to 'benchmarks/reconstruction_interference.cpp')
| -rw-r--r-- | benchmarks/reconstruction_interference.cpp | 124 |
1 files changed, 124 insertions, 0 deletions
diff --git a/benchmarks/reconstruction_interference.cpp b/benchmarks/reconstruction_interference.cpp new file mode 100644 index 0000000..57eb923 --- /dev/null +++ b/benchmarks/reconstruction_interference.cpp @@ -0,0 +1,124 @@ +/* + * + */ + +#define ENABLE_TIMER + +#include <thread> + +#include "framework/DynamicExtension.h" +#include "shard/ISAMTree.h" +#include "query/rangecount.h" +#include "framework/interface/Record.h" + +#include "psu-util/timer.h" + + +typedef de::Record<int64_t, int64_t> Rec; +typedef de::ISAMTree<Rec> ISAM; +typedef de::rc::Query<Rec, ISAM> Q; +typedef de::DynamicExtension<Rec, ISAM, Q> Ext; + +volatile std::atomic<bool> queries_done; + +void query_thread(Ext *extension, double selectivity, size_t k) { + TIMER_INIT(); + + size_t reccnt = extension->get_record_count(); + size_t range = reccnt * selectivity; + + auto q = new de::rc::Parms<Rec>(); + + TIMER_START(); + for (int64_t i=0; i<k; i++) { + size_t start = rand() % (reccnt - range); + q->lower_bound = start; + q->upper_bound = start + range; + auto res = extension->query(q); + auto r = res.get(); + } + TIMER_STOP(); + auto query_lat = TIMER_RESULT(); + fprintf(stdout, "Q\t%ld\t%ld\t%ld\n", reccnt, query_lat, k); + delete q; +} + +Ext *build_structure(size_t n) { + auto extension = new Ext(1000, 10000, 2); + + size_t i=0; + Rec r; + do { + r.key = rand() % n; + r.value = i; + if (extension->insert(r)) { + i++; + } else { + _mm_pause(); + } + } while (i < n); + + extension->await_next_epoch(); + return extension; +} + +void query_benchmark(double selectivity, size_t k, Ext *extension, size_t query_thrd_cnt) { + TIMER_INIT(); + + std::vector<std::thread> thrds(query_thrd_cnt); + + TIMER_START(); + for (size_t i=0; i<query_thrd_cnt; i++) { + thrds[i] = std::thread(query_thread, extension, selectivity, k); + } + + for (size_t i=0; i<query_thrd_cnt; i++) { + thrds[i].join(); + } + TIMER_STOP(); + + auto query_lat = TIMER_RESULT(); + fprintf(stdout, "Q\t%ld\t%ld\t%ld\t%ld\n", extension->get_record_count(), query_lat, k, query_thrd_cnt); + + queries_done.store(true); +} + +int main(int argc, char **argv) { + + /* the closeout routine takes _forever_ ... so we'll just leak the memory */ + size_t n = 10000000; + + size_t per_trial = 1000; + double selectivity = .001; + + /* build initial structure */ + auto extension = build_structure(n); + + std::vector<size_t> thread_counts = {8, 16, 32, 64, 128}; + + for (auto &threads : thread_counts) { + /* benchmark queries w/o any interference from reconstructions */ + query_benchmark(selectivity, per_trial, extension, threads); + + fprintf(stderr, "Running interference test...\n"); + + queries_done.store(false); + /* trigger a worst-case reconstruction and benchmark the queries */ + + std::thread q_thrd(query_benchmark, selectivity, per_trial, extension, threads); + + while (!queries_done.load()) { + auto s = extension->create_static_structure(); + delete s; + } + + fprintf(stderr, "Construction complete\n"); + q_thrd.join(); + } + + extension->print_scheduler_statistics(); + delete extension; + + fflush(stderr); +} + |