diff options
Diffstat (limited to 'include/framework/scheduling')
| -rw-r--r-- | include/framework/scheduling/Epoch.h | 4 | ||||
| -rw-r--r-- | include/framework/scheduling/FIFOScheduler.h | 4 | ||||
| -rw-r--r-- | include/framework/scheduling/LockManager.h | 20 | ||||
| -rw-r--r-- | include/framework/scheduling/SerialScheduler.h | 3 | ||||
| -rw-r--r-- | include/framework/scheduling/Task.h | 11 | ||||
| -rw-r--r-- | include/framework/scheduling/Version.h | 20 | ||||
| -rw-r--r-- | include/framework/scheduling/statistics.h | 65 |
7 files changed, 62 insertions, 65 deletions
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 7583727..a642b31 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -60,7 +60,9 @@ public: Structure *get_mutable_structure() { return m_structure; } - BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); } + BufView get_buffer() const { + return m_buffer->get_buffer_view(m_buffer_head); + } /* * Returns a new Epoch object that is a copy of this one. The new object diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 8fbe07c..ef6dde5 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -61,7 +61,8 @@ public: std::unique_lock<std::mutex> lk(m_cv_lock); m_stats.job_queued(ts, type, size); - m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv)); + m_task_queue.push( + Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv)); m_cv.notify_all(); } @@ -81,7 +82,6 @@ private: [[maybe_unused]] size_t m_memory_budget; size_t m_thrd_cnt; - std::atomic<size_t> m_counter; std::mutex m_cv_lock; std::condition_variable m_cv; diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h index a40cf7a..820b975 100644 --- a/include/framework/scheduling/LockManager.h +++ b/include/framework/scheduling/LockManager.h @@ -3,15 +3,15 @@ */ #pragma once -#include <deque> #include <atomic> #include <cassert> +#include <deque> namespace de { class LockManager { public: - LockManager(size_t levels=1) { - for (size_t i=0; i < levels; i++) { + LockManager(size_t levels = 1) { + for (size_t i = 0; i < levels; i++) { m_lks.emplace_back(false); } @@ -20,9 +20,7 @@ public: ~LockManager() = default; - void add_lock() { - m_lks.emplace_back(false); - } + void add_lock() { m_lks.emplace_back(false); } void release_lock(size_t idx, size_t version) { if (idx < m_lks.size()) { @@ -72,17 +70,13 @@ public: return false; } - bool is_buffer_locked() { - return m_buffer_lk.load(); - } + bool is_buffer_locked() { return m_buffer_lk.load(); } - void release_buffer_lock() { - m_buffer_lk.store(false); - } + void release_buffer_lock() { m_buffer_lk.store(false); } private: std::deque<std::atomic<bool>> m_lks; std::atomic<bool> m_buffer_lk; std::atomic<size_t> m_last_unlocked_version; }; -} +} // namespace de diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index b6ebe53..6a8ed58 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -37,7 +37,8 @@ public: t(0); } - void shutdown() { /* intentionally left blank */ } + void shutdown() { /* intentionally left blank */ + } void print_statistics() { m_stats.print_statistics(); } void print_query_time_data() { m_stats.print_query_time_data(); } diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 4529b2e..00ddbbb 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -14,9 +14,9 @@ #pragma once #include <chrono> +#include <condition_variable> #include <functional> #include <future> -#include <condition_variable> #include "framework/scheduling/Version.h" #include "framework/scheduling/statistics.h" @@ -24,11 +24,7 @@ namespace de { -enum class ReconstructionPriority { - FLUSH = 0, - CMPCT = 1, - MAINT = 2 -}; +enum class ReconstructionPriority { FLUSH = 0, CMPCT = 1, MAINT = 2 }; template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> struct ReconstructionArgs { @@ -51,7 +47,8 @@ typedef std::function<void(void *)> Job; struct Task { Task(size_t size, size_t ts, Job job, void *args, size_t type = 0, - SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr, std::condition_variable *cv=nullptr) + SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr, + std::condition_variable *cv = nullptr) : m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type), m_stats(stats), m_lk(lk), m_cv(cv) {} diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h index be54c84..bbcbe25 100644 --- a/include/framework/scheduling/Version.h +++ b/include/framework/scheduling/Version.h @@ -25,18 +25,17 @@ private: typedef BufferView<RecordType> BufferViewType; public: - Version(size_t vid = 0) : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0) {} - Version(size_t number, std::unique_ptr<StructureType> structure, BufferType *buff, - size_t head) + Version(size_t number, std::unique_ptr<StructureType> structure, + BufferType *buff, size_t head) : m_buffer(buff), m_structure(std::move(structure)), m_id(number), m_buffer_head(head) { - if (m_buffer) { - m_buffer->take_head_reference(m_buffer_head); - } - } + if (m_buffer) { + m_buffer->take_head_reference(m_buffer_head); + } + } ~Version() { if (m_buffer) { @@ -55,7 +54,7 @@ public: size_t get_id() const { return m_id; } - void set_id(size_t id) { m_id = id;} + void set_id(size_t id) { m_id = id; } const StructureType *get_structure() const { return m_structure.get(); } @@ -107,10 +106,7 @@ public: m_structure->update_shard_version(version); } - size_t get_head() { - return m_buffer_head; - } - + size_t get_head() { return m_buffer_head; } void set_buffer(BufferType *buffer, size_t head) { assert(m_buffer == nullptr); diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h index 6d9f9f0..8cb6dbd 100644 --- a/include/framework/scheduling/statistics.h +++ b/include/framework/scheduling/statistics.h @@ -16,12 +16,12 @@ #include <atomic> #include <cassert> #include <chrono> +#include <cmath> #include <cstdint> #include <cstdlib> #include <mutex> #include <unordered_map> #include <vector> -#include <cmath> namespace de { @@ -68,7 +68,6 @@ private: queue_time) .count(); } - }; public: @@ -117,10 +116,10 @@ public: size_t worst_query = 0; size_t first_query = UINT64_MAX; - for (auto &job : m_jobs) { if (job.second.type != 1) { - fprintf(stdout, "%ld %ld %ld %ld\n", job.second.id, job.second.size, job.second.runtime(), job.second.runtime() / (job.second.size)); + fprintf(stdout, "%ld %ld %ld %ld\n", job.second.id, job.second.size, + job.second.runtime(), job.second.runtime() / (job.second.size)); } if (job.first < first_query) { @@ -149,7 +148,6 @@ public: query_cnt++; } - int64_t average_queue_time = (query_cnt) ? total_queue_time / query_cnt : 0; int64_t average_runtime = (query_cnt) ? total_runtime / query_cnt : 0; @@ -162,28 +160,37 @@ public: continue; } - queue_deviation_sum += std::pow(job.second.time_in_queue() - average_queue_time, 2); - runtime_deviation_sum += std::pow(job.second.runtime() - average_runtime, 2); - + queue_deviation_sum += + std::pow(job.second.time_in_queue() - average_queue_time, 2); + runtime_deviation_sum += + std::pow(job.second.runtime() - average_runtime, 2); } - int64_t queue_stddev = (query_cnt) ? std::sqrt(queue_deviation_sum / query_cnt) : 0; - int64_t runtime_stddev = (query_cnt) ? std::sqrt(runtime_deviation_sum / query_cnt) : 0; - - - fprintf(stdout, "Query Count: %ld\tWorst Query: %ld\tFirst Query: %ld\n", query_cnt, worst_query, first_query); - fprintf(stdout, "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t Max Scheduling Delay: %ld\tStandard Deviation: %ld\n", average_queue_time, min_queue_time, max_queue_time, queue_stddev); - fprintf(stdout, "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query Latency: %ld\tStandard Deviation: %ld\n", average_runtime, min_runtime, max_runtime, runtime_stddev); + int64_t queue_stddev = + (query_cnt) ? std::sqrt(queue_deviation_sum / query_cnt) : 0; + int64_t runtime_stddev = + (query_cnt) ? std::sqrt(runtime_deviation_sum / query_cnt) : 0; + + fprintf(stdout, "Query Count: %ld\tWorst Query: %ld\tFirst Query: %ld\n", + query_cnt, worst_query, first_query); + fprintf(stdout, + "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t " + "Max Scheduling Delay: %ld\tStandard Deviation: %ld\n", + average_queue_time, min_queue_time, max_queue_time, queue_stddev); + fprintf(stdout, + "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query " + "Latency: %ld\tStandard Deviation: %ld\n", + average_runtime, min_runtime, max_runtime, runtime_stddev); } - void print_query_time_data() { std::unique_lock<std::mutex> lk(m_mutex); update_job_data_from_log(); for (auto &job : m_jobs) { if (job.second.type == 1) { - fprintf(stdout, "%ld\t%ld\t%ld\n", job.second.time_in_queue(), job.second.runtime(), job.second.end_to_end_time()); + fprintf(stdout, "%ld\t%ld\t%ld\n", job.second.time_in_queue(), + job.second.runtime(), job.second.end_to_end_time()); } } } @@ -211,18 +218,18 @@ private: auto &job = m_jobs.at(event.id); switch (event.type) { - case EventType::QUEUED: - job.queue_time = event.time; - break; - case EventType::FINISHED: - job.stop_time = event.time; - break; - case EventType::SCHEDULED: - job.scheduled_time = event.time; - break; - case EventType::STARTED: - job.start_time = event.time; - break; + case EventType::QUEUED: + job.queue_time = event.time; + break; + case EventType::FINISHED: + job.stop_time = event.time; + break; + case EventType::SCHEDULED: + job.scheduled_time = event.time; + break; + case EventType::STARTED: + job.start_time = event.time; + break; } } |