diff options
Diffstat (limited to 'include/framework/scheduling/Epoch.h')
| -rw-r--r-- | include/framework/scheduling/Epoch.h | 45 |
1 files changed, 44 insertions, 1 deletions
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index f4aefe9..0ebbde9 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -25,6 +25,7 @@ public: Epoch(size_t number=0) : m_buffers() , m_structure(nullptr) + , m_active_merge(false) , m_active_jobs(0) , m_active(true) , m_epoch_number(number) @@ -34,6 +35,7 @@ public: : m_buffers() , m_structure(structure) , m_active_jobs(0) + , m_active_merge(false) , m_active(true) , m_epoch_number(number) { @@ -54,11 +56,25 @@ public: } } - void add_buffer(Buffer *buf) { + Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) { assert(buf); + /* + * if a current buffer is specified, only add the + * new buffer if the active buffer is the current, + * otherwise just return the active buffer (poor man's + * CAS). + */ + if (cur_buf) { + auto active_buf = get_active_buffer(); + if (active_buf != cur_buf) { + return active_buf; + } + } + buf->take_reference(); m_buffers.push_back(buf); + return buf; } void start_job() { @@ -137,6 +153,31 @@ public: return epoch; } + /* + * Check if a merge can be started from this Epoch. + * At present, without concurrent merging, this simply + * checks if there is currently a scheduled merge based + * on this Epoch. If there is, returns false. If there + * isn't, return true and set a flag indicating that + * there is an active merge. + */ + bool prepare_merge() { + auto old = m_active_merge.load(); + if (old) { + return false; + } + + // FIXME: this needs cleaned up + while (!m_active_merge.compare_exchange_strong(old, true)) { + old = m_active_merge.load(); + if (old) { + return false; + } + } + + return true; + } + void set_inactive() { m_active = false; } @@ -170,6 +211,8 @@ private: std::condition_variable m_active_cv; std::mutex m_cv_lock; + std::atomic<bool> m_active_merge; + /* * The number of currently active jobs * (queries/merges) operating on this |