From c00900c5bfbc23537bf7084a927e7fd2ef0a5c94 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 11:01:05 -0400 Subject: DynamicExtension: added a way to block on merge completion This is mostly just for testing purposes at the moment, though I'd imagine it may be useful for other reasons too. --- include/framework/DynamicExtension.h | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) (limited to 'include') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 9129060..2f0327f 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -146,7 +146,11 @@ public: return m_buffer_capacity; } - Shard *create_static_structure() { + Shard *create_static_structure(bool await_merge_completion=false) { + if (await_merge_completion) { + await_next_epoch(); + } + auto epoch = get_active_epoch_protected(); auto bv = epoch->get_buffer_view(); @@ -186,6 +190,19 @@ public: return flattened; } + /* + * If the current epoch is *not* the newest one, then wait for + * the newest one to become available. Otherwise, returns immediately. + */ + void await_next_epoch() { + while (m_current_epoch.load() != m_newest_epoch.load()) { + std::unique_lock m_epoch_cv_lk; + m_epoch_cv.wait(m_epoch_cv_lk); + } + + return; + } + /* * Mostly exposed for unit-testing purposes. Verifies that the current * active version of the ExtensionStructure doesn't violate the maximum @@ -203,8 +220,12 @@ private: std::set m_versions; std::atomic m_current_epoch; + std::atomic m_newest_epoch; std::unordered_map m_epochs; + std::condition_variable m_epoch_cv; + std::mutex m_epoch_cv_lk; + size_t m_scale_factor; double m_max_delete_prop; size_t m_buffer_capacity; @@ -220,7 +241,7 @@ private: } void advance_epoch() { - size_t new_epoch_num = m_current_epoch.load() + 1; + size_t new_epoch_num = m_newest_epoch.load(); _Epoch *new_epoch = m_epochs[new_epoch_num]; _Epoch *old_epoch = m_epochs[m_current_epoch.load()]; @@ -233,6 +254,12 @@ private: new_epoch->add_buffer(old_epoch->get_buffers()[i]); } m_current_epoch.fetch_add(1); + + /* notify any blocking threads that the new epoch is available */ + m_epoch_cv_lk.lock(); + m_epoch_cv.notify_all(); + m_epoch_cv_lk.unlock(); + retire_epoch(old_epoch); } @@ -250,7 +277,8 @@ private: auto new_epoch = get_active_epoch()->clone(); std::unique_lock m_struct_lock; m_versions.insert(new_epoch->get_structure()); - m_epochs.insert({m_current_epoch.load() + 1, new_epoch}); + m_newest_epoch.fetch_add(1); + m_epochs.insert({m_newest_epoch.load(), new_epoch}); m_struct_lock.release(); return new_epoch; -- cgit v1.2.3