summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/scheduling')
-rw-r--r--include/framework/scheduling/Epoch.h4
-rw-r--r--include/framework/scheduling/FIFOScheduler.h4
-rw-r--r--include/framework/scheduling/LockManager.h20
-rw-r--r--include/framework/scheduling/SerialScheduler.h3
-rw-r--r--include/framework/scheduling/Task.h11
-rw-r--r--include/framework/scheduling/Version.h20
-rw-r--r--include/framework/scheduling/statistics.h65
7 files changed, 62 insertions, 65 deletions
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 7583727..a642b31 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -60,7 +60,9 @@ public:
Structure *get_mutable_structure() { return m_structure; }
- BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); }
+ BufView get_buffer() const {
+ return m_buffer->get_buffer_view(m_buffer_head);
+ }
/*
* Returns a new Epoch object that is a copy of this one. The new object
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 8fbe07c..ef6dde5 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -61,7 +61,8 @@ public:
std::unique_lock<std::mutex> lk(m_cv_lock);
m_stats.job_queued(ts, type, size);
- m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv));
+ m_task_queue.push(
+ Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv));
m_cv.notify_all();
}
@@ -81,7 +82,6 @@ private:
[[maybe_unused]] size_t m_memory_budget;
size_t m_thrd_cnt;
-
std::atomic<size_t> m_counter;
std::mutex m_cv_lock;
std::condition_variable m_cv;
diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h
index a40cf7a..820b975 100644
--- a/include/framework/scheduling/LockManager.h
+++ b/include/framework/scheduling/LockManager.h
@@ -3,15 +3,15 @@
*/
#pragma once
-#include <deque>
#include <atomic>
#include <cassert>
+#include <deque>
namespace de {
class LockManager {
public:
- LockManager(size_t levels=1) {
- for (size_t i=0; i < levels; i++) {
+ LockManager(size_t levels = 1) {
+ for (size_t i = 0; i < levels; i++) {
m_lks.emplace_back(false);
}
@@ -20,9 +20,7 @@ public:
~LockManager() = default;
- void add_lock() {
- m_lks.emplace_back(false);
- }
+ void add_lock() { m_lks.emplace_back(false); }
void release_lock(size_t idx, size_t version) {
if (idx < m_lks.size()) {
@@ -72,17 +70,13 @@ public:
return false;
}
- bool is_buffer_locked() {
- return m_buffer_lk.load();
- }
+ bool is_buffer_locked() { return m_buffer_lk.load(); }
- void release_buffer_lock() {
- m_buffer_lk.store(false);
- }
+ void release_buffer_lock() { m_buffer_lk.store(false); }
private:
std::deque<std::atomic<bool>> m_lks;
std::atomic<bool> m_buffer_lk;
std::atomic<size_t> m_last_unlocked_version;
};
-}
+} // namespace de
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
index b6ebe53..6a8ed58 100644
--- a/include/framework/scheduling/SerialScheduler.h
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -37,7 +37,8 @@ public:
t(0);
}
- void shutdown() { /* intentionally left blank */ }
+ void shutdown() { /* intentionally left blank */
+ }
void print_statistics() { m_stats.print_statistics(); }
void print_query_time_data() { m_stats.print_query_time_data(); }
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 4529b2e..00ddbbb 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -14,9 +14,9 @@
#pragma once
#include <chrono>
+#include <condition_variable>
#include <functional>
#include <future>
-#include <condition_variable>
#include "framework/scheduling/Version.h"
#include "framework/scheduling/statistics.h"
@@ -24,11 +24,7 @@
namespace de {
-enum class ReconstructionPriority {
- FLUSH = 0,
- CMPCT = 1,
- MAINT = 2
-};
+enum class ReconstructionPriority { FLUSH = 0, CMPCT = 1, MAINT = 2 };
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
struct ReconstructionArgs {
@@ -51,7 +47,8 @@ typedef std::function<void(void *)> Job;
struct Task {
Task(size_t size, size_t ts, Job job, void *args, size_t type = 0,
- SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr, std::condition_variable *cv=nullptr)
+ SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr,
+ std::condition_variable *cv = nullptr)
: m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type),
m_stats(stats), m_lk(lk), m_cv(cv) {}
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index be54c84..bbcbe25 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -25,18 +25,17 @@ private:
typedef BufferView<RecordType> BufferViewType;
public:
-
Version(size_t vid = 0)
: m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0) {}
- Version(size_t number, std::unique_ptr<StructureType> structure, BufferType *buff,
- size_t head)
+ Version(size_t number, std::unique_ptr<StructureType> structure,
+ BufferType *buff, size_t head)
: m_buffer(buff), m_structure(std::move(structure)), m_id(number),
m_buffer_head(head) {
- if (m_buffer) {
- m_buffer->take_head_reference(m_buffer_head);
- }
- }
+ if (m_buffer) {
+ m_buffer->take_head_reference(m_buffer_head);
+ }
+ }
~Version() {
if (m_buffer) {
@@ -55,7 +54,7 @@ public:
size_t get_id() const { return m_id; }
- void set_id(size_t id) { m_id = id;}
+ void set_id(size_t id) { m_id = id; }
const StructureType *get_structure() const { return m_structure.get(); }
@@ -107,10 +106,7 @@ public:
m_structure->update_shard_version(version);
}
- size_t get_head() {
- return m_buffer_head;
- }
-
+ size_t get_head() { return m_buffer_head; }
void set_buffer(BufferType *buffer, size_t head) {
assert(m_buffer == nullptr);
diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h
index 6d9f9f0..8cb6dbd 100644
--- a/include/framework/scheduling/statistics.h
+++ b/include/framework/scheduling/statistics.h
@@ -16,12 +16,12 @@
#include <atomic>
#include <cassert>
#include <chrono>
+#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <mutex>
#include <unordered_map>
#include <vector>
-#include <cmath>
namespace de {
@@ -68,7 +68,6 @@ private:
queue_time)
.count();
}
-
};
public:
@@ -117,10 +116,10 @@ public:
size_t worst_query = 0;
size_t first_query = UINT64_MAX;
-
for (auto &job : m_jobs) {
if (job.second.type != 1) {
- fprintf(stdout, "%ld %ld %ld %ld\n", job.second.id, job.second.size, job.second.runtime(), job.second.runtime() / (job.second.size));
+ fprintf(stdout, "%ld %ld %ld %ld\n", job.second.id, job.second.size,
+ job.second.runtime(), job.second.runtime() / (job.second.size));
}
if (job.first < first_query) {
@@ -149,7 +148,6 @@ public:
query_cnt++;
}
-
int64_t average_queue_time = (query_cnt) ? total_queue_time / query_cnt : 0;
int64_t average_runtime = (query_cnt) ? total_runtime / query_cnt : 0;
@@ -162,28 +160,37 @@ public:
continue;
}
- queue_deviation_sum += std::pow(job.second.time_in_queue() - average_queue_time, 2);
- runtime_deviation_sum += std::pow(job.second.runtime() - average_runtime, 2);
-
+ queue_deviation_sum +=
+ std::pow(job.second.time_in_queue() - average_queue_time, 2);
+ runtime_deviation_sum +=
+ std::pow(job.second.runtime() - average_runtime, 2);
}
- int64_t queue_stddev = (query_cnt) ? std::sqrt(queue_deviation_sum / query_cnt) : 0;
- int64_t runtime_stddev = (query_cnt) ? std::sqrt(runtime_deviation_sum / query_cnt) : 0;
-
-
- fprintf(stdout, "Query Count: %ld\tWorst Query: %ld\tFirst Query: %ld\n", query_cnt, worst_query, first_query);
- fprintf(stdout, "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t Max Scheduling Delay: %ld\tStandard Deviation: %ld\n", average_queue_time, min_queue_time, max_queue_time, queue_stddev);
- fprintf(stdout, "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query Latency: %ld\tStandard Deviation: %ld\n", average_runtime, min_runtime, max_runtime, runtime_stddev);
+ int64_t queue_stddev =
+ (query_cnt) ? std::sqrt(queue_deviation_sum / query_cnt) : 0;
+ int64_t runtime_stddev =
+ (query_cnt) ? std::sqrt(runtime_deviation_sum / query_cnt) : 0;
+
+ fprintf(stdout, "Query Count: %ld\tWorst Query: %ld\tFirst Query: %ld\n",
+ query_cnt, worst_query, first_query);
+ fprintf(stdout,
+ "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t "
+ "Max Scheduling Delay: %ld\tStandard Deviation: %ld\n",
+ average_queue_time, min_queue_time, max_queue_time, queue_stddev);
+ fprintf(stdout,
+ "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query "
+ "Latency: %ld\tStandard Deviation: %ld\n",
+ average_runtime, min_runtime, max_runtime, runtime_stddev);
}
-
void print_query_time_data() {
std::unique_lock<std::mutex> lk(m_mutex);
update_job_data_from_log();
for (auto &job : m_jobs) {
if (job.second.type == 1) {
- fprintf(stdout, "%ld\t%ld\t%ld\n", job.second.time_in_queue(), job.second.runtime(), job.second.end_to_end_time());
+ fprintf(stdout, "%ld\t%ld\t%ld\n", job.second.time_in_queue(),
+ job.second.runtime(), job.second.end_to_end_time());
}
}
}
@@ -211,18 +218,18 @@ private:
auto &job = m_jobs.at(event.id);
switch (event.type) {
- case EventType::QUEUED:
- job.queue_time = event.time;
- break;
- case EventType::FINISHED:
- job.stop_time = event.time;
- break;
- case EventType::SCHEDULED:
- job.scheduled_time = event.time;
- break;
- case EventType::STARTED:
- job.start_time = event.time;
- break;
+ case EventType::QUEUED:
+ job.queue_time = event.time;
+ break;
+ case EventType::FINISHED:
+ job.stop_time = event.time;
+ break;
+ case EventType::SCHEDULED:
+ job.scheduled_time = event.time;
+ break;
+ case EventType::STARTED:
+ job.start_time = event.time;
+ break;
}
}