summaryrefslogtreecommitdiffstats
path: root/include
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-11-09 11:08:34 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2023-11-09 11:08:34 -0500
commit39d22316be1708073e4fe1f708814cc801ecdc69 (patch)
tree407467f24deac19f7d0c8b27f6d84785ee1e5b94 /include
parent357cab549c2ed33970562b84ff6f83923742343d (diff)
downloaddynamic-extension-39d22316be1708073e4fe1f708814cc801ecdc69.tar.gz
Fixed various concurrency bugs
1. The system should now shut down cleanly when the DynamicExtension object is destroyed. Previously, this would lead to use-after-frees and/or deadlocks. 2. Improved synchronization on mutable buffer structure management to fix the issue of the framework losing track of buffers during Epoch changeovers.
Diffstat (limited to 'include')
-rw-r--r--include/framework/DynamicExtension.h77
-rw-r--r--include/framework/scheduling/Epoch.h21
-rw-r--r--include/framework/scheduling/FIFOScheduler.h7
3 files changed, 84 insertions, 21 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 7244856..a6047ea 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -62,6 +62,17 @@ public:
}
~DynamicExtension() {
+
+ /* let any in-flight epoch transition finish */
+ await_next_epoch();
+
+ /* deactivate the active epoch */
+ get_active_epoch()->set_inactive();
+
+ /* shutdown the scheduler */
+ m_sched.shutdown();
+
+ /* delete all held resources */
for (auto e : m_epochs) {
delete e.second;
}
@@ -125,7 +136,11 @@ public:
}
size_t get_height() {
- return get_active_epoch()->get_structure()->get_height();
+ auto epoch = get_active_epoch_protected();
+ auto t = epoch->get_structure()->get_height();
+ epoch->end_job();
+
+ return t;
}
size_t get_memory_usage() {
@@ -211,7 +226,10 @@ public:
* tombstone proportion invariant.
*/
bool validate_tombstone_proportion() {
- return get_active_epoch()->get_structure()->validate_tombstone_proportion();
+ auto epoch = get_active_epoch_protected();
+ auto t = epoch->get_structure()->validate_tombstone_proportion();
+ epoch->end_job();
+ return t;
}
private:
@@ -228,6 +246,8 @@ private:
std::condition_variable m_epoch_cv;
std::mutex m_epoch_cv_lk;
+ std::mutex m_epoch_transition_lk;
+
size_t m_scale_factor;
double m_max_delete_prop;
size_t m_buffer_capacity;
@@ -252,6 +272,8 @@ private:
}
void advance_epoch() {
+ m_epoch_transition_lk.lock();
+
size_t new_epoch_num = m_newest_epoch.load();
size_t old_epoch_num = m_current_epoch.load();
assert(new_epoch_num != old_epoch_num);
@@ -267,18 +289,19 @@ private:
*/
if constexpr (!std::same_as<SCHED, SerialScheduler>) {
size_t old_buffer_cnt = new_epoch->clear_buffers();
- // FIXME: this is getting nightmarish... The -1 here is to ensure that the
- // the empty buffer added when the merge was first triggered is also included.
- // Due to the reordering of operations in internal_append, the new buffer exists
- // at the time of the clone, and so is already in the new epoch.
- std::unique_lock<std::mutex> lk(m_struct_lock);
- for (size_t i=old_buffer_cnt-1; i<old_epoch->get_buffers().size(); i++) {
+
+ /*
+ * skip the first buffer, as this was the one that got merged,
+ * and copy all the other buffer references into the new epoch
+ */
+ for (size_t i=1; i<old_epoch->get_buffers().size(); i++) {
new_epoch->add_buffer(old_epoch->get_buffers()[i]);
}
}
m_current_epoch.fetch_add(1);
old_epoch->set_inactive();
+ m_epoch_transition_lk.unlock();
/* notify any blocking threads that the new epoch is available */
m_epoch_cv_lk.lock();
@@ -310,16 +333,41 @@ private:
}
/*
- * Add a new empty buffer to the specified epoch. This is intended to be used
+ * Add a new empty buffer. This is intended to be used
* when a merge is triggered, to allow for inserts to be sustained in the new
* buffer while a new epoch is being created in the background. Returns a
* pointer to the newly created buffer.
*/
- Buffer *add_empty_buffer(_Epoch *epoch, Buffer *current_buffer=nullptr) {
+ Buffer *add_empty_buffer() {
+ /*
+ * if there's a current Epoch transition ongoing, a buffer installed
+ * into an older Epoch, but not the new one, may be lost. So fail to
+ * insert a buffer.
+ */
+ if (!m_epoch_transition_lk.try_lock()) {
+ return nullptr;
+ }
+
+ /*
+ * verify that the currently active buffer is still full, if
+ * not, there is no reason to add a new one. This code is
+ * protected by the epoch transition lock, so no need to
+ * take a protected reference to the epoch.
+ */
+ auto active_epoch = get_active_epoch();
+ if (!active_epoch->get_active_buffer()->is_full()) {
+ m_epoch_transition_lk.unlock();
+ return nullptr;
+ }
+
+ /*
+ * create a new buffer and install it in the active epoch.
+ */
auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
std::unique_lock<std::mutex> m_struct_lock;
- auto new_buffer = epoch->add_buffer(temp_buffer, current_buffer);
+ auto new_buffer = active_epoch->add_buffer(temp_buffer);
+
/*
* if epoch->add_buffer doesn't add the new buffer, this insert
* won't update the buffer set (duplicate insert)
@@ -330,6 +378,7 @@ private:
if (new_buffer != temp_buffer) {
delete temp_buffer;
}
+ m_epoch_transition_lk.unlock();
return new_buffer;
}
@@ -503,15 +552,15 @@ private:
* add an empty buffer to allow insert proceed and
* schedule a merge on a background thread
*/
- buffer = add_empty_buffer(epoch);
+ buffer = add_empty_buffer();
schedule_merge();
} else {
/* background merge is ongoing, so just add empty buffer */
- buffer = add_empty_buffer(epoch, buffer);
+ buffer = add_empty_buffer();
}
}
- res = buffer->append(rec, ts);
+ res = (buffer) ? buffer->append(rec, ts) : 0;
epoch->end_job();
} while(!res);
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 9193b06..fc08d57 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -47,9 +47,14 @@ public:
~Epoch() {
assert(m_active_jobs.load() == 0);
- for (auto buf : m_buffers) {
- buf->release_reference();
- }
+ /* FIXME: this is needed to keep the destructor from
+ * sometimes locking up here. But there *shouldn't* be
+ * any threads waiting on this signal at object destruction,
+ * so something else is going on here that needs to be looked into
+ */
+ //m_active_cv.notify_all();
+
+ clear_buffers();
if (m_structure) {
m_structure->release_reference();
@@ -59,6 +64,7 @@ public:
Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) {
assert(buf);
+ std::unique_lock<std::mutex> m_buffer_lock;
/*
* if a current buffer is specified, only add the
* new buffer if the active buffer is the current,
@@ -108,6 +114,7 @@ public:
}
BufView get_buffer_view() {
+ std::unique_lock<std::mutex> m_buffer_lock;
return BufView(m_buffers);
}
@@ -123,6 +130,7 @@ public:
* releasing all references in the process.
*/
size_t clear_buffers() {
+ std::unique_lock<std::mutex> m_buffer_lock;
size_t buf_cnt = m_buffers.size();
for (auto buf : m_buffers) {
if (buf) buf->release_reference();
@@ -138,6 +146,7 @@ public:
* the new epoch will be set to the provided argument.
*/
Epoch *clone(size_t number) {
+ std::unique_lock<std::mutex> m_buffer_lock;
auto epoch = new Epoch(number);
epoch->m_buffers = m_buffers;
if (m_structure) {
@@ -196,8 +205,8 @@ public:
* wait for them to finish and return true. If there are
* not active jobs, return true immediately
*/
- while (m_active_jobs > 0) {
- std::unique_lock<std::mutex> lk(m_cv_lock);
+ std::unique_lock<std::mutex> lk(m_cv_lock);
+ while (m_active_jobs.load() > 0) {
m_active_cv.wait(lk);
}
@@ -211,6 +220,8 @@ private:
std::condition_variable m_active_cv;
std::mutex m_cv_lock;
+ std::mutex m_buffer_lock;
+
std::atomic<bool> m_active_merge;
/*
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index ba62f9e..4cdc436 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -47,9 +47,10 @@ public:
}
~FIFOScheduler() {
- shutdown();
+ if (!m_shutdown.load()) {
+ shutdown();
+ }
- m_cv.notify_all();
m_sched_thrd.join();
}
@@ -63,6 +64,8 @@ public:
void shutdown() {
m_shutdown.store(true);
+ m_thrd_pool.stop(true);
+ m_cv.notify_all();
}
private: