summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-11-09 11:08:34 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2023-11-09 11:08:34 -0500
commit39d22316be1708073e4fe1f708814cc801ecdc69 (patch)
tree407467f24deac19f7d0c8b27f6d84785ee1e5b94 /include/framework/DynamicExtension.h
parent357cab549c2ed33970562b84ff6f83923742343d (diff)
downloaddynamic-extension-39d22316be1708073e4fe1f708814cc801ecdc69.tar.gz
Fixed various concurrency bugs
1. The system should now cleanly shutdown when the DynamicExtension object is destroyed. Before now, this would lead to use-after-frees and/or deadlocks. 2. Improved synchronization on mutable buffer structure management to fix the issue of the framework losing track of buffers during Epoch changeovers.
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h77
1 files changed, 63 insertions, 14 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 7244856..a6047ea 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -62,6 +62,17 @@ public:
}
~DynamicExtension() {
+
+ /* let any in-flight epoch transition finish */
+ await_next_epoch();
+
+ /* deactivate the active epoch */
+ get_active_epoch()->set_inactive();
+
+ /* shutdown the scheduler */
+ m_sched.shutdown();
+
+ /* delete all held resources */
for (auto e : m_epochs) {
delete e.second;
}
@@ -125,7 +136,11 @@ public:
}
size_t get_height() {
- return get_active_epoch()->get_structure()->get_height();
+ auto epoch = get_active_epoch_protected();
+ auto t = epoch->get_structure()->get_height();
+ epoch->end_job();
+
+ return t;
}
size_t get_memory_usage() {
@@ -211,7 +226,10 @@ public:
* tombstone proportion invariant.
*/
bool validate_tombstone_proportion() {
- return get_active_epoch()->get_structure()->validate_tombstone_proportion();
+ auto epoch = get_active_epoch_protected();
+ auto t = epoch->get_structure()->validate_tombstone_proportion();
+ epoch->end_job();
+ return t;
}
private:
@@ -228,6 +246,8 @@ private:
std::condition_variable m_epoch_cv;
std::mutex m_epoch_cv_lk;
+ std::mutex m_epoch_transition_lk;
+
size_t m_scale_factor;
double m_max_delete_prop;
size_t m_buffer_capacity;
@@ -252,6 +272,8 @@ private:
}
void advance_epoch() {
+ m_epoch_transition_lk.lock();
+
size_t new_epoch_num = m_newest_epoch.load();
size_t old_epoch_num = m_current_epoch.load();
assert(new_epoch_num != old_epoch_num);
@@ -267,18 +289,19 @@ private:
*/
if constexpr (!std::same_as<SCHED, SerialScheduler>) {
size_t old_buffer_cnt = new_epoch->clear_buffers();
- // FIXME: this is getting nightmarish... The -1 here is to ensure that the
- // the empty buffer added when the merge was first triggered is also included.
- // Due to the reordering of operations in internal_append, the new buffer exists
- // at the time of the clone, and so is already in the new epoch.
- std::unique_lock<std::mutex> lk(m_struct_lock);
- for (size_t i=old_buffer_cnt-1; i<old_epoch->get_buffers().size(); i++) {
+
+ /*
+ * skip the first buffer, as this was the one that got merged,
+ * and copy all the other buffer references into the new epoch
+ */
+ for (size_t i=1; i<old_epoch->get_buffers().size(); i++) {
new_epoch->add_buffer(old_epoch->get_buffers()[i]);
}
}
m_current_epoch.fetch_add(1);
old_epoch->set_inactive();
+ m_epoch_transition_lk.unlock();
/* notify any blocking threads that the new epoch is available */
m_epoch_cv_lk.lock();
@@ -310,16 +333,41 @@ private:
}
/*
- * Add a new empty buffer to the specified epoch. This is intended to be used
+ * Add a new empty buffer. This is intended to be used
* when a merge is triggered, to allow for inserts to be sustained in the new
* buffer while a new epoch is being created in the background. Returns a
* pointer to the newly created buffer.
*/
- Buffer *add_empty_buffer(_Epoch *epoch, Buffer *current_buffer=nullptr) {
+ Buffer *add_empty_buffer() {
+ /*
+ * if there's a current Epoch transition ongoing, a buffer installed
+ * into an older Epoch, but not the new one, may be lost. So fail to
+ * insert a buffer.
+ */
+ if (!m_epoch_transition_lk.try_lock()) {
+ return nullptr;
+ }
+
+ /*
+ * verify that the currently active buffer is still full, if
+ * not, there is no reason to add a new one. This code is
+ * protected by the epoch transition lock, so need need to
+ * take a protected reference to the epoch.
+ */
+ auto active_epoch = get_active_epoch();
+ if (!active_epoch->get_active_buffer()->is_full()) {
+ m_epoch_transition_lk.unlock();
+ return nullptr;
+ }
+
+ /*
+ * create a new buffer and install it in the active epoch.
+ */
auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
std::unique_lock<std::mutex> m_struct_lock;
- auto new_buffer = epoch->add_buffer(temp_buffer, current_buffer);
+ auto new_buffer = active_epoch->add_buffer(temp_buffer);
+
/*
* if epoch->add_buffer doesn't add the new buffer, this insert
* won't update the buffer set (duplicate insert)
@@ -330,6 +378,7 @@ private:
if (new_buffer != temp_buffer) {
delete temp_buffer;
}
+ m_epoch_transition_lk.unlock();
return new_buffer;
}
@@ -503,15 +552,15 @@ private:
* add an empty buffer to allow insert proceed and
* schedule a merge on a background thread
*/
- buffer = add_empty_buffer(epoch);
+ buffer = add_empty_buffer();
schedule_merge();
} else {
/* background merge is ongoing, so just add empty buffer */
- buffer = add_empty_buffer(epoch, buffer);
+ buffer = add_empty_buffer();
}
}
- res = buffer->append(rec, ts);
+ res = (buffer) ? buffer->append(rec, ts) : 0;
epoch->end_job();
} while(!res);