author     Douglas Rumbaugh <dbr4@psu.edu>    2025-02-24 11:14:52 -0500
committer  Douglas Rumbaugh <dbr4@psu.edu>    2025-02-24 11:14:52 -0500
commit     d116b94389538aa8e0e7354fae77693b980de4f0 (patch)
tree       918e29a6734c726f398f2d06894fdb42d9752fa9
parent     40fe2e7ea56d49a065a4a53b7f8a4a918a5d78b0 (diff)
download   dynamic-extension-d116b94389538aa8e0e7354fae77693b980de4f0.tar.gz
Query Preemption: still has one or two bugs, but mostly works
-rw-r--r--   CMakeLists.txt                                      |   6
-rw-r--r--   benchmarks/tail-latency/insert_query_threads.cpp    | 185
-rw-r--r--   benchmarks/tail-latency/mixed_workload.cpp          |   4
-rw-r--r--   benchmarks/tail-latency/mixed_workload_average.cpp  |   4
-rw-r--r--   include/framework/DynamicExtension.h                |  49
-rw-r--r--   include/framework/scheduling/Version.h              |   7
-rw-r--r--   include/framework/structure/MutableBuffer.h         |   6
-rw-r--r--   include/framework/util/Configuration.h              |   2
8 files changed, 246 insertions(+), 17 deletions(-)
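
The sketch below illustrates, in isolation, the retry-then-preempt pattern this commit introduces; all names in it (preempt_version, old_head_pinned, flush_thread, query_thread) are invented for the example, and the real logic lives in the include/framework/DynamicExtension.h hunks further down. A flush that cannot advance the buffer head counts its failures, publishes a preemption version once the count reaches a trigger, and any query still pinned to an older version restarts and releases its reference.

    /* minimal, self-contained sketch only; not framework code */
    #include <atomic>
    #include <cstdio>
    #include <thread>

    static std::atomic<size_t> preempt_version{0};    /* cf. m_preempt_version */
    static std::atomic<bool>   old_head_pinned{true}; /* a query holding the old buffer head */

    void flush_thread(size_t flush_version, size_t trigger) {
      size_t failures = 0;
      while (old_head_pinned.load()) {                /* head advance keeps failing */
        if (++failures >= trigger) {
          preempt_version.store(flush_version - 1);   /* cf. preempt_queries() */
        }
        std::this_thread::yield();
      }
      std::printf("flush for version %zu completed after %zu retries\n",
                  flush_version, failures);
    }

    void query_thread(size_t my_version) {
      while (old_head_pinned.load()) {
        if (my_version <= preempt_version.load()) {   /* cf. restart_query() */
          old_head_pinned.store(false);               /* release the old head, restart the query */
          std::printf("query on version %zu preempted and restarted\n", my_version);
        }
        std::this_thread::yield();
      }
    }

    int main() {
      std::thread f(flush_thread, 2, 10);
      std::thread q(query_thread, 1);
      f.join();
      q.join();
      return 0;
    }
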
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3faa60b..0273b5a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -317,6 +317,12 @@ if (tail_bench)
target_link_libraries(mixed_workload_average PUBLIC gsl pthread atomic)
target_include_directories(mixed_workload_average PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include)
target_link_options(mixed_workload_average PUBLIC -mcx16)
+
+ add_executable(insert_query_threads ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/insert_query_threads.cpp)
+ target_link_libraries(insert_query_threads PUBLIC gsl pthread atomic)
+ target_include_directories(insert_query_threads PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include)
+ target_link_options(insert_query_threads PUBLIC -mcx16)
+
endif()
if (bench)
diff --git a/benchmarks/tail-latency/insert_query_threads.cpp b/benchmarks/tail-latency/insert_query_threads.cpp
new file mode 100644
index 0000000..1f35f3d
--- /dev/null
+++ b/benchmarks/tail-latency/insert_query_threads.cpp
@@ -0,0 +1,185 @@
+/*
+ * insert_query_threads.cpp: tail-latency benchmark that runs dedicated
+ * insert and query threads concurrently against a DynamicExtension over
+ * TrieSpline shards; reports insert throughput and mean query latency.
+ */
+
+#define ENABLE_TIMER
+#define TS_TEST
+
+#include <atomic>
+#include <cstdio>
+#include <cstdlib>   /* rand(), atol() */
+#include <string>
+#include <thread>
+#include <unistd.h>  /* usleep() */
+#include <vector>
+
+#include "framework/scheduling/SerialScheduler.h"
+#include "framework/util/Configuration.h"
+#include "util/types.h"
+#include "file_util.h"
+#include "framework/DynamicExtension.h"
+#include "framework/interface/Record.h"
+#include "framework/scheduling/FIFOScheduler.h"
+#include "query/rangecount.h"
+#include "shard/TrieSpline.h"
+#include "standard_benchmarks.h"
+
+#include "framework/reconstruction/FixedShardCountPolicy.h"
+
+#include <gsl/gsl_rng.h>
+
+#include "psu-util/timer.h"
+
+typedef de::Record<uint64_t, uint64_t> Rec;
+typedef de::TrieSpline<Rec> Shard;
+typedef de::rc::Query<Shard> Q;
+typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE,
+ de::FIFOScheduler>
+ Ext;
+typedef Q::Parameters QP;
+typedef de::DEConfiguration<Shard, Q, de::DeletePolicy::TOMBSTONE,
+ de::FIFOScheduler>
+ Conf;
+
+std::atomic<size_t> idx;
+std::atomic<bool> inserts_done = false;
+
+ssize_t query_ratio = 8;
+
+std::atomic<size_t> total_res = 0;
+size_t reccnt = 0;
+
+size_t g_thrd_cnt = 0;
+
+std::atomic<size_t> total_insert_time = 0;
+std::atomic<size_t> total_insert_count = 0;
+std::atomic<size_t> total_query_time = 0;
+std::atomic<size_t> total_query_count = 0;
+
+void query_thread(Ext *extension, std::vector<QP> *queries) {
+ TIMER_INIT();
+ while (!inserts_done.load()) {
+ total_query_count.fetch_add(1);
+ auto q_idx = rand() % queries->size();
+
+ auto q = (*queries)[q_idx];
+
+ TIMER_START();
+ auto res = extension->query(std::move(q)).get();
+ TIMER_STOP();
+
+ total_query_time.fetch_add(TIMER_RESULT());
+ total_res.fetch_add(res);
+ }
+}
+
+void insert_thread(Ext *extension, std::vector<Rec> *records, size_t start_idx, size_t stop_idx) {
+ TIMER_INIT();
+
+ TIMER_START();
+
+ for (size_t i=start_idx; i<stop_idx; i++) {
+ while (!extension->insert((*records)[i])) {
+ usleep(1);
+ }
+ }
+
+ TIMER_STOP();
+ total_insert_time.fetch_add(TIMER_RESULT());
+}
+
+void usage(char *progname) {
+ fprintf(stderr, "%s reccnt datafile queryfile\n", progname);
+}
+
+int main(int argc, char **argv) {
+
+ if (argc < 4) {
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+
+ size_t n = atol(argv[1]);
+ std::string d_fname = std::string(argv[2]);
+ std::string q_fname = std::string(argv[3]);
+
+ auto data = read_sosd_file<Rec>(d_fname, n);
+ auto queries = read_range_queries<QP>(q_fname, .0001);
+
+ std::vector<size_t> sfs = {8}; //, 4, 8, 16, 32, 64, 128, 256, 512, 1024};
+ size_t buffer_size = 8000;
+ std::vector<size_t> policies = {
+ 5
+ };
+
+ std::vector<size_t> thread_counts = {8, 16, 32};
+
+ size_t insert_threads = 1;
+ size_t query_threads = 6;
+
+ reccnt = n;
+
+ for (auto pol : policies) {
+ for (auto internal_thread_cnt : thread_counts) {
+ auto policy = get_policy<Shard, Q>(sfs[0], buffer_size, pol, n);
+ auto config = Conf(std::move(policy));
+ config.recon_enable_maint_on_flush = true;
+ config.recon_maint_disabled = false;
+ config.buffer_flush_trigger = 4000;
+ config.maximum_threads = internal_thread_cnt;
+
+ g_thrd_cnt = internal_thread_cnt;
+
+ total_insert_time.store(0);
+ total_query_time.store(0);
+ total_query_count.store(0);
+
+ auto extension = new Ext(std::move(config));
+
+ /* warmup structure w/ 10% of records */
+ size_t warmup = .1 * n;
+ for (size_t k = 0; k < warmup; k++) {
+ while (!extension->insert(data[k])) {
+ usleep(1);
+ }
+ }
+
+ extension->await_version();
+
+ idx.store(warmup);
+
+ std::thread i_thrds[insert_threads];
+ std::thread q_thrds[query_threads];
+
+ size_t per_insert_thrd = (n - warmup) / insert_threads;
+ size_t start = warmup;
+
+ for (size_t i=0; i<insert_threads; i++) {
+ i_thrds[i] = std::thread(insert_thread, extension, &data, start, start + per_insert_thrd);
+ start += per_insert_thrd;
+ }
+
+ for (size_t i=0; i<query_threads; i++) {
+ q_thrds[i] = std::thread(query_thread, extension, &queries);
+ }
+
+ for (size_t i=0; i<insert_threads; i++) {
+ i_thrds[i].join();
+ }
+
+ inserts_done.store(true);
+
+ for (size_t i=0; i<query_threads; i++) {
+ q_thrds[i].join();
+ }
+
+ fprintf(stderr, "%ld\n", total_res.load());
+
+ size_t insert_tput = ((double)(n - warmup) / (double) total_insert_time) *1e9;
+ size_t query_lat = (double) total_query_time.load() / (double) total_query_count.load();
+
+ fprintf(stdout, "%ld\t%ld\t%ld\n", internal_thread_cnt, insert_tput, query_lat);
+ fflush(stdout);
+
+ total_res.store(0);
+ inserts_done.store(false);
+ delete extension;
+ }
+ }
+
+ fflush(stderr);
+}
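
insert_query_threads takes a record count, a SOSD-format data file, and a range-query file on the command line. It warms the structure with 10% of the records, then for each internal thread count in {8, 16, 32} runs one insert thread and six query threads concurrently, printing one tab-separated line per configuration: internal thread count, insert throughput, and mean query latency. Assuming TIMER_RESULT() reports nanoseconds (consistent with the 1e9 scaling), throughput is in records per second and latency in nanoseconds.
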
diff --git a/benchmarks/tail-latency/mixed_workload.cpp b/benchmarks/tail-latency/mixed_workload.cpp
index f4bfdda..7be07b6 100644
--- a/benchmarks/tail-latency/mixed_workload.cpp
+++ b/benchmarks/tail-latency/mixed_workload.cpp
@@ -38,7 +38,7 @@ typedef de::DEConfiguration<Shard, Q, de::DeletePolicy::TOMBSTONE,
std::atomic<size_t> idx;
std::atomic<bool> inserts_done = false;
-size_t query_ratio = 3;
+ssize_t query_ratio = 3;
std::atomic<size_t> total_res = 0;
size_t reccnt = 0;
@@ -51,7 +51,7 @@ void operation_thread(Ext *extension, std::vector<QP> *queries,
while (!inserts_done.load()) {
auto type = rand() % 10;
- if (type < 8) {
+ if (type < query_ratio) {
auto q_idx = rand() % queries->size();
auto q = (*queries)[q_idx];
diff --git a/benchmarks/tail-latency/mixed_workload_average.cpp b/benchmarks/tail-latency/mixed_workload_average.cpp
index cbf3a82..c740ed7 100644
--- a/benchmarks/tail-latency/mixed_workload_average.cpp
+++ b/benchmarks/tail-latency/mixed_workload_average.cpp
@@ -38,7 +38,7 @@ typedef de::DEConfiguration<Shard, Q, de::DeletePolicy::TOMBSTONE,
std::atomic<size_t> idx;
std::atomic<bool> inserts_done = false;
-size_t query_ratio = 3;
+ssize_t query_ratio = 5;
std::atomic<size_t> total_res = 0;
size_t reccnt = 0;
@@ -56,7 +56,7 @@ void operation_thread(Ext *extension, std::vector<QP> *queries,
while (!inserts_done.load()) {
auto type = rand() % 10;
- if (type < 8) {
+ if (type < query_ratio) {
total_query_count.fetch_add(1);
auto q_idx = rand() % queries->size();
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 4d4585f..8cef4a1 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -92,6 +92,7 @@ public:
std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0));
m_version_counter = INITIAL_VERSION;
+ m_preempt_version = INVALID_VERSION;
assert(m_config.recon_policy);
}
@@ -404,8 +405,28 @@ private:
LockManager m_lock_mngr;
+ std::atomic<size_t> m_preempt_version;
+
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
+ bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args, size_t version) {
+ if (version <= m_preempt_version.load()) {
+ // fprintf(stderr, "[I] Preempted query on version %ld, restarting...\n", version);
+ m_sched->schedule_job(async_query, 0, (void *)args, QUERY);
+ return true;
+ }
+
+ return false;
+ }
+
+ void preempt_queries() {
+ size_t vers = m_active_version.load()->get_id() - 1;
+ if (vers != m_preempt_version.load()){
+ m_preempt_version.store(vers);
+ // fprintf(stderr, "[I] Initiating query preemption on version %ld\n", vers);
+ }
+ }
+
void enforce_delete_invariant(VersionType *version) {
auto structure = version->get_structure();
auto compactions = structure->get_compaction_tasks();
@@ -471,7 +492,20 @@ private:
// fprintf(stderr, "[I] Buffer Head:\t%ld\n", new_head);
/* advance the buffer head for a flush */
- args->version->advance_buffer_head(new_head);
+ bool success = false;
+ size_t failure_cnt = 0;
+ while (!success) {
+ success = args->version->advance_buffer_head(new_head);
+ if (!success) {
+ failure_cnt++;
+ usleep(1);
+ // fprintf(stderr, "[I] Buffer head advance blocked on %ld\n", args->version->get_id());
+
+ if (failure_cnt >= extension->m_config.buffer_flush_query_preemption_trigger) {
+ extension->preempt_queries();
+ }
+ }
+ }
} else {
// fprintf(stderr, "[I] Running background reconstruction\n");
@@ -576,6 +610,19 @@ private:
if (query_results[i].size() > 0)
break;
}
+
+ /*
+ * for query preemption--check if the query should be restarted
+ * to prevent blocking buffer flushes for too long
+ */
+ if (args->extension->restart_query(args, version->get_id())) {
+ /* clean up memory allocated for temporary query objects */
+ delete buffer_query;
+ for (size_t i = 0; i < local_queries.size(); i++) {
+ delete local_queries[i];
+ }
+ return;
+ }
}
/*
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index e2acc8f..4cd73ba 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -82,12 +82,7 @@ public:
bool advance_buffer_head(size_t new_head) {
m_buffer_head = new_head;
-
- while (!m_buffer->advance_head(new_head)) {
- usleep(1);
- }
-
- return true;
+ return m_buffer->advance_head(new_head);
}
void update_shard_version(size_t version) {
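
With this change, advance_buffer_head() no longer spins inside Version: it simply reports whether the buffer accepted the new head, while the flush path in DynamicExtension.h (above) owns the retry loop and, after buffer_flush_query_preemption_trigger failures, initiates query preemption.
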
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 105f0f3..e62a495 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -147,12 +147,6 @@ public:
assert(new_head > m_head.load().head_idx);
assert(new_head <= m_tail.load());
- /* refuse to advance head while there is an old with one references */
- if (m_old_head.load().refcnt > 0) {
- //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n");
- return false;
- }
-
m_active_head_advance.store(true);
if (m_old_head.load().refcnt > 0) {
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 81698d2..fecb2bf 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -44,6 +44,8 @@ class DEConfiguration {
size_t minimum_query_threads = 4;
 size_t maximum_memory_usage = 0; /* 0 for unbounded */
+ size_t buffer_flush_query_preemption_trigger = 10;
+
};
} // namespace de
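
The new buffer_flush_query_preemption_trigger option (default 10) is the number of failed buffer-head advances a flush tolerates before it calls preempt_queries(). A hedged configuration sketch, reusing the Shard/Q/Ext/Conf typedefs and the get_policy() helper from insert_query_threads.cpp above; the argument values simply mirror that benchmark:

    auto policy = get_policy<Shard, Q>(8, 8000, 5, n);  /* same arguments as the benchmark uses */
    auto config = Conf(std::move(policy));
    config.buffer_flush_trigger = 4000;
    config.buffer_flush_query_preemption_trigger = 10;  /* failed head advances before queries are preempted */
    auto extension = new Ext(std::move(config));
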