From 2ded45f5a20f38fdfd9f348c446c38dc713a5591 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 3 Mar 2025 13:41:19 -0500 Subject: Fixed a few concurrency bugs --- include/framework/scheduling/FIFOScheduler.h | 21 ++----------- include/framework/scheduling/LockManager.h | 35 +++++++++++++++++----- include/framework/scheduling/Task.h | 12 ++++++-- include/framework/scheduling/Version.h | 44 +++++++++++++++++++++------- 4 files changed, 74 insertions(+), 38 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 8a4cd8d..16fe111 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -1,7 +1,7 @@ /* * include/framework/scheduling/FIFOScheduler.h * - * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * Copyright (C) 2023-2025 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * @@ -9,11 +9,6 @@ * determine which jobs to run next. If more jobs are scheduled than there * are available threads, the excess will stall until a thread becomes * available and then run in the order they were received by the scheduler. - * - * TODO: We need to set up a custom threadpool based on jthreads to support - * thread preemption for a later phase of this project. That will allow us - * to avoid blocking epoch transitions on long-running queries, or to pause - * reconstructions on demand. */ #pragma once @@ -40,7 +35,6 @@ public: m_thrd_cnt((thread_cnt) ? thread_cnt : DEFAULT_MAX_THREADS), m_used_memory(0), m_used_thrds(0), m_shutdown(false) { m_sched_thrd = std::thread(&FIFOScheduler::run, this); - m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this); m_thrd_pool.resize(m_thrd_cnt); } @@ -50,7 +44,6 @@ public: } m_sched_thrd.join(); - m_sched_wakeup_thrd.join(); m_flush_thread.join(); } @@ -60,13 +53,13 @@ public: size_t ts = m_counter.fetch_add(1); if (type == 3) { - do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock)); + do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock, &m_cv)); return; } std::unique_lock lk(m_cv_lock); m_stats.job_queued(ts, type, size); - m_task_queue.push(Task(size, ts, job, args, type, &m_stats)); + m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv)); m_cv.notify_all(); } @@ -95,7 +88,6 @@ private: std::thread m_flush_thread; std::thread m_sched_thrd; - std::thread m_sched_wakeup_thrd; ctpl::thread_pool m_thrd_pool; std::atomic m_used_memory; @@ -105,13 +97,6 @@ private: SchedulerStatistics m_stats; - void periodic_wakeup() { - do { - std::this_thread::sleep_for(10us); - m_cv.notify_all(); - } while (!m_shutdown.load()); - } - void schedule_next() { auto lk = std::unique_lock(m_queue_lock); assert(m_task_queue.size() > 0); diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h index fcc79d1..275c5ae 100644 --- a/include/framework/scheduling/LockManager.h +++ b/include/framework/scheduling/LockManager.h @@ -5,6 +5,7 @@ #pragma once #include #include +#include namespace de { class LockManager { @@ -13,6 +14,8 @@ public: for (size_t i=0; i < levels; i++) { m_lks.emplace_back(false); } + + m_last_unlocked_version = 0; } ~LockManager() = default; @@ -21,25 +24,38 @@ public: m_lks.emplace_back(false); } - void release_lock(size_t idx) { + void release_lock(size_t idx, size_t version) { if (idx < m_lks.size()) { - m_lks[idx].store(false); + assert(m_lks.at(idx).load() == true); + m_lks.at(idx).store(false); + + while (m_last_unlocked_version.load() < version) { + auto tmp = m_last_unlocked_version.load(); + m_last_unlocked_version.compare_exchange_strong(tmp, version); + } } } - bool is_locked(size_t idx) { + bool is_locked(size_t idx, size_t version) { if (idx < m_lks.size()) { - return m_lks[idx].load(); + return m_lks.at(idx).load() && m_last_unlocked_version <= version; } return false; } - bool take_lock(size_t idx) { + bool take_lock(size_t idx, size_t version) { if (idx < m_lks.size()) { - bool old = m_lks[idx].load(); + bool old = m_lks.at(idx).load(); if (!old) { - return m_lks[idx].compare_exchange_strong(old, true); + auto result = m_lks.at(idx).compare_exchange_strong(old, true); + + if (m_last_unlocked_version.load() > version) { + m_lks.at(idx).store(false); + return false; + } + + return result; } } @@ -55,6 +71,10 @@ public: return false; } + bool is_buffer_locked() { + return m_buffer_lk.load(); + } + void release_buffer_lock() { m_buffer_lk.store(false); } @@ -62,5 +82,6 @@ public: private: std::deque> m_lks; std::atomic m_buffer_lk; + std::atomic m_last_unlocked_version; }; } diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 2d68f56..3dbc9f4 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -1,7 +1,7 @@ /* * include/framework/scheduling/Task.h * - * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * Copyright (C) 2023-2025 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * @@ -16,6 +16,7 @@ #include #include #include +#include #include "framework/scheduling/Version.h" #include "framework/scheduling/statistics.h" @@ -49,9 +50,9 @@ typedef std::function Job; struct Task { Task(size_t size, size_t ts, Job job, void *args, size_t type = 0, - SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr) + SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr, std::condition_variable *cv=nullptr) : m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type), - m_stats(stats), m_lk(lk) {} + m_stats(stats), m_lk(lk), m_cv(cv) {} Job m_job; size_t m_size; @@ -60,6 +61,7 @@ struct Task { size_t m_type; SchedulerStatistics *m_stats; std::mutex *m_lk; + std::condition_variable *m_cv; friend bool operator<(const Task &self, const Task &other) { return self.m_timestamp < other.m_timestamp; @@ -92,6 +94,10 @@ struct Task { if (m_lk) { m_lk->unlock(); } + + if (m_cv) { + m_cv->notify_all(); + } } }; diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h index 4cd73ba..be54c84 100644 --- a/include/framework/scheduling/Version.h +++ b/include/framework/scheduling/Version.h @@ -25,16 +25,24 @@ private: typedef BufferView BufferViewType; public: + Version(size_t vid = 0) - : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0), - m_pending_buffer_head(-1) {} + : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0) {} Version(size_t number, std::unique_ptr structure, BufferType *buff, size_t head) : m_buffer(buff), m_structure(std::move(structure)), m_id(number), - m_buffer_head(head), m_pending_buffer_head(-1) {} - - ~Version() = default; + m_buffer_head(head) { + if (m_buffer) { + m_buffer->take_head_reference(m_buffer_head); + } + } + + ~Version() { + if (m_buffer) { + m_buffer->release_head_reference(m_buffer_head); + } + } /* * Versions are *not* copyable or movable. Only one can exist, and all users @@ -72,6 +80,9 @@ public: auto version = new Version(number); version->m_buffer = m_buffer; version->m_buffer_head = m_buffer_head; + if (version->m_buffer) { + version->m_buffer->take_head_reference(m_buffer_head); + } if (m_structure) { version->m_structure = std::unique_ptr(m_structure->copy()); @@ -81,8 +92,15 @@ public: } bool advance_buffer_head(size_t new_head) { - m_buffer_head = new_head; - return m_buffer->advance_head(new_head); + m_buffer->release_head_reference(m_buffer_head); + if (m_buffer->advance_head(new_head)) { + m_buffer_head = new_head; + return true; + } + + /* if we failed to advance, reclaim our reference */ + m_buffer->take_head_reference(m_buffer_head); + return false; } void update_shard_version(size_t version) { @@ -94,9 +112,16 @@ public: } - void set_head(size_t head) { - // fprintf(stderr, "[I] Updating buffer head of %ld to %ld\n", get_id(), head); + void set_buffer(BufferType *buffer, size_t head) { + assert(m_buffer == nullptr); + + m_buffer = buffer; m_buffer_head = head; + m_buffer->take_head_reference(head); + } + + bool valid() { + return (m_buffer) && (m_buffer_head) && (m_structure) && (m_id); } private: @@ -105,6 +130,5 @@ private: size_t m_id; size_t m_buffer_head; - ssize_t m_pending_buffer_head; }; } // namespace de -- cgit v1.2.3