diff options
| author | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2023-10-31 11:01:05 -0400 |
|---|---|---|
| committer | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2023-10-31 11:01:05 -0400 |
| commit | c00900c5bfbc23537bf7084a927e7fd2ef0a5c94 (patch) | |
| tree | 9f5d8ddbdad108c02333e42c6d9e18ed29d94704 | |
| parent | d2279e1b96d352a0af1d425dcaaf93e8a26a8d52 (diff) | |
| download | dynamic-extension-c00900c5bfbc23537bf7084a927e7fd2ef0a5c94.tar.gz | |
DynamicExtension: added a way to block on merge completion
This is mostly just for testing purposes at the moment, though I'd
imagine it may be useful for other reasons too.
| -rw-r--r-- | include/framework/DynamicExtension.h | 34 |
1 files changed, 31 insertions, 3 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 9129060..2f0327f 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -146,7 +146,11 @@ public: return m_buffer_capacity; } - Shard *create_static_structure() { + Shard *create_static_structure(bool await_merge_completion=false) { + if (await_merge_completion) { + await_next_epoch(); + } + auto epoch = get_active_epoch_protected(); auto bv = epoch->get_buffer_view(); @@ -187,6 +191,19 @@ public: } /* + * If the current epoch is *not* the newest one, then wait for + * the newest one to become available. Otherwise, returns immediately. + */ + void await_next_epoch() { + while (m_current_epoch.load() != m_newest_epoch.load()) { + std::unique_lock<std::mutex> m_epoch_cv_lk; + m_epoch_cv.wait(m_epoch_cv_lk); + } + + return; + } + + /* * Mostly exposed for unit-testing purposes. Verifies that the current * active version of the ExtensionStructure doesn't violate the maximum * tombstone proportion invariant. @@ -203,8 +220,12 @@ private: std::set<Structure *> m_versions; std::atomic<size_t> m_current_epoch; + std::atomic<size_t> m_newest_epoch; std::unordered_map<size_t, _Epoch *> m_epochs; + std::condition_variable m_epoch_cv; + std::mutex m_epoch_cv_lk; + size_t m_scale_factor; double m_max_delete_prop; size_t m_buffer_capacity; @@ -220,7 +241,7 @@ private: } void advance_epoch() { - size_t new_epoch_num = m_current_epoch.load() + 1; + size_t new_epoch_num = m_newest_epoch.load(); _Epoch *new_epoch = m_epochs[new_epoch_num]; _Epoch *old_epoch = m_epochs[m_current_epoch.load()]; @@ -233,6 +254,12 @@ private: new_epoch->add_buffer(old_epoch->get_buffers()[i]); } m_current_epoch.fetch_add(1); + + /* notify any blocking threads that the new epoch is available */ + m_epoch_cv_lk.lock(); + m_epoch_cv.notify_all(); + m_epoch_cv_lk.unlock(); + retire_epoch(old_epoch); } @@ -250,7 +277,8 @@ private: auto new_epoch = get_active_epoch()->clone(); std::unique_lock<std::mutex> m_struct_lock; m_versions.insert(new_epoch->get_structure()); - m_epochs.insert({m_current_epoch.load() + 1, new_epoch}); + m_newest_epoch.fetch_add(1); + m_epochs.insert({m_newest_epoch.load(), new_epoch}); m_struct_lock.release(); return new_epoch; |