summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- benchmarks/tail-latency/query_parm_sweep.cpp 21
-rw-r--r-- include/framework/DynamicExtension.h 12
-rw-r--r-- include/framework/scheduling/FIFOScheduler.h 2
-rw-r--r-- include/framework/scheduling/Task.h 9
-rw-r--r-- include/framework/scheduling/statistics.h 122
-rw-r--r-- include/framework/util/Configuration.h 2
6 files changed, 130 insertions, 38 deletions
diff --git a/benchmarks/tail-latency/query_parm_sweep.cpp b/benchmarks/tail-latency/query_parm_sweep.cpp
index 4f11ce0..a91db8c 100644
--- a/benchmarks/tail-latency/query_parm_sweep.cpp
+++ b/benchmarks/tail-latency/query_parm_sweep.cpp
@@ -29,11 +29,11 @@ typedef de::Record<uint64_t, uint64_t> Rec;
typedef de::ISAMTree<Rec> Shard;
typedef de::pl::Query<Shard> Q;
typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE,
- de::SerialScheduler>
+ de::FIFOScheduler>
Ext;
typedef Q::Parameters QP;
typedef de::DEConfiguration<Shard, Q, de::DeletePolicy::TOMBSTONE,
- de::SerialScheduler>
+ de::FIFOScheduler>
Conf;
std::atomic<size_t> idx;
@@ -103,11 +103,11 @@ int main(int argc, char **argv) {
auto queries =read_sosd_point_lookups<QP>(q_fname, 100);
size_t buffer_size = 8000;
- std::vector<size_t> policies = {0, 1, 2};
+ std::vector<size_t> policies = {0};
std::vector<size_t> thread_counts = {8};
- std::vector<double> modifiers = {0, .3, .5, .8};
- std::vector<size_t> scale_factors = {2, 4, 8};
+ std::vector<double> modifiers = {0};
+ std::vector<size_t> scale_factors = {8, 8, 8, 8, 8};
size_t insert_threads = 1;
size_t query_threads = 1;
@@ -165,11 +165,19 @@ int main(int argc, char **argv) {
extension->await_version();
+ /* run some queries to "warm up" the cache */
+ for (size_t i=0; i<queries.size()*2; i++) {
+ auto q_idx = i % queries.size();
+ auto q = queries[q_idx];
+ auto res = extension->query(std::move(q)).get();
+ total_res.fetch_add(res.size());
+ }
+
total_query_count.store(50000);
TIMER_INIT();
TIMER_START();
for (size_t i=0; i<total_query_count; i++) {
- auto q_idx = rand() % queries.size();
+ auto q_idx = i % queries.size();
auto q = queries[q_idx];
auto res = extension->query(std::move(q)).get();
total_res.fetch_add(res.size());
@@ -187,6 +195,7 @@ int main(int argc, char **argv) {
fprintf(stdout, "%ld\t%ld\t%ld\t%lf\t%ld\t%ld\t%ld\t%ld\n", internal_thread_cnt, pol, sf,
mod, extension->get_height(), extension->get_shard_count(),
insert_tput, query_lat);
+ extension->print_scheduler_statistics();
fflush(stdout);
total_res.store(0);
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index fb82638..0bb1524 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -471,7 +471,7 @@ private:
static void reconstruction(void *arguments) {
auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
auto extension = (DynamicExtension *)args->extension;
- extension->SetThreadAffinity();
+ extension->set_thread_affinity();
static std::atomic<size_t> cnt = 0;
size_t recon_id = cnt.fetch_add(1);
@@ -661,6 +661,7 @@ private:
static void async_query(void *arguments) {
auto *args = (QueryArgs<ShardType, QueryType, DynamicExtension> *)arguments;
+ args->extension->set_thread_affinity();
auto version = args->extension->get_active_version();
@@ -927,13 +928,12 @@ private:
}
//#ifdef _GNU_SOURCE
-#if 0
- void SetThreadAffinity() {
+ void set_thread_affinity() {
if constexpr (std::same_as<SchedType, SerialScheduler>) {
return;
}
- int core = m_next_core.fetch_add(1) % m_core_cnt;
+ int core = m_next_core.fetch_add(1) % m_config.physical_core_count;
cpu_set_t mask;
CPU_ZERO(&mask);
@@ -954,8 +954,10 @@ private:
CPU_SET(core, &mask);
::sched_setaffinity(0, sizeof(mask), &mask);
}
+ /*
#else
- void SetThreadAffinity() {}
+ void set_thread_affinity() {}
#endif
+*/
};
} // namespace de
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 6875aca..703f13e 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -14,6 +14,7 @@
#include "framework/scheduling/Task.h"
#include "framework/scheduling/statistics.h"
+#include <bits/chrono.h>
#include <chrono>
#include <condition_variable>
#include <thread>
@@ -61,7 +62,6 @@ public:
m_stats.job_queued(ts, type, size);
m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv));
-
m_cv.notify_all();
}
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 3dbc9f4..b2884c7 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -72,7 +72,6 @@ struct Task {
}
void operator()(size_t thrd_id) {
- auto start = std::chrono::high_resolution_clock::now();
if (m_stats) {
m_stats->job_begin(m_timestamp);
}
@@ -82,14 +81,6 @@ struct Task {
if (m_stats) {
m_stats->job_complete(m_timestamp);
}
- auto stop = std::chrono::high_resolution_clock::now();
-
- if (m_stats) {
- auto time =
- std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start)
- .count();
- m_stats->log_time_data(time, m_type);
- }
if (m_lk) {
m_lk->unlock();
diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h
index a6d66ab..706ba84 100644
--- a/include/framework/scheduling/statistics.h
+++ b/include/framework/scheduling/statistics.h
@@ -30,25 +30,44 @@ private:
struct Event {
size_t id;
EventType type;
+ std::chrono::system_clock::time_point time;
+
+ Event(size_t id, EventType type)
+ : id(id), type(type), time(std::chrono::high_resolution_clock::now()) {}
};
struct JobInfo {
size_t id;
size_t size;
size_t type;
+
+ JobInfo(size_t id, size_t size, size_t type) : id(id), size(size), type(type) {}
};
public:
SchedulerStatistics() = default;
~SchedulerStatistics() = default;
- void job_queued(size_t id, size_t type, size_t size) { }
+ void job_queued(size_t id, size_t type, size_t size) {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ m_jobs.insert({id, {id, size, type}});
+ m_event_log.emplace_back(id, EventType::QUEUED);
+ }
- void job_scheduled(size_t id) { std::unique_lock<std::mutex> lk(m_mutex); }
+ void job_scheduled(size_t id) {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ m_event_log.emplace_back(id, EventType::SCHEDULED);
+ }
- void job_begin(size_t id) {}
+ void job_begin(size_t id) {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ m_event_log.emplace_back(id, EventType::STARTED);
+ }
- void job_complete(size_t id) {}
+ void job_complete(size_t id) {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ m_event_log.emplace_back(id, EventType::FINISHED);
+ }
/* FIXME: This is just a temporary approach */
void log_time_data(size_t length, size_t type) {
@@ -72,21 +91,90 @@ public:
}
void print_statistics() {
- if (m_type_1_cnt > 0) {
- fprintf(
- stdout,
- "Query Count: %ld\tQuery Avg. Latency: %ld\tMax Query Latency: %ld\n",
- m_type_1_cnt.load(), m_type_1_total_time.load() / m_type_1_cnt.load(),
- m_type_1_largest_time.load());
+ int64_t total_queue_time = 0;
+ int64_t max_queue_time = 0;
+ int64_t min_queue_time = INT64_MAX;
+
+ int64_t total_runtime = 0;
+ int64_t max_runtime = 0;
+ int64_t min_runtime = INT64_MAX;
+
+ int64_t query_cnt = 0;
+
+ /* dumb brute force approach; there are a million better ways to do this */
+ size_t i = 0;
+ for (auto &job : m_jobs) {
+ std::chrono::system_clock::time_point queue_time;
+ std::chrono::system_clock::time_point schedule_time;
+ std::chrono::system_clock::time_point start_time;
+ std::chrono::system_clock::time_point stop_time;
+
+ /* just look at queries for now */
+ if (job.second.type == 1) {
+ for (auto &event : m_event_log) {
+ if (event.id == job.first) {
+ switch (event.type) {
+ case EventType::QUEUED:
+ queue_time = event.time;
+ i++;
+ break;
+ case EventType::FINISHED:
+ stop_time = event.time;
+ i++;
+ break;
+ case EventType::SCHEDULED:
+ schedule_time = event.time;
+ i++;
+ break;
+ case EventType::STARTED:
+ start_time = event.time;
+ i++;
+ break;
+ }
+ }
+ }
+ }
+ /* event wasn't fully logged, so we'll skip it */
+ if (i != 4) {
+ i=0;
+ continue;
+ }
+ i=0;
+
+ auto time_in_queue = std::chrono::duration_cast<std::chrono::nanoseconds>(schedule_time - queue_time).count();
+ auto runtime = std::chrono::duration_cast<std::chrono::nanoseconds>(stop_time - start_time).count();
+
+ total_queue_time += time_in_queue;
+ total_runtime += runtime;
+
+ if (time_in_queue > max_queue_time) {
+ max_queue_time = time_in_queue;
+ }
+
+ if (time_in_queue < min_queue_time) {
+ min_queue_time = time_in_queue;
+ }
+
+ if (runtime > max_runtime) {
+ max_runtime = runtime;
+ }
+
+ if (runtime < min_runtime) {
+ min_runtime = runtime;
+ }
+
+ query_cnt++;
}
- if (m_type_2_cnt > 0) {
- fprintf(stdout,
- "Reconstruction Count: %ld\tReconstruction Avg. Latency: "
- "%ld\tMax Recon. Latency:%ld\n",
- m_type_2_cnt.load(),
- m_type_2_total_time.load() / m_type_2_cnt.load(),
- m_type_2_largest_time.load());
+
+ if (query_cnt == 0) {
+ return;
}
+
+ int64_t average_queue_time = total_queue_time / query_cnt;
+ int64_t average_runtime = total_runtime / query_cnt;
+
+ fprintf(stdout, "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t Max Scheduling Delay: %ld\n", average_queue_time, min_queue_time, max_queue_time);
+ fprintf(stdout, "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query Latency: %ld\n", average_runtime, min_runtime, max_runtime);
}
private:
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 2b8a7fc..dde87fe 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -44,6 +44,8 @@ class DEConfiguration {
size_t minimum_query_threads = 4;
size_t maximum_memory_usage = 0; /* o for unbounded */
+ size_t physical_core_count = 6;
+
size_t buffer_flush_query_preemption_trigger = UINT64_MAX;
};