summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-01-31 16:57:41 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-01-31 16:57:41 -0500
commit1e226fc415d7674de0ecde51199d89e9042c6a22 (patch)
tree8406e6e8c133aee9aa5c79c5b7120ce749320a14
parent1b354771dea44523183758e71ebc7623ace143f5 (diff)
downloaddynamic-extension-1e226fc415d7674de0ecde51199d89e9042c6a22.tar.gz
Updated insert query throughput to use IRS queries
-rw-r--r--CMakeLists.txt12
-rw-r--r--benchmarks/insert_query_tput.cpp14
-rw-r--r--benchmarks/insert_query_tput_serial.cpp104
-rw-r--r--include/query/irs.h18
4 files changed, 25 insertions, 123 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3f6ea4c..e2fb0ad 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
set(namespace "de")
project("Practical Dynamic Extension" VERSION 0.1.0)
-set(debug false)
+set(debug true)
set(tests True)
set(bench true)
set(old_bench False)
@@ -130,16 +130,16 @@ if (bench)
target_include_directories(insertion_tput PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include bench/include external/psudb-common/cpp/include)
target_link_options(insertion_tput PUBLIC -mcx16)
+ add_executable(query_workload_bench ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/query_workload_bench.cpp)
+ target_link_libraries(query_workload_bench PUBLIC gsl pthread gomp atomic)
+ target_include_directories(query_workload_bench PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include bench/include external/psudb-common/cpp/include)
+ target_link_options(query_workload_bench PUBLIC -mcx16)
+
add_executable(insert_query_tput ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/insert_query_tput.cpp)
target_link_libraries(insert_query_tput PUBLIC gsl pthread gomp atomic)
target_include_directories(insert_query_tput PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include bench/include external/psudb-common/cpp/include)
target_link_options(insert_query_tput PUBLIC -mcx16)
- add_executable(insert_query_tput_serial ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/insert_query_tput_serial.cpp)
- target_link_libraries(insert_query_tput_serial PUBLIC gsl pthread gomp atomic)
- target_include_directories(insert_query_tput_serial PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include bench/include external/psudb-common/cpp/include)
- target_link_options(insert_query_tput_serial PUBLIC -mcx16)
-
add_executable(watermark_testing ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/watermark_testing.cpp)
target_link_libraries(watermark_testing PUBLIC gsl pthread gomp atomic)
target_include_directories(watermark_testing PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include bench/include external/psudb-common/cpp/include)
diff --git a/benchmarks/insert_query_tput.cpp b/benchmarks/insert_query_tput.cpp
index 8274d2a..05715b1 100644
--- a/benchmarks/insert_query_tput.cpp
+++ b/benchmarks/insert_query_tput.cpp
@@ -8,7 +8,7 @@
#include "framework/DynamicExtension.h"
#include "shard/ISAMTree.h"
-#include "query/rangecount.h"
+#include "query/irs.h"
#include "framework/interface/Record.h"
#include <gsl/gsl_rng.h>
@@ -18,7 +18,7 @@
typedef de::Record<int64_t, int64_t> Rec;
typedef de::ISAMTree<Rec> ISAM;
-typedef de::rc::Query<ISAM, Rec> Q;
+typedef de::irs::Query<ISAM, Rec> Q;
typedef de::DynamicExtension<Rec, ISAM, Q> Ext;
std::atomic<bool> inserts_done = false;
@@ -27,17 +27,19 @@ void query_thread(Ext *extension, size_t n) {
gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937);
size_t range = n*.0001;
- size_t total = 0;
+ int64_t total = 0;
- de::rc::Parms<Rec> *q = new de::rc::Parms<Rec>();
+ de::irs::Parms<Rec> *q = new de::irs::Parms<Rec>();
while (!inserts_done.load()) {
size_t start = gsl_rng_uniform_int(rng, n - range);
q->lower_bound = start;
q->upper_bound = start + range;
+ q->sample_size = 100;
+ q->rng = rng;
auto res = extension->query(q);
auto r = res.get();
- total += r[0].key;
- usleep(1);
+ total += r.size();
+ usleep(1);
}
fprintf(stderr, "%ld\n", total);
diff --git a/benchmarks/insert_query_tput_serial.cpp b/benchmarks/insert_query_tput_serial.cpp
deleted file mode 100644
index ff9dc40..0000000
--- a/benchmarks/insert_query_tput_serial.cpp
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- */
-
-#define ENABLE_TIMER
-
-#include <thread>
-
-#include "framework/DynamicExtension.h"
-#include "shard/ISAMTree.h"
-#include "query/rangecount.h"
-#include "framework/interface/Record.h"
-
-#include <gsl/gsl_rng.h>
-
-#include "psu-util/timer.h"
-
-
-typedef de::Record<int64_t, int64_t> Rec;
-typedef de::ISAMTree<Rec> ISAM;
-typedef de::rc::Query<ISAM, Rec> Q;
-typedef de::DynamicExtension<Rec, ISAM, Q, de::LayoutPolicy::TEIRING, de::DeletePolicy::TAGGING, de::SerialScheduler> Ext;
-
-std::atomic<bool> inserts_done = false;
-
-
-void query_thread(Ext *extension, double selectivity, size_t k, gsl_rng *rng) {
- TIMER_INIT();
-
- size_t reccnt = extension->get_record_count();
-
- size_t range = reccnt * selectivity;
-
- auto q = new de::rc::Parms<Rec>();
-
- TIMER_START();
- for (int64_t i=0; i<k; i++) {
- size_t start = gsl_rng_uniform_int(rng, reccnt - range);
-
- q->lower_bound = start;
- q->upper_bound = start + range;
- auto res = extension->query(q);
- auto r = res.get();
- }
- TIMER_STOP();
- auto query_lat = TIMER_RESULT();
- fprintf(stdout, "Q\t%ld\t%ld\t%ld\n", reccnt, query_lat, k);
- delete q;
-}
-
-void insert_thread(Ext *extension, size_t n, size_t k, gsl_rng *rng) {
- TIMER_INIT();
-
- size_t reccnt = 0;
- Rec r;
- while (reccnt < n) {
- auto old_reccnt = reccnt;
-
- TIMER_START();
- for (size_t i=0; i<k; i++) {
- r.key = reccnt;
- r.value = reccnt;
-
- if (extension->insert(r)) {
- reccnt++;
- }
- }
- TIMER_STOP();
- auto insert_lat = TIMER_RESULT();
-
- fprintf(stdout, "I\t%ld\t%ld\t%ld\n", reccnt, insert_lat, reccnt - old_reccnt);
-
- if (reccnt % 100000 == 0 && reccnt != n) {
- auto a = std::thread(query_thread, extension, .01, 20, rng);
- a.join();
- }
- }
-}
-
-int main(int argc, char **argv) {
-
- /* the closeout routine takes _forever_ ... so we'll just leak the memory */
- auto extension = new Ext(1000, 12000, 8);
- size_t n = 10000000;
- size_t per_trial = 1000;
- double selectivity = .001;
-
- TIMER_INIT();
-
- gsl_rng * rng = gsl_rng_alloc(gsl_rng_mt19937);
-
- TIMER_START();
- std::thread i_thrd(insert_thread, extension, n, per_trial, rng);
- i_thrd.join();
- TIMER_STOP();
-
- auto total_latency = TIMER_RESULT();
- fprintf(stdout, "T\t%ld\n", total_latency);
-
- gsl_rng_free(rng);
- delete extension;
- fflush(stderr);
-}
-
diff --git a/include/query/irs.h b/include/query/irs.h
index fa69ea1..7ef5069 100644
--- a/include/query/irs.h
+++ b/include/query/irs.h
@@ -13,6 +13,7 @@
#pragma once
#include "framework/QueryRequirements.h"
+#include "psu-ds/Alias.h"
namespace de { namespace irs {
@@ -38,6 +39,9 @@ struct BufferState {
size_t cutoff;
std::vector<Wrapped<R>> records;
size_t sample_size;
+ BufferView<R> buffer;
+
+ BufferState(BufferView<R> buffer) : buffer(std::move(buffer)) {}
};
template <ShardInterface S, RecordInterface R, bool Rejection=true>
@@ -64,10 +68,10 @@ public:
return res;
}
- static void* get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) {
- auto res = new BufferState<R>();
+ static void* get_buffer_query_state(BufferView<R> buffer, void *parms) {
+ auto res = new BufferState<R>(std::move(buffer));
- res->cutoff = buffer->get_record_count();
+ res->cutoff = res->buffer.get_record_count();
res->sample_size = 0;
if constexpr (Rejection) {
@@ -78,8 +82,8 @@ public:
auto upper_key = ((Parms<R> *) parms)->upper_bound;
for (size_t i=0; i<res->cutoff; i++) {
- if (((buffer->get_data() + i)->rec.key >= lower_key) && ((buffer->get_data() + i)->rec.key <= upper_key)) {
- res->records.emplace_back(*(buffer->get_data() + i));
+ if ((res->buffer.get(i)->rec.key >= lower_key) && (buffer.get(i)->rec.key <= upper_key)) {
+ res->records.emplace_back(*(res->buffer.get(i)));
}
}
@@ -167,7 +171,7 @@ public:
return result_set;
}
- static std::vector<Wrapped<R>> buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) {
+ static std::vector<Wrapped<R>> buffer_query(void *state, void *parms) {
auto st = (BufferState<R> *) state;
auto p = (Parms<R> *) parms;
@@ -177,7 +181,7 @@ public:
if constexpr (Rejection) {
for (size_t i=0; i<st->sample_size; i++) {
auto idx = gsl_rng_uniform_int(p->rng, st->cutoff);
- auto rec = buffer->get_data() + idx;
+ auto rec = st->buffer.get(idx);
if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
result.emplace_back(*rec);