diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-11-09 11:08:34 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-11-09 11:08:34 -0500 |
| commit | 39d22316be1708073e4fe1f708814cc801ecdc69 (patch) | |
| tree | 407467f24deac19f7d0c8b27f6d84785ee1e5b94 /include/framework/DynamicExtension.h | |
| parent | 357cab549c2ed33970562b84ff6f83923742343d (diff) | |
| download | dynamic-extension-39d22316be1708073e4fe1f708814cc801ecdc69.tar.gz | |
Fixed various concurrency bugs
1. The system should now cleanly shutdown when the DynamicExtension
object is destroyed. Before now, this would lead to use-after-frees
and/or deadlocks.
2. Improved synchronization on mutable buffer structure management to
fix the issue of the framework losing track of buffers during Epoch
changeovers.
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 77 |
1 files changed, 63 insertions, 14 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 7244856..a6047ea 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -62,6 +62,17 @@ public: } ~DynamicExtension() { + + /* let any in-flight epoch transition finish */ + await_next_epoch(); + + /* deactivate the active epoch */ + get_active_epoch()->set_inactive(); + + /* shutdown the scheduler */ + m_sched.shutdown(); + + /* delete all held resources */ for (auto e : m_epochs) { delete e.second; } @@ -125,7 +136,11 @@ public: } size_t get_height() { - return get_active_epoch()->get_structure()->get_height(); + auto epoch = get_active_epoch_protected(); + auto t = epoch->get_structure()->get_height(); + epoch->end_job(); + + return t; } size_t get_memory_usage() { @@ -211,7 +226,10 @@ public: * tombstone proportion invariant. */ bool validate_tombstone_proportion() { - return get_active_epoch()->get_structure()->validate_tombstone_proportion(); + auto epoch = get_active_epoch_protected(); + auto t = epoch->get_structure()->validate_tombstone_proportion(); + epoch->end_job(); + return t; } private: @@ -228,6 +246,8 @@ private: std::condition_variable m_epoch_cv; std::mutex m_epoch_cv_lk; + std::mutex m_epoch_transition_lk; + size_t m_scale_factor; double m_max_delete_prop; size_t m_buffer_capacity; @@ -252,6 +272,8 @@ private: } void advance_epoch() { + m_epoch_transition_lk.lock(); + size_t new_epoch_num = m_newest_epoch.load(); size_t old_epoch_num = m_current_epoch.load(); assert(new_epoch_num != old_epoch_num); @@ -267,18 +289,19 @@ private: */ if constexpr (!std::same_as<SCHED, SerialScheduler>) { size_t old_buffer_cnt = new_epoch->clear_buffers(); - // FIXME: this is getting nightmarish... The -1 here is to ensure that the - // the empty buffer added when the merge was first triggered is also included. - // Due to the reordering of operations in internal_append, the new buffer exists - // at the time of the clone, and so is already in the new epoch. - std::unique_lock<std::mutex> lk(m_struct_lock); - for (size_t i=old_buffer_cnt-1; i<old_epoch->get_buffers().size(); i++) { + + /* + * skip the first buffer, as this was the one that got merged, + * and copy all the other buffer references into the new epoch + */ + for (size_t i=1; i<old_epoch->get_buffers().size(); i++) { new_epoch->add_buffer(old_epoch->get_buffers()[i]); } } m_current_epoch.fetch_add(1); old_epoch->set_inactive(); + m_epoch_transition_lk.unlock(); /* notify any blocking threads that the new epoch is available */ m_epoch_cv_lk.lock(); @@ -310,16 +333,41 @@ private: } /* - * Add a new empty buffer to the specified epoch. This is intended to be used + * Add a new empty buffer. This is intended to be used * when a merge is triggered, to allow for inserts to be sustained in the new * buffer while a new epoch is being created in the background. Returns a * pointer to the newly created buffer. */ - Buffer *add_empty_buffer(_Epoch *epoch, Buffer *current_buffer=nullptr) { + Buffer *add_empty_buffer() { + /* + * if there's a current Epoch transition ongoing, a buffer installed + * into an older Epoch, but not the new one, may be lost. So fail to + * insert a buffer. + */ + if (!m_epoch_transition_lk.try_lock()) { + return nullptr; + } + + /* + * verify that the currently active buffer is still full, if + * not, there is no reason to add a new one. This code is + * protected by the epoch transition lock, so need need to + * take a protected reference to the epoch. + */ + auto active_epoch = get_active_epoch(); + if (!active_epoch->get_active_buffer()->is_full()) { + m_epoch_transition_lk.unlock(); + return nullptr; + } + + /* + * create a new buffer and install it in the active epoch. + */ auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); std::unique_lock<std::mutex> m_struct_lock; - auto new_buffer = epoch->add_buffer(temp_buffer, current_buffer); + auto new_buffer = active_epoch->add_buffer(temp_buffer); + /* * if epoch->add_buffer doesn't add the new buffer, this insert * won't update the buffer set (duplicate insert) @@ -330,6 +378,7 @@ private: if (new_buffer != temp_buffer) { delete temp_buffer; } + m_epoch_transition_lk.unlock(); return new_buffer; } @@ -503,15 +552,15 @@ private: * add an empty buffer to allow insert proceed and * schedule a merge on a background thread */ - buffer = add_empty_buffer(epoch); + buffer = add_empty_buffer(); schedule_merge(); } else { /* background merge is ongoing, so just add empty buffer */ - buffer = add_empty_buffer(epoch, buffer); + buffer = add_empty_buffer(); } } - res = buffer->append(rec, ts); + res = (buffer) ? buffer->append(rec, ts) : 0; epoch->end_job(); } while(!res); |