diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-31 16:57:41 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-31 16:57:41 -0500 |
| commit | 1e226fc415d7674de0ecde51199d89e9042c6a22 (patch) | |
| tree | 8406e6e8c133aee9aa5c79c5b7120ce749320a14 | |
| parent | 1b354771dea44523183758e71ebc7623ace143f5 (diff) | |
| download | dynamic-extension-1e226fc415d7674de0ecde51199d89e9042c6a22.tar.gz | |
Updated insert query throughput to use IRS queries

| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | CMakeLists.txt | 12 |
| -rw-r--r-- | benchmarks/insert_query_tput.cpp | 14 |
| -rw-r--r-- | benchmarks/insert_query_tput_serial.cpp | 104 |
| -rw-r--r-- | include/query/irs.h | 18 |

4 files changed, 25 insertions, 123 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 3f6ea4c..e2fb0ad 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(namespace "de") project("Practical Dynamic Extension" VERSION 0.1.0) -set(debug false) +set(debug true) set(tests True) set(bench true) set(old_bench False) @@ -130,16 +130,16 @@ if (bench) target_include_directories(insertion_tput PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include bench/include external/psudb-common/cpp/include) target_link_options(insertion_tput PUBLIC -mcx16) + add_executable(query_workload_bench ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/query_workload_bench.cpp) + target_link_libraries(query_workload_bench PUBLIC gsl pthread gomp atomic) + target_include_directories(query_workload_bench PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include bench/include external/psudb-common/cpp/include) + target_link_options(query_workload_bench PUBLIC -mcx16) + add_executable(insert_query_tput ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/insert_query_tput.cpp) target_link_libraries(insert_query_tput PUBLIC gsl pthread gomp atomic) target_include_directories(insert_query_tput PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include bench/include external/psudb-common/cpp/include) target_link_options(insert_query_tput PUBLIC -mcx16) - add_executable(insert_query_tput_serial ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/insert_query_tput_serial.cpp) - target_link_libraries(insert_query_tput_serial PUBLIC gsl pthread gomp atomic) - target_include_directories(insert_query_tput_serial PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include bench/include external/psudb-common/cpp/include) - target_link_options(insert_query_tput_serial PUBLIC -mcx16) - add_executable(watermark_testing ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/watermark_testing.cpp) 
target_link_libraries(watermark_testing PUBLIC gsl pthread gomp atomic) target_include_directories(watermark_testing PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include bench/include external/psudb-common/cpp/include) diff --git a/benchmarks/insert_query_tput.cpp b/benchmarks/insert_query_tput.cpp index 8274d2a..05715b1 100644 --- a/benchmarks/insert_query_tput.cpp +++ b/benchmarks/insert_query_tput.cpp @@ -8,7 +8,7 @@ #include "framework/DynamicExtension.h" #include "shard/ISAMTree.h" -#include "query/rangecount.h" +#include "query/irs.h" #include "framework/interface/Record.h" #include <gsl/gsl_rng.h> @@ -18,7 +18,7 @@ typedef de::Record<int64_t, int64_t> Rec; typedef de::ISAMTree<Rec> ISAM; -typedef de::rc::Query<ISAM, Rec> Q; +typedef de::irs::Query<ISAM, Rec> Q; typedef de::DynamicExtension<Rec, ISAM, Q> Ext; std::atomic<bool> inserts_done = false; @@ -27,17 +27,19 @@ void query_thread(Ext *extension, size_t n) { gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937); size_t range = n*.0001; - size_t total = 0; + int64_t total = 0; - de::rc::Parms<Rec> *q = new de::rc::Parms<Rec>(); + de::irs::Parms<Rec> *q = new de::irs::Parms<Rec>(); while (!inserts_done.load()) { size_t start = gsl_rng_uniform_int(rng, n - range); q->lower_bound = start; q->upper_bound = start + range; + q->sample_size = 100; + q->rng = rng; auto res = extension->query(q); auto r = res.get(); - total += r[0].key; - usleep(1); + total += r.size(); + usleep(1); } fprintf(stderr, "%ld\n", total); diff --git a/benchmarks/insert_query_tput_serial.cpp b/benchmarks/insert_query_tput_serial.cpp deleted file mode 100644 index ff9dc40..0000000 --- a/benchmarks/insert_query_tput_serial.cpp +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - */ - -#define ENABLE_TIMER - -#include <thread> - -#include "framework/DynamicExtension.h" -#include "shard/ISAMTree.h" -#include "query/rangecount.h" -#include "framework/interface/Record.h" - -#include <gsl/gsl_rng.h> - -#include 
"psu-util/timer.h" - - -typedef de::Record<int64_t, int64_t> Rec; -typedef de::ISAMTree<Rec> ISAM; -typedef de::rc::Query<ISAM, Rec> Q; -typedef de::DynamicExtension<Rec, ISAM, Q, de::LayoutPolicy::TEIRING, de::DeletePolicy::TAGGING, de::SerialScheduler> Ext; - -std::atomic<bool> inserts_done = false; - - -void query_thread(Ext *extension, double selectivity, size_t k, gsl_rng *rng) { - TIMER_INIT(); - - size_t reccnt = extension->get_record_count(); - - size_t range = reccnt * selectivity; - - auto q = new de::rc::Parms<Rec>(); - - TIMER_START(); - for (int64_t i=0; i<k; i++) { - size_t start = gsl_rng_uniform_int(rng, reccnt - range); - - q->lower_bound = start; - q->upper_bound = start + range; - auto res = extension->query(q); - auto r = res.get(); - } - TIMER_STOP(); - auto query_lat = TIMER_RESULT(); - fprintf(stdout, "Q\t%ld\t%ld\t%ld\n", reccnt, query_lat, k); - delete q; -} - -void insert_thread(Ext *extension, size_t n, size_t k, gsl_rng *rng) { - TIMER_INIT(); - - size_t reccnt = 0; - Rec r; - while (reccnt < n) { - auto old_reccnt = reccnt; - - TIMER_START(); - for (size_t i=0; i<k; i++) { - r.key = reccnt; - r.value = reccnt; - - if (extension->insert(r)) { - reccnt++; - } - } - TIMER_STOP(); - auto insert_lat = TIMER_RESULT(); - - fprintf(stdout, "I\t%ld\t%ld\t%ld\n", reccnt, insert_lat, reccnt - old_reccnt); - - if (reccnt % 100000 == 0 && reccnt != n) { - auto a = std::thread(query_thread, extension, .01, 20, rng); - a.join(); - } - } -} - -int main(int argc, char **argv) { - - /* the closeout routine takes _forever_ ... 
so we'll just leak the memory */ - auto extension = new Ext(1000, 12000, 8); - size_t n = 10000000; - size_t per_trial = 1000; - double selectivity = .001; - - TIMER_INIT(); - - gsl_rng * rng = gsl_rng_alloc(gsl_rng_mt19937); - - TIMER_START(); - std::thread i_thrd(insert_thread, extension, n, per_trial, rng); - i_thrd.join(); - TIMER_STOP(); - - auto total_latency = TIMER_RESULT(); - fprintf(stdout, "T\t%ld\n", total_latency); - - gsl_rng_free(rng); - delete extension; - fflush(stderr); -} - diff --git a/include/query/irs.h b/include/query/irs.h index fa69ea1..7ef5069 100644 --- a/include/query/irs.h +++ b/include/query/irs.h @@ -13,6 +13,7 @@ #pragma once #include "framework/QueryRequirements.h" +#include "psu-ds/Alias.h" namespace de { namespace irs { @@ -38,6 +39,9 @@ struct BufferState { size_t cutoff; std::vector<Wrapped<R>> records; size_t sample_size; + BufferView<R> buffer; + + BufferState(BufferView<R> buffer) : buffer(std::move(buffer)) {} }; template <ShardInterface S, RecordInterface R, bool Rejection=true> @@ -64,10 +68,10 @@ public: return res; } - static void* get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) { - auto res = new BufferState<R>(); + static void* get_buffer_query_state(BufferView<R> buffer, void *parms) { + auto res = new BufferState<R>(std::move(buffer)); - res->cutoff = buffer->get_record_count(); + res->cutoff = res->buffer.get_record_count(); res->sample_size = 0; if constexpr (Rejection) { @@ -78,8 +82,8 @@ public: auto upper_key = ((Parms<R> *) parms)->upper_bound; for (size_t i=0; i<res->cutoff; i++) { - if (((buffer->get_data() + i)->rec.key >= lower_key) && ((buffer->get_data() + i)->rec.key <= upper_key)) { - res->records.emplace_back(*(buffer->get_data() + i)); + if ((res->buffer.get(i)->rec.key >= lower_key) && (buffer.get(i)->rec.key <= upper_key)) { + res->records.emplace_back(*(res->buffer.get(i))); } } @@ -167,7 +171,7 @@ public: return result_set; } - static std::vector<Wrapped<R>> 
buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) { + static std::vector<Wrapped<R>> buffer_query(void *state, void *parms) { auto st = (BufferState<R> *) state; auto p = (Parms<R> *) parms; @@ -177,7 +181,7 @@ public: if constexpr (Rejection) { for (size_t i=0; i<st->sample_size; i++) { auto idx = gsl_rng_uniform_int(p->rng, st->cutoff); - auto rec = buffer->get_data() + idx; + auto rec = st->buffer.get(idx); if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { result.emplace_back(*rec); |