From 39d22316be1708073e4fe1f708814cc801ecdc69 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 9 Nov 2023 11:08:34 -0500 Subject: Fixed various concurrency bugs 1. The system should now cleanly shutdown when the DynamicExtension object is destroyed. Before now, this would lead to use-after-frees and/or deadlocks. 2. Improved synchronization on mutable buffer structure management to fix the issue of the framework losing track of buffers during Epoch changeovers. --- include/framework/DynamicExtension.h | 77 +++++++++++++++++++++++----- include/framework/scheduling/Epoch.h | 21 ++++++-- include/framework/scheduling/FIFOScheduler.h | 7 ++- 3 files changed, 84 insertions(+), 21 deletions(-) (limited to 'include/framework') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 7244856..a6047ea 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -62,6 +62,17 @@ public: } ~DynamicExtension() { + + /* let any in-flight epoch transition finish */ + await_next_epoch(); + + /* deactivate the active epoch */ + get_active_epoch()->set_inactive(); + + /* shutdown the scheduler */ + m_sched.shutdown(); + + /* delete all held resources */ for (auto e : m_epochs) { delete e.second; } @@ -125,7 +136,11 @@ public: } size_t get_height() { - return get_active_epoch()->get_structure()->get_height(); + auto epoch = get_active_epoch_protected(); + auto t = epoch->get_structure()->get_height(); + epoch->end_job(); + + return t; } size_t get_memory_usage() { @@ -211,7 +226,10 @@ public: * tombstone proportion invariant. */ bool validate_tombstone_proportion() { - return get_active_epoch()->get_structure()->validate_tombstone_proportion(); + auto epoch = get_active_epoch_protected(); + auto t = epoch->get_structure()->validate_tombstone_proportion(); + epoch->end_job(); + return t; } private: @@ -228,6 +246,8 @@ private: std::condition_variable m_epoch_cv; std::mutex m_epoch_cv_lk; + std::mutex m_epoch_transition_lk; + size_t m_scale_factor; double m_max_delete_prop; size_t m_buffer_capacity; @@ -252,6 +272,8 @@ private: } void advance_epoch() { + m_epoch_transition_lk.lock(); + size_t new_epoch_num = m_newest_epoch.load(); size_t old_epoch_num = m_current_epoch.load(); assert(new_epoch_num != old_epoch_num); @@ -267,18 +289,19 @@ private: */ if constexpr (!std::same_as) { size_t old_buffer_cnt = new_epoch->clear_buffers(); - // FIXME: this is getting nightmarish... The -1 here is to ensure that the - // the empty buffer added when the merge was first triggered is also included. - // Due to the reordering of operations in internal_append, the new buffer exists - // at the time of the clone, and so is already in the new epoch. - std::unique_lock lk(m_struct_lock); - for (size_t i=old_buffer_cnt-1; iget_buffers().size(); i++) { + + /* + * skip the first buffer, as this was the one that got merged, + * and copy all the other buffer references into the new epoch + */ + for (size_t i=1; iget_buffers().size(); i++) { new_epoch->add_buffer(old_epoch->get_buffers()[i]); } } m_current_epoch.fetch_add(1); old_epoch->set_inactive(); + m_epoch_transition_lk.unlock(); /* notify any blocking threads that the new epoch is available */ m_epoch_cv_lk.lock(); @@ -310,16 +333,41 @@ private: } /* - * Add a new empty buffer to the specified epoch. This is intended to be used + * Add a new empty buffer. This is intended to be used * when a merge is triggered, to allow for inserts to be sustained in the new * buffer while a new epoch is being created in the background. Returns a * pointer to the newly created buffer. */ - Buffer *add_empty_buffer(_Epoch *epoch, Buffer *current_buffer=nullptr) { + Buffer *add_empty_buffer() { + /* + * if there's a current Epoch transition ongoing, a buffer installed + * into an older Epoch, but not the new one, may be lost. So fail to + * insert a buffer. + */ + if (!m_epoch_transition_lk.try_lock()) { + return nullptr; + } + + /* + * verify that the currently active buffer is still full, if + * not, there is no reason to add a new one. This code is + * protected by the epoch transition lock, so need need to + * take a protected reference to the epoch. + */ + auto active_epoch = get_active_epoch(); + if (!active_epoch->get_active_buffer()->is_full()) { + m_epoch_transition_lk.unlock(); + return nullptr; + } + + /* + * create a new buffer and install it in the active epoch. + */ auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); std::unique_lock m_struct_lock; - auto new_buffer = epoch->add_buffer(temp_buffer, current_buffer); + auto new_buffer = active_epoch->add_buffer(temp_buffer); + /* * if epoch->add_buffer doesn't add the new buffer, this insert * won't update the buffer set (duplicate insert) @@ -330,6 +378,7 @@ private: if (new_buffer != temp_buffer) { delete temp_buffer; } + m_epoch_transition_lk.unlock(); return new_buffer; } @@ -503,15 +552,15 @@ private: * add an empty buffer to allow insert proceed and * schedule a merge on a background thread */ - buffer = add_empty_buffer(epoch); + buffer = add_empty_buffer(); schedule_merge(); } else { /* background merge is ongoing, so just add empty buffer */ - buffer = add_empty_buffer(epoch, buffer); + buffer = add_empty_buffer(); } } - res = buffer->append(rec, ts); + res = (buffer) ? buffer->append(rec, ts) : 0; epoch->end_job(); } while(!res); diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 9193b06..fc08d57 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -47,9 +47,14 @@ public: ~Epoch() { assert(m_active_jobs.load() == 0); - for (auto buf : m_buffers) { - buf->release_reference(); - } + /* FIXME: this is needed to keep the destructor from + * sometimes locking up here. But there *shouldn't* be + * any threads waiting on this signal at object destruction, + * so something else is going on here that needs looked into + */ + //m_active_cv.notify_all(); + + clear_buffers(); if (m_structure) { m_structure->release_reference(); @@ -59,6 +64,7 @@ public: Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) { assert(buf); + std::unique_lock m_buffer_lock; /* * if a current buffer is specified, only add the * new buffer if the active buffer is the current, @@ -108,6 +114,7 @@ public: } BufView get_buffer_view() { + std::unique_lock m_buffer_lock; return BufView(m_buffers); } @@ -123,6 +130,7 @@ public: * releasing all references in the process. */ size_t clear_buffers() { + std::unique_lock m_buffer_lock; size_t buf_cnt = m_buffers.size(); for (auto buf : m_buffers) { if (buf) buf->release_reference(); @@ -138,6 +146,7 @@ public: * the new epoch will be set to the provided argument. */ Epoch *clone(size_t number) { + std::unique_lock m_buffer_lock; auto epoch = new Epoch(number); epoch->m_buffers = m_buffers; if (m_structure) { @@ -196,8 +205,8 @@ public: * wait for them to finish and return true. If there are * not active jobs, return true immediately */ - while (m_active_jobs > 0) { - std::unique_lock lk(m_cv_lock); + std::unique_lock lk(m_cv_lock); + while (m_active_jobs.load() > 0) { m_active_cv.wait(lk); } @@ -211,6 +220,8 @@ private: std::condition_variable m_active_cv; std::mutex m_cv_lock; + std::mutex m_buffer_lock; + std::atomic m_active_merge; /* diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index ba62f9e..4cdc436 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -47,9 +47,10 @@ public: } ~FIFOScheduler() { - shutdown(); + if (!m_shutdown.load()) { + shutdown(); + } - m_cv.notify_all(); m_sched_thrd.join(); } @@ -63,6 +64,8 @@ public: void shutdown() { m_shutdown.store(true); + m_thrd_pool.stop(true); + m_cv.notify_all(); } private: -- cgit v1.2.3