summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling/Epoch.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-11-09 11:08:34 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2023-11-09 11:08:34 -0500
commit39d22316be1708073e4fe1f708814cc801ecdc69 (patch)
tree407467f24deac19f7d0c8b27f6d84785ee1e5b94 /include/framework/scheduling/Epoch.h
parent357cab549c2ed33970562b84ff6f83923742343d (diff)
downloaddynamic-extension-39d22316be1708073e4fe1f708814cc801ecdc69.tar.gz
Fixed various concurrency bugs
1. The system should now cleanly shutdown when the DynamicExtension object is destroyed. Before now, this would lead to use-after-frees and/or deadlocks. 2. Improved synchronization on mutable buffer structure management to fix the issue of the framework losing track of buffers during Epoch changeovers.
Diffstat (limited to 'include/framework/scheduling/Epoch.h')
-rw-r--r--include/framework/scheduling/Epoch.h21
1 files changed, 16 insertions, 5 deletions
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 9193b06..fc08d57 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -47,9 +47,14 @@ public:
~Epoch() {
assert(m_active_jobs.load() == 0);
- for (auto buf : m_buffers) {
- buf->release_reference();
- }
+ /* FIXME: this is needed to keep the destructor from
+ * sometimes locking up here. But there *shouldn't* be
+ * any threads waiting on this signal at object destruction,
+ * so something else is going on here that needs looked into
+ */
+ //m_active_cv.notify_all();
+
+ clear_buffers();
if (m_structure) {
m_structure->release_reference();
@@ -59,6 +64,7 @@ public:
Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) {
assert(buf);
+ std::unique_lock<std::mutex> m_buffer_lock;
/*
* if a current buffer is specified, only add the
* new buffer if the active buffer is the current,
@@ -108,6 +114,7 @@ public:
}
BufView get_buffer_view() {
+ std::unique_lock<std::mutex> m_buffer_lock;
return BufView(m_buffers);
}
@@ -123,6 +130,7 @@ public:
* releasing all references in the process.
*/
size_t clear_buffers() {
+ std::unique_lock<std::mutex> m_buffer_lock;
size_t buf_cnt = m_buffers.size();
for (auto buf : m_buffers) {
if (buf) buf->release_reference();
@@ -138,6 +146,7 @@ public:
* the new epoch will be set to the provided argument.
*/
Epoch *clone(size_t number) {
+ std::unique_lock<std::mutex> m_buffer_lock;
auto epoch = new Epoch(number);
epoch->m_buffers = m_buffers;
if (m_structure) {
@@ -196,8 +205,8 @@ public:
* wait for them to finish and return true. If there are
* not active jobs, return true immediately
*/
- while (m_active_jobs > 0) {
- std::unique_lock<std::mutex> lk(m_cv_lock);
+ std::unique_lock<std::mutex> lk(m_cv_lock);
+ while (m_active_jobs.load() > 0) {
m_active_cv.wait(lk);
}
@@ -211,6 +220,8 @@ private:
std::condition_variable m_active_cv;
std::mutex m_cv_lock;
+ std::mutex m_buffer_lock;
+
std::atomic<bool> m_active_merge;
/*