summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling/Epoch.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/scheduling/Epoch.h')
-rw-r--r--include/framework/scheduling/Epoch.h45
1 files changed, 44 insertions, 1 deletions
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index f4aefe9..0ebbde9 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -25,6 +25,7 @@ public:
Epoch(size_t number=0)
: m_buffers()
, m_structure(nullptr)
+ , m_active_merge(false)
, m_active_jobs(0)
, m_active(true)
, m_epoch_number(number)
@@ -34,6 +35,7 @@ public:
: m_buffers()
, m_structure(structure)
, m_active_jobs(0)
+ , m_active_merge(false)
, m_active(true)
, m_epoch_number(number)
{
@@ -54,11 +56,25 @@ public:
}
}
- void add_buffer(Buffer *buf) {
+ Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) {
assert(buf);
+ /*
+ * if a current buffer is specified, only add the
+ * new buffer if the active buffer is the current,
+ * otherwise just return the active buffer (poor man's
+ * CAS).
+ */
+ if (cur_buf) {
+ auto active_buf = get_active_buffer();
+ if (active_buf != cur_buf) {
+ return active_buf;
+ }
+ }
+
buf->take_reference();
m_buffers.push_back(buf);
+ return buf;
}
void start_job() {
@@ -137,6 +153,31 @@ public:
return epoch;
}
+ /*
+ * Check if a merge can be started from this Epoch.
+ * At present, without concurrent merging, this simply
+ * checks if there is currently a scheduled merge based
+ * on this Epoch. If there is, returns false. If there
+ * isn't, return true and set a flag indicating that
+ * there is an active merge.
+ */
+ bool prepare_merge() {
+ auto old = m_active_merge.load();
+ if (old) {
+ return false;
+ }
+
+ // FIXME: this needs cleaned up
+ while (!m_active_merge.compare_exchange_strong(old, true)) {
+ old = m_active_merge.load();
+ if (old) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
void set_inactive() {
m_active = false;
}
@@ -170,6 +211,8 @@ private:
std::condition_variable m_active_cv;
std::mutex m_cv_lock;
+ std::atomic<bool> m_active_merge;
+
/*
* The number of currently active jobs
* (queries/merges) operating on this