From 9fe305c7d28e993e55c55427f377ae7e3251ea4f Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Fri, 6 Dec 2024 13:13:51 -0500 Subject: Interface update (#5) * Query Interface Adjustments/Refactoring Began the process of adjusting the query interface (and also the shard interface, to a lesser degree) to better accommodate the user. In particular the following changes have been made, 1. The number of necessary template arguments for the query type has been drastically reduced, while also removing the void pointers and manual delete functions from the interface. This was accomplished by requiring many of the sub-types associated with a query (parameters, etc.) to be nested inside the main query class, and by forcing the SHARD type to expose its associated record type. 2. User-defined query return types are now supported. Queries no longer are required to return strictly sets of records. Instead, the query now has LocalResultType and ResultType template parameters (which can be defaulted using a typedef in the Query type itself), allowing much more flexibility. Note that, at least for the short term, the LocalResultType must still expose the same is_deleted/is_tombstone interface as a Wrapped used to, as this is currently needed for delete filtering. A better approach to this is, hopefully, forthcoming. 3. Updated the ISAMTree.h shard and rangequery.h query to use the new interfaces, and adjusted the associated unit tests as well. 4. Dropped the unnecessary "get_data()" function from the ShardInterface concept. 5. Dropped the need to specify a record type in the ShardInterface concept. This is now handled using a required Shard::RECORD member of the Shard class itself, which should expose the name of the record type. * Updates to framework to support new Query/Shard interfaces Pretty extensive adjustments to the framework, particularly to the templates themselves, along with some type-renaming work, to support the new query and shard interfaces. Adjusted the external query interface to take an rvalue reference, rather than a pointer, to the query parameters. * Removed framework-level delete filtering This was causing some issues with the new query interface, and should probably be reworked anyway, so I'm temporarily (TM) removing the feature. * Updated benchmarks + remaining code for new interface --- include/framework/scheduling/Epoch.h | 209 ++++++++++++------------- include/framework/scheduling/FIFOScheduler.h | 143 ++++++++--------- include/framework/scheduling/SerialScheduler.h | 64 ++++---- include/framework/scheduling/Task.h | 101 ++++++------ include/framework/scheduling/statistics.h | 140 ++++++++--------- 5 files changed, 310 insertions(+), 347 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 9377fb0..03675b1 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -1,7 +1,7 @@ /* * include/framework/scheduling/Epoch.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * @@ -11,133 +11,120 @@ #include #include -#include "framework/structure/MutableBuffer.h" -#include "framework/structure/ExtensionStructure.h" #include "framework/structure/BufferView.h" +#include "framework/structure/ExtensionStructure.h" +#include "framework/structure/MutableBuffer.h" namespace de { - -template S, QueryInterface Q, LayoutPolicy L> +template QueryType, + LayoutPolicy L> class Epoch { private: - typedef MutableBuffer Buffer; - typedef ExtensionStructure Structure; - typedef BufferView BufView; -public: - Epoch(size_t number=0) - : m_buffer(nullptr) - , m_structure(nullptr) - , m_active_merge(false) - , m_epoch_number(number) - , m_buffer_head(0) - {} - - Epoch(size_t number, Structure *structure, Buffer *buff, size_t head) - : m_buffer(buff) - , m_structure(structure) - , m_active_merge(false) - , m_epoch_number(number) - , m_buffer_head(head) - { - structure->take_reference(); - } - - ~Epoch() { - if (m_structure) { - m_structure->release_reference(); - } - - if (m_structure->get_reference_count() == 0) { - delete m_structure; - } + typedef typename ShardType::RECORD RecordType; + typedef MutableBuffer Buffer; + typedef ExtensionStructure Structure; + typedef BufferView BufView; +public: + Epoch(size_t number = 0) + : m_buffer(nullptr), m_structure(nullptr), m_active_merge(false), + m_epoch_number(number), m_buffer_head(0) {} + + Epoch(size_t number, Structure *structure, Buffer *buff, size_t head) + : m_buffer(buff), m_structure(structure), m_active_merge(false), + m_epoch_number(number), m_buffer_head(head) { + structure->take_reference(); + } + + ~Epoch() { + if (m_structure) { + m_structure->release_reference(); } - /* - * Epochs are *not* copyable or movable. Only one can exist, and all users - * of it work with pointers - */ - Epoch(const Epoch&) = delete; - Epoch(Epoch&&) = delete; - Epoch &operator=(const Epoch&) = delete; - Epoch &operator=(Epoch&&) = delete; - - size_t get_epoch_number() { - return m_epoch_number; + if (m_structure->get_reference_count() == 0) { + delete m_structure; } - - Structure *get_structure() { - return m_structure; + } + + /* + * Epochs are *not* copyable or movable. Only one can exist, and all users + * of it work with pointers + */ + Epoch(const Epoch &) = delete; + Epoch(Epoch &&) = delete; + Epoch &operator=(const Epoch &) = delete; + Epoch &operator=(Epoch &&) = delete; + + size_t get_epoch_number() { return m_epoch_number; } + + Structure *get_structure() { return m_structure; } + + BufView get_buffer() { return m_buffer->get_buffer_view(m_buffer_head); } + + /* + * Returns a new Epoch object that is a copy of this one. The new object + * will also contain a copy of the m_structure, rather than a reference to + * the same one. The epoch number of the new epoch will be set to the + * provided argument. + */ + Epoch *clone(size_t number) { + std::unique_lock m_buffer_lock; + auto epoch = new Epoch(number); + epoch->m_buffer = m_buffer; + epoch->m_buffer_head = m_buffer_head; + + if (m_structure) { + epoch->m_structure = m_structure->copy(); + /* the copy routine returns a structure with 0 references */ + epoch->m_structure->take_reference(); } - BufView get_buffer() { - return m_buffer->get_buffer_view(m_buffer_head); + return epoch; + } + + /* + * Check if a merge can be started from this Epoch. At present, without + * concurrent merging, this simply checks if there is currently a scheduled + * merge based on this Epoch. If there is, returns false. If there isn't, + * return true and set a flag indicating that there is an active merge. + */ + bool prepare_reconstruction() { + auto old = m_active_merge.load(); + if (old) { + return false; } - /* - * Returns a new Epoch object that is a copy of this one. The new object - * will also contain a copy of the m_structure, rather than a reference to - * the same one. The epoch number of the new epoch will be set to the - * provided argument. - */ - Epoch *clone(size_t number) { - std::unique_lock m_buffer_lock; - auto epoch = new Epoch(number); - epoch->m_buffer = m_buffer; - epoch->m_buffer_head = m_buffer_head; - - if (m_structure) { - epoch->m_structure = m_structure->copy(); - /* the copy routine returns a structure with 0 references */ - epoch->m_structure->take_reference(); - } - - return epoch; + // FIXME: this needs cleaned up + while (!m_active_merge.compare_exchange_strong(old, true)) { + old = m_active_merge.load(); + if (old) { + return false; + } } - /* - * Check if a merge can be started from this Epoch. At present, without - * concurrent merging, this simply checks if there is currently a scheduled - * merge based on this Epoch. If there is, returns false. If there isn't, - * return true and set a flag indicating that there is an active merge. - */ - bool prepare_reconstruction() { - auto old = m_active_merge.load(); - if (old) { - return false; - } - - // FIXME: this needs cleaned up - while (!m_active_merge.compare_exchange_strong(old, true)) { - old = m_active_merge.load(); - if (old) { - return false; - } - } - - return true; - } + return true; + } - bool advance_buffer_head(size_t head) { - m_buffer_head = head; - return m_buffer->advance_head(m_buffer_head); - } + bool advance_buffer_head(size_t head) { + m_buffer_head = head; + return m_buffer->advance_head(m_buffer_head); + } private: - Structure *m_structure; - Buffer *m_buffer; - - std::mutex m_buffer_lock; - std::atomic m_active_merge; - - /* - * The number of currently active jobs - * (queries/merges) operating on this - * epoch. An epoch can only be retired - * when this number is 0. - */ - size_t m_epoch_number; - size_t m_buffer_head; + Buffer *m_buffer; + Structure *m_structure; + + std::mutex m_buffer_lock; + std::atomic m_active_merge; + + /* + * The number of currently active jobs + * (queries/merges) operating on this + * epoch. An epoch can only be retired + * when this number is 0. + */ + size_t m_epoch_number; + size_t m_buffer_head; }; -} +} // namespace de diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 3ed4f49..7cb6d20 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -1,7 +1,7 @@ /* * include/framework/scheduling/FIFOScheduler.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * @@ -17,11 +17,11 @@ */ #pragma once -#include -#include -#include #include "framework/scheduling/Task.h" #include "framework/scheduling/statistics.h" +#include +#include +#include #include "ctpl/ctpl.h" #include "psu-ds/LockedPriorityQueue.h" @@ -30,100 +30,95 @@ namespace de { using namespace std::literals::chrono_literals; - class FIFOScheduler { private: - static const size_t DEFAULT_MAX_THREADS = 8; + static const size_t DEFAULT_MAX_THREADS = 8; public: - FIFOScheduler(size_t memory_budget, size_t thread_cnt) - : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thrd_cnt((thread_cnt) ? thread_cnt: DEFAULT_MAX_THREADS) - , m_used_memory(0) - , m_used_thrds(0) - , m_shutdown(false) - { - m_sched_thrd = std::thread(&FIFOScheduler::run, this); - m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this); - m_thrd_pool.resize(m_thrd_cnt); + FIFOScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX), + m_thrd_cnt((thread_cnt) ? thread_cnt : DEFAULT_MAX_THREADS), + m_used_memory(0), m_used_thrds(0), m_shutdown(false) { + m_sched_thrd = std::thread(&FIFOScheduler::run, this); + m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this); + m_thrd_pool.resize(m_thrd_cnt); + } + + ~FIFOScheduler() { + if (!m_shutdown.load()) { + shutdown(); } - ~FIFOScheduler() { - if (!m_shutdown.load()) { - shutdown(); - } + m_sched_thrd.join(); + m_sched_wakeup_thrd.join(); + } - m_sched_thrd.join(); - m_sched_wakeup_thrd.join(); - } + void schedule_job(std::function job, size_t size, void *args, + size_t type = 0) { + std::unique_lock lk(m_cv_lock); + size_t ts = m_counter.fetch_add(1); - void schedule_job(std::function job, size_t size, void *args, size_t type=0) { - std::unique_lock lk(m_cv_lock); - size_t ts = m_counter.fetch_add(1); + m_stats.job_queued(ts, type, size); + m_task_queue.push(Task(size, ts, job, args, type, &m_stats)); - m_stats.job_queued(ts, type, size); - m_task_queue.push(Task(size, ts, job, args, type, &m_stats)); + m_cv.notify_all(); + } - m_cv.notify_all(); - } - - void shutdown() { - m_shutdown.store(true); - m_thrd_pool.stop(true); - m_cv.notify_all(); - } + void shutdown() { + m_shutdown.store(true); + m_thrd_pool.stop(true); + m_cv.notify_all(); + } - void print_statistics() { - m_stats.print_statistics(); - } + void print_statistics() { m_stats.print_statistics(); } private: - psudb::LockedPriorityQueue m_task_queue; + psudb::LockedPriorityQueue m_task_queue; - size_t m_memory_budget; - size_t m_thrd_cnt; + [[maybe_unused]] size_t m_memory_budget; + size_t m_thrd_cnt; - std::atomic m_shutdown; - std::atomic m_counter; - std::mutex m_cv_lock; - std::condition_variable m_cv; + std::atomic m_counter; + std::mutex m_cv_lock; + std::condition_variable m_cv; - std::thread m_sched_thrd; - std::thread m_sched_wakeup_thrd; - ctpl::thread_pool m_thrd_pool; + std::thread m_sched_thrd; + std::thread m_sched_wakeup_thrd; + ctpl::thread_pool m_thrd_pool; - std::atomic m_used_thrds; - std::atomic m_used_memory; + std::atomic m_used_memory; + std::atomic m_used_thrds; - SchedulerStatistics m_stats; + std::atomic m_shutdown; - void periodic_wakeup() { - do { - std::this_thread::sleep_for(10us); - m_cv.notify_all(); - } while (!m_shutdown.load()); - } + SchedulerStatistics m_stats; - void schedule_next() { - assert(m_task_queue.size() > 0); - auto t = m_task_queue.pop(); - m_stats.job_scheduled(t.m_timestamp); + void periodic_wakeup() { + do { + std::this_thread::sleep_for(10us); + m_cv.notify_all(); + } while (!m_shutdown.load()); + } - m_thrd_pool.push(t); - } + void schedule_next() { + assert(m_task_queue.size() > 0); + auto t = m_task_queue.pop(); + m_stats.job_scheduled(t.m_timestamp); - void run() { - do { - std::unique_lock cv_lock(m_cv_lock); - m_cv.wait(cv_lock); + m_thrd_pool.push(t); + } - while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) { - schedule_next(); - } - } while(!m_shutdown.load()); - } + void run() { + do { + std::unique_lock cv_lock(m_cv_lock); + m_cv.wait(cv_lock); + while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) { + schedule_next(); + } + } while (!m_shutdown.load()); + } }; -} +} // namespace de diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index ac59301..7cd9cfc 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -1,13 +1,13 @@ /* * include/framework/scheduling/SerialScheduler.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * * IMPORTANT: This "scheduler" is a shim implementation for allowing - * strictly serial, single-threaded operation of the framework. It should - * never be used in multi-threaded contexts. A call to the schedule_job + * strictly serial, single-threaded operation of the framework. It should + * never be used in multi-threaded contexts. A call to the schedule_job * function will immediately run the job and block on its completion before * returning. * @@ -21,42 +21,36 @@ namespace de { class SerialScheduler { public: - SerialScheduler(size_t memory_budget, size_t thread_cnt) - : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX) - , m_used_memory(0) - , m_used_thrds(0) - , m_counter(0) - {} - - ~SerialScheduler() = default; - - void schedule_job(std::function job, size_t size, void *args, size_t type=0) { - size_t ts = m_counter++; - m_stats.job_queued(ts, type, size); - m_stats.job_scheduled(ts); - auto t = Task(size, ts, job, args, type, &m_stats); - t(0); - } - - void shutdown() { - /* intentionally left blank */ - } - - void print_statistics() { - m_stats.print_statistics(); - } + SerialScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX), + m_thrd_cnt((thread_cnt) ? thread_cnt : UINT64_MAX), m_used_memory(0), + m_used_thrds(0), m_counter(0) {} + + ~SerialScheduler() = default; + + void schedule_job(std::function job, size_t size, void *args, + size_t type = 0) { + size_t ts = m_counter++; + m_stats.job_queued(ts, type, size); + m_stats.job_scheduled(ts); + auto t = Task(size, ts, job, args, type, &m_stats); + t(0); + } + + void shutdown() { /* intentionally left blank */ } + + void print_statistics() { m_stats.print_statistics(); } private: - size_t m_memory_budget; - size_t m_thrd_cnt; + [[maybe_unused]] size_t m_memory_budget; + [[maybe_unused]] size_t m_thrd_cnt; - size_t m_used_thrds; - size_t m_used_memory; + [[maybe_unused]] size_t m_used_memory; + [[maybe_unused]] size_t m_used_thrds; - size_t m_counter; + size_t m_counter; - SchedulerStatistics m_stats; + SchedulerStatistics m_stats; }; -} +} // namespace de diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index bd53090..6b6f040 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -1,7 +1,7 @@ /* * include/framework/scheduling/Task.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * @@ -13,77 +13,76 @@ */ #pragma once -#include -#include #include +#include +#include -#include "framework/util/Configuration.h" #include "framework/scheduling/Epoch.h" #include "framework/scheduling/statistics.h" +#include "framework/util/Configuration.h" namespace de { -template S, QueryInterface Q, LayoutPolicy L> +template QueryType, + LayoutPolicy L> struct ReconstructionArgs { - Epoch *epoch; - ReconstructionVector merges; - std::promise result; - bool compaction; - void *extension; + typedef typename ShardType::RECORD RecordType; + Epoch *epoch; + ReconstructionVector merges; + std::promise result; + bool compaction; + void *extension; }; -template S, QueryInterface Q, LayoutPolicy L> -struct QueryArgs { - std::promise> result_set; - void *query_parms; - void *extension; +template Q, typename DE> struct QueryArgs { + std::promise> result_set; + typename Q::Parameters query_parms; + DE *extension; }; -typedef std::function Job; +typedef std::function Job; struct Task { - Task(size_t size, size_t ts, Job job, void *args, size_t type=0, SchedulerStatistics *stats=nullptr) - : m_job(job) - , m_size(size) - , m_timestamp(ts) - , m_args(args) - , m_type(type) - , m_stats(stats) - {} + Task(size_t size, size_t ts, Job job, void *args, size_t type = 0, + SchedulerStatistics *stats = nullptr) + : m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type), + m_stats(stats) {} - Job m_job; - size_t m_size; - size_t m_timestamp; - void *m_args; - size_t m_type; - SchedulerStatistics *m_stats; + Job m_job; + size_t m_size; + size_t m_timestamp; + void *m_args; + size_t m_type; + SchedulerStatistics *m_stats; - friend bool operator<(const Task &self, const Task &other) { - return self.m_timestamp < other.m_timestamp; - } + friend bool operator<(const Task &self, const Task &other) { + return self.m_timestamp < other.m_timestamp; + } - friend bool operator>(const Task &self, const Task &other) { - return self.m_timestamp > other.m_timestamp; - } + friend bool operator>(const Task &self, const Task &other) { + return self.m_timestamp > other.m_timestamp; + } - void operator()(size_t thrd_id) { - auto start = std::chrono::high_resolution_clock::now(); - if (m_stats) { - m_stats->job_begin(m_timestamp); - } + void operator()(size_t thrd_id) { + auto start = std::chrono::high_resolution_clock::now(); + if (m_stats) { + m_stats->job_begin(m_timestamp); + } - m_job(m_args); + m_job(m_args); - if (m_stats) { - m_stats->job_complete(m_timestamp); - } - auto stop = std::chrono::high_resolution_clock::now(); + if (m_stats) { + m_stats->job_complete(m_timestamp); + } + auto stop = std::chrono::high_resolution_clock::now(); - if (m_stats) { - auto time = std::chrono::duration_cast(stop - start).count(); - m_stats->log_time_data(time, m_type); - } + if (m_stats) { + auto time = + std::chrono::duration_cast(stop - start) + .count(); + m_stats->log_time_data(time, m_type); } + } }; -} +} // namespace de diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h index 6c479cd..48c186f 100644 --- a/include/framework/scheduling/statistics.h +++ b/include/framework/scheduling/statistics.h @@ -1,7 +1,7 @@ /* * include/framework/scheduling/statistics.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * @@ -13,106 +13,94 @@ */ #pragma once -#include +#include #include +#include +#include +#include #include #include -#include -#include -#include namespace de { class SchedulerStatistics { private: - enum class EventType { - QUEUED, - SCHEDULED, - STARTED, - FINISHED - }; + enum class EventType { QUEUED, SCHEDULED, STARTED, FINISHED }; - struct Event { - size_t id; - EventType type; - }; - - struct JobInfo { - size_t id; - size_t size; - size_t type; - }; + struct Event { + size_t id; + EventType type; + }; + struct JobInfo { + size_t id; + size_t size; + size_t type; + }; public: - SchedulerStatistics() = default; - ~SchedulerStatistics() = default; + SchedulerStatistics() = default; + ~SchedulerStatistics() = default; - void job_queued(size_t id, size_t type, size_t size) { - auto time = std::chrono::high_resolution_clock::now(); - } + void job_queued(size_t id, size_t type, size_t size) { } - void job_scheduled(size_t id) { - std::unique_lock lk(m_mutex); + void job_scheduled(size_t id) { std::unique_lock lk(m_mutex); } - } + void job_begin(size_t id) {} - void job_begin(size_t id) { + void job_complete(size_t id) {} - } + /* FIXME: This is just a temporary approach */ + void log_time_data(size_t length, size_t type) { + assert(type == 1 || type == 2); - void job_complete(size_t id) { + if (type == 1) { + m_type_1_cnt.fetch_add(1); + m_type_1_total_time.fetch_add(length); - } + if (length > m_type_1_largest_time) { + m_type_1_largest_time.store(length); + } + } else { + m_type_2_cnt.fetch_add(1); + m_type_2_total_time.fetch_add(length); - /* FIXME: This is just a temporary approach */ - void log_time_data(size_t length, size_t type) { - assert(type == 1 || type == 2); - - if (type == 1) { - m_type_1_cnt.fetch_add(1); - m_type_1_total_time.fetch_add(length); - - if (length > m_type_1_largest_time) { - m_type_1_largest_time.store(length); - } - } else { - m_type_2_cnt.fetch_add(1); - m_type_2_total_time.fetch_add(length); - - if (length > m_type_2_largest_time) { - m_type_2_largest_time.store(length); - } - } + if (length > m_type_2_largest_time) { + m_type_2_largest_time.store(length); + } } - - void print_statistics() { - if (m_type_1_cnt > 0) { - fprintf(stdout, "Query Count: %ld\tQuery Avg. Latency: %ld\tMax Query Latency: %ld\n", - m_type_1_cnt.load(), - m_type_1_total_time.load() / m_type_1_cnt.load(), - m_type_1_largest_time.load()); - } - if (m_type_2_cnt > 0) { - fprintf(stdout, "Reconstruction Count: %ld\tReconstruction Avg. Latency: %ld\tMax Recon. Latency:%ld\n", - m_type_2_cnt.load(), - m_type_2_total_time.load() / m_type_2_cnt.load(), - m_type_2_largest_time.load()); - } + } + + void print_statistics() { + if (m_type_1_cnt > 0) { + fprintf( + stdout, + "Query Count: %ld\tQuery Avg. Latency: %ld\tMax Query Latency: %ld\n", + m_type_1_cnt.load(), m_type_1_total_time.load() / m_type_1_cnt.load(), + m_type_1_largest_time.load()); + } + if (m_type_2_cnt > 0) { + fprintf(stdout, + "Reconstruction Count: %ld\tReconstruction Avg. Latency: " + "%ld\tMax Recon. Latency:%ld\n", + m_type_2_cnt.load(), + m_type_2_total_time.load() / m_type_2_cnt.load(), + m_type_2_largest_time.load()); } + } private: - std::mutex m_mutex; - std::unordered_map m_jobs; - std::vector m_event_log; + std::mutex m_mutex; + std::unordered_map m_jobs; + std::vector m_event_log; - std::atomic m_type_1_cnt; - std::atomic m_type_1_total_time; + std::atomic m_type_1_cnt; + std::atomic m_type_1_total_time; - std::atomic m_type_2_cnt; - std::atomic m_type_2_total_time; + std::atomic m_type_2_cnt; + std::atomic m_type_2_total_time; - std::atomic m_type_1_largest_time; - std::atomic m_type_2_largest_time; + std::atomic m_type_1_largest_time; + std::atomic m_type_2_largest_time; }; -} +} // namespace de -- cgit v1.2.3