summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-03-03 13:41:19 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2025-03-03 13:41:19 -0500
commit2ded45f5a20f38fdfd9f348c446c38dc713a5591 (patch)
tree746fb09b49ee4c00fc3e4760d899d60d8d8dcce0 /include/framework/scheduling
parentd116b94389538aa8e0e7354fae77693b980de4f0 (diff)
downloaddynamic-extension-2ded45f5a20f38fdfd9f348c446c38dc713a5591.tar.gz
Fixed a few concurrency bugs
Diffstat (limited to 'include/framework/scheduling')
-rw-r--r--include/framework/scheduling/FIFOScheduler.h21
-rw-r--r--include/framework/scheduling/LockManager.h35
-rw-r--r--include/framework/scheduling/Task.h12
-rw-r--r--include/framework/scheduling/Version.h44
4 files changed, 74 insertions, 38 deletions
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 8a4cd8d..16fe111 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -1,7 +1,7 @@
/*
* include/framework/scheduling/FIFOScheduler.h
*
- * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -9,11 +9,6 @@
* determine which jobs to run next. If more jobs are scheduled than there
* are available threads, the excess will stall until a thread becomes
* available and then run in the order they were received by the scheduler.
- *
- * TODO: We need to set up a custom threadpool based on jthreads to support
- * thread preemption for a later phase of this project. That will allow us
- * to avoid blocking epoch transitions on long-running queries, or to pause
- * reconstructions on demand.
*/
#pragma once
@@ -40,7 +35,6 @@ public:
m_thrd_cnt((thread_cnt) ? thread_cnt : DEFAULT_MAX_THREADS),
m_used_memory(0), m_used_thrds(0), m_shutdown(false) {
m_sched_thrd = std::thread(&FIFOScheduler::run, this);
- m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this);
m_thrd_pool.resize(m_thrd_cnt);
}
@@ -50,7 +44,6 @@ public:
}
m_sched_thrd.join();
- m_sched_wakeup_thrd.join();
m_flush_thread.join();
}
@@ -60,13 +53,13 @@ public:
size_t ts = m_counter.fetch_add(1);
if (type == 3) {
- do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock));
+ do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock, &m_cv));
return;
}
std::unique_lock<std::mutex> lk(m_cv_lock);
m_stats.job_queued(ts, type, size);
- m_task_queue.push(Task(size, ts, job, args, type, &m_stats));
+ m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv));
m_cv.notify_all();
}
@@ -95,7 +88,6 @@ private:
std::thread m_flush_thread;
std::thread m_sched_thrd;
- std::thread m_sched_wakeup_thrd;
ctpl::thread_pool m_thrd_pool;
std::atomic<size_t> m_used_memory;
@@ -105,13 +97,6 @@ private:
SchedulerStatistics m_stats;
- void periodic_wakeup() {
- do {
- std::this_thread::sleep_for(10us);
- m_cv.notify_all();
- } while (!m_shutdown.load());
- }
-
void schedule_next() {
auto lk = std::unique_lock<std::mutex>(m_queue_lock);
assert(m_task_queue.size() > 0);
diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h
index fcc79d1..275c5ae 100644
--- a/include/framework/scheduling/LockManager.h
+++ b/include/framework/scheduling/LockManager.h
@@ -5,6 +5,7 @@
#pragma once
#include <deque>
#include <atomic>
+#include <cassert>
namespace de {
class LockManager {
@@ -13,6 +14,8 @@ public:
for (size_t i=0; i < levels; i++) {
m_lks.emplace_back(false);
}
+
+ m_last_unlocked_version = 0;
}
~LockManager() = default;
@@ -21,25 +24,38 @@ public:
m_lks.emplace_back(false);
}
- void release_lock(size_t idx) {
+ void release_lock(size_t idx, size_t version) {
if (idx < m_lks.size()) {
- m_lks[idx].store(false);
+ assert(m_lks.at(idx).load() == true);
+ m_lks.at(idx).store(false);
+
+ while (m_last_unlocked_version.load() < version) {
+ auto tmp = m_last_unlocked_version.load();
+ m_last_unlocked_version.compare_exchange_strong(tmp, version);
+ }
}
}
- bool is_locked(size_t idx) {
+ bool is_locked(size_t idx, size_t version) {
if (idx < m_lks.size()) {
- return m_lks[idx].load();
+ return m_lks.at(idx).load() && m_last_unlocked_version <= version;
}
return false;
}
- bool take_lock(size_t idx) {
+ bool take_lock(size_t idx, size_t version) {
if (idx < m_lks.size()) {
- bool old = m_lks[idx].load();
+ bool old = m_lks.at(idx).load();
if (!old) {
- return m_lks[idx].compare_exchange_strong(old, true);
+ auto result = m_lks.at(idx).compare_exchange_strong(old, true);
+
+ if (m_last_unlocked_version.load() > version) {
+ m_lks.at(idx).store(false);
+ return false;
+ }
+
+ return result;
}
}
@@ -55,6 +71,10 @@ public:
return false;
}
+ bool is_buffer_locked() {
+ return m_buffer_lk.load();
+ }
+
void release_buffer_lock() {
m_buffer_lk.store(false);
}
@@ -62,5 +82,6 @@ public:
private:
std::deque<std::atomic<bool>> m_lks;
std::atomic<bool> m_buffer_lk;
+ std::atomic<size_t> m_last_unlocked_version;
};
}
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 2d68f56..3dbc9f4 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -1,7 +1,7 @@
/*
* include/framework/scheduling/Task.h
*
- * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -16,6 +16,7 @@
#include <chrono>
#include <functional>
#include <future>
+#include <condition_variable>
#include "framework/scheduling/Version.h"
#include "framework/scheduling/statistics.h"
@@ -49,9 +50,9 @@ typedef std::function<void(void *)> Job;
struct Task {
Task(size_t size, size_t ts, Job job, void *args, size_t type = 0,
- SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr)
+ SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr, std::condition_variable *cv=nullptr)
: m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type),
- m_stats(stats), m_lk(lk) {}
+ m_stats(stats), m_lk(lk), m_cv(cv) {}
Job m_job;
size_t m_size;
@@ -60,6 +61,7 @@ struct Task {
size_t m_type;
SchedulerStatistics *m_stats;
std::mutex *m_lk;
+ std::condition_variable *m_cv;
friend bool operator<(const Task &self, const Task &other) {
return self.m_timestamp < other.m_timestamp;
@@ -92,6 +94,10 @@ struct Task {
if (m_lk) {
m_lk->unlock();
}
+
+ if (m_cv) {
+ m_cv->notify_all();
+ }
}
};
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index 4cd73ba..be54c84 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -25,16 +25,24 @@ private:
typedef BufferView<RecordType> BufferViewType;
public:
+
Version(size_t vid = 0)
- : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0),
- m_pending_buffer_head(-1) {}
+ : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0) {}
Version(size_t number, std::unique_ptr<StructureType> structure, BufferType *buff,
size_t head)
: m_buffer(buff), m_structure(std::move(structure)), m_id(number),
- m_buffer_head(head), m_pending_buffer_head(-1) {}
-
- ~Version() = default;
+ m_buffer_head(head) {
+ if (m_buffer) {
+ m_buffer->take_head_reference(m_buffer_head);
+ }
+ }
+
+ ~Version() {
+ if (m_buffer) {
+ m_buffer->release_head_reference(m_buffer_head);
+ }
+ }
/*
* Versions are *not* copyable or movable. Only one can exist, and all users
@@ -72,6 +80,9 @@ public:
auto version = new Version(number);
version->m_buffer = m_buffer;
version->m_buffer_head = m_buffer_head;
+ if (version->m_buffer) {
+ version->m_buffer->take_head_reference(m_buffer_head);
+ }
if (m_structure) {
version->m_structure = std::unique_ptr(m_structure->copy());
@@ -81,8 +92,15 @@ public:
}
bool advance_buffer_head(size_t new_head) {
- m_buffer_head = new_head;
- return m_buffer->advance_head(new_head);
+ m_buffer->release_head_reference(m_buffer_head);
+ if (m_buffer->advance_head(new_head)) {
+ m_buffer_head = new_head;
+ return true;
+ }
+
+ /* if we failed to advance, reclaim our reference */
+ m_buffer->take_head_reference(m_buffer_head);
+ return false;
}
void update_shard_version(size_t version) {
@@ -94,9 +112,16 @@ public:
}
- void set_head(size_t head) {
- // fprintf(stderr, "[I] Updating buffer head of %ld to %ld\n", get_id(), head);
+ void set_buffer(BufferType *buffer, size_t head) {
+ assert(m_buffer == nullptr);
+
+ m_buffer = buffer;
m_buffer_head = head;
+ m_buffer->take_head_reference(head);
+ }
+
+ bool valid() {
+ return (m_buffer) && (m_buffer_head) && (m_structure) && (m_id);
}
private:
@@ -105,6 +130,5 @@ private:
size_t m_id;
size_t m_buffer_head;
- ssize_t m_pending_buffer_head;
};
} // namespace de