summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling/Epoch.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-11-06 12:39:35 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2023-11-06 12:39:35 -0500
commit254f8aa85ea8962e5c11d8b475a171883c22f168 (patch)
treeee19378cf463c54d04fce39d1b3e5975f0790299 /include/framework/scheduling/Epoch.h
parentad117358b8ab9924d216edeca0eafa87b4f86896 (diff)
downloaddynamic-extension-254f8aa85ea8962e5c11d8b475a171883c22f168.tar.gz
DynamicExtension: internal_append fixes
Fixed a few bugs with concurrent operation of internal_append, as well as enabled the spawning of multiple empty buffers while merges are currently active.
Diffstat (limited to 'include/framework/scheduling/Epoch.h')
-rw-r--r--include/framework/scheduling/Epoch.h29
1 files changed, 29 insertions, 0 deletions
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 58fe6cd..0ebbde9 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -25,6 +25,7 @@ public:
Epoch(size_t number=0)
: m_buffers()
, m_structure(nullptr)
+ , m_active_merge(false)
, m_active_jobs(0)
, m_active(true)
, m_epoch_number(number)
@@ -34,6 +35,7 @@ public:
: m_buffers()
, m_structure(structure)
, m_active_jobs(0)
+ , m_active_merge(false)
, m_active(true)
, m_epoch_number(number)
{
@@ -151,6 +153,31 @@ public:
return epoch;
}
+ /*
+ * Check if a merge can be started from this Epoch.
+ * At present, without concurrent merging, this simply
+ * checks if there is currently a scheduled merge based
+ * on this Epoch. If there is, returns false. If there
+ * isn't, return true and set a flag indicating that
+ * there is an active merge.
+ */
+ bool prepare_merge() {
+ auto old = m_active_merge.load();
+ if (old) {
+ return false;
+ }
+
+ // FIXME: this needs cleaned up
+ while (!m_active_merge.compare_exchange_strong(old, true)) {
+ old = m_active_merge.load();
+ if (old) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
void set_inactive() {
m_active = false;
}
@@ -184,6 +211,8 @@ private:
std::condition_variable m_active_cv;
std::mutex m_cv_lock;
+ std::atomic<bool> m_active_merge;
+
/*
* The number of currently active jobs
* (queries/merges) operating on this