summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/framework/DynamicExtension.h34
1 files changed, 31 insertions, 3 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 9129060..2f0327f 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -146,7 +146,11 @@ public:
return m_buffer_capacity;
}
- Shard *create_static_structure() {
+ Shard *create_static_structure(bool await_merge_completion=false) {
+ if (await_merge_completion) {
+ await_next_epoch();
+ }
+
auto epoch = get_active_epoch_protected();
auto bv = epoch->get_buffer_view();
@@ -187,6 +191,19 @@ public:
}
/*
+ * If the current epoch is *not* the newest one, then wait for
+ * the newest one to become available. Otherwise, returns immediately.
+ */
+ void await_next_epoch() {
+ while (m_current_epoch.load() != m_newest_epoch.load()) {
+ std::unique_lock<std::mutex> m_epoch_cv_lk;
+ m_epoch_cv.wait(m_epoch_cv_lk);
+ }
+
+ return;
+ }
+
+ /*
* Mostly exposed for unit-testing purposes. Verifies that the current
* active version of the ExtensionStructure doesn't violate the maximum
* tombstone proportion invariant.
@@ -203,8 +220,12 @@ private:
std::set<Structure *> m_versions;
std::atomic<size_t> m_current_epoch;
+ std::atomic<size_t> m_newest_epoch;
std::unordered_map<size_t, _Epoch *> m_epochs;
+ std::condition_variable m_epoch_cv;
+ std::mutex m_epoch_cv_lk;
+
size_t m_scale_factor;
double m_max_delete_prop;
size_t m_buffer_capacity;
@@ -220,7 +241,7 @@ private:
}
void advance_epoch() {
- size_t new_epoch_num = m_current_epoch.load() + 1;
+ size_t new_epoch_num = m_newest_epoch.load();
_Epoch *new_epoch = m_epochs[new_epoch_num];
_Epoch *old_epoch = m_epochs[m_current_epoch.load()];
@@ -233,6 +254,12 @@ private:
new_epoch->add_buffer(old_epoch->get_buffers()[i]);
}
m_current_epoch.fetch_add(1);
+
+ /* notify any blocking threads that the new epoch is available */
+ m_epoch_cv_lk.lock();
+ m_epoch_cv.notify_all();
+ m_epoch_cv_lk.unlock();
+
retire_epoch(old_epoch);
}
@@ -250,7 +277,8 @@ private:
auto new_epoch = get_active_epoch()->clone();
std::unique_lock<std::mutex> m_struct_lock;
m_versions.insert(new_epoch->get_structure());
- m_epochs.insert({m_current_epoch.load() + 1, new_epoch});
+ m_newest_epoch.fetch_add(1);
+ m_epochs.insert({m_newest_epoch.load(), new_epoch});
m_struct_lock.release();
return new_epoch;