From 254f8aa85ea8962e5c11d8b475a171883c22f168 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 12:39:35 -0500 Subject: DynamicExtension: internal_append fixes Fixed a few bugs with concurrent operation of internal_append, as well as enabled the spawning of multiple empty buffers while merges are currently active. --- include/framework/scheduling/Epoch.h | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 58fe6cd..0ebbde9 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -25,6 +25,7 @@ public: Epoch(size_t number=0) : m_buffers() , m_structure(nullptr) + , m_active_merge(false) , m_active_jobs(0) , m_active(true) , m_epoch_number(number) @@ -34,6 +35,7 @@ public: : m_buffers() , m_structure(structure) , m_active_jobs(0) + , m_active_merge(false) , m_active(true) , m_epoch_number(number) { @@ -151,6 +153,31 @@ public: return epoch; } + /* + * Check if a merge can be started from this Epoch. + * At present, without concurrent merging, this simply + * checks if there is currently a scheduled merge based + * on this Epoch. If there is, returns false. If there + * isn't, return true and set a flag indicating that + * there is an active merge. + */ + bool prepare_merge() { + auto old = m_active_merge.load(); + if (old) { + return false; + } + + // FIXME: this needs cleaned up + while (!m_active_merge.compare_exchange_strong(old, true)) { + old = m_active_merge.load(); + if (old) { + return false; + } + } + + return true; + } + void set_inactive() { m_active = false; } @@ -184,6 +211,8 @@ private: std::condition_variable m_active_cv; std::mutex m_cv_lock; + std::atomic m_active_merge; + /* * The number of currently active jobs * (queries/merges) operating on this -- cgit v1.2.3