summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas B. Rumbaugh <doug@douglasrumbaugh.com>2023-10-31 11:01:05 -0400
committerDouglas B. Rumbaugh <doug@douglasrumbaugh.com>2023-10-31 11:01:05 -0400
commitc00900c5bfbc23537bf7084a927e7fd2ef0a5c94 (patch)
tree9f5d8ddbdad108c02333e42c6d9e18ed29d94704 /include/framework/DynamicExtension.h
parentd2279e1b96d352a0af1d425dcaaf93e8a26a8d52 (diff)
downloaddynamic-extension-c00900c5bfbc23537bf7084a927e7fd2ef0a5c94.tar.gz
DynamicExtension: added a way to block on merge completion
This is mostly just for testing purposes at the moment, though I'd imagine it may be useful for other reasons too.
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h34
1 files changed, 31 insertions, 3 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 9129060..2f0327f 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -146,7 +146,11 @@ public:
return m_buffer_capacity;
}
- Shard *create_static_structure() {
+ Shard *create_static_structure(bool await_merge_completion=false) {
+ if (await_merge_completion) {
+ await_next_epoch();
+ }
+
auto epoch = get_active_epoch_protected();
auto bv = epoch->get_buffer_view();
@@ -187,6 +191,19 @@ public:
}
/*
+ * If the current epoch is *not* the newest one, then wait for
+ * the newest one to become available. Otherwise, returns immediately.
+ */
+ void await_next_epoch() {
+ while (m_current_epoch.load() != m_newest_epoch.load()) {
+ std::unique_lock<std::mutex> m_epoch_cv_lk;
+ m_epoch_cv.wait(m_epoch_cv_lk);
+ }
+
+ return;
+ }
+
+ /*
* Mostly exposed for unit-testing purposes. Verifies that the current
* active version of the ExtensionStructure doesn't violate the maximum
* tombstone proportion invariant.
@@ -203,8 +220,12 @@ private:
std::set<Structure *> m_versions;
std::atomic<size_t> m_current_epoch;
+ std::atomic<size_t> m_newest_epoch;
std::unordered_map<size_t, _Epoch *> m_epochs;
+ std::condition_variable m_epoch_cv;
+ std::mutex m_epoch_cv_lk;
+
size_t m_scale_factor;
double m_max_delete_prop;
size_t m_buffer_capacity;
@@ -220,7 +241,7 @@ private:
}
void advance_epoch() {
- size_t new_epoch_num = m_current_epoch.load() + 1;
+ size_t new_epoch_num = m_newest_epoch.load();
_Epoch *new_epoch = m_epochs[new_epoch_num];
_Epoch *old_epoch = m_epochs[m_current_epoch.load()];
@@ -233,6 +254,12 @@ private:
new_epoch->add_buffer(old_epoch->get_buffers()[i]);
}
m_current_epoch.fetch_add(1);
+
+ /* notify any blocking threads that the new epoch is available */
+ m_epoch_cv_lk.lock();
+ m_epoch_cv.notify_all();
+ m_epoch_cv_lk.unlock();
+
retire_epoch(old_epoch);
}
@@ -250,7 +277,8 @@ private:
auto new_epoch = get_active_epoch()->clone();
std::unique_lock<std::mutex> m_struct_lock;
m_versions.insert(new_epoch->get_structure());
- m_epochs.insert({m_current_epoch.load() + 1, new_epoch});
+ m_newest_epoch.fetch_add(1);
+ m_epochs.insert({m_newest_epoch.load(), new_epoch});
m_struct_lock.release();
return new_epoch;