diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-10-30 14:47:35 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-10-30 14:47:35 -0400 |
| commit | 32aeedbaf6584eb71126cbe92cb42e93b65d69d3 (patch) | |
| tree | 644a0bad4a67e0a7598de17ebbff78d958c4cbad /include/framework | |
| parent | 40b87b74f2bf4e93fdc9dabd6eab9175187fb63c (diff) | |
| download | dynamic-extension-32aeedbaf6584eb71126cbe92cb42e93b65d69d3.tar.gz | |
Epoch/DynamicExtension: added cv to epoch retirement check
Instead of busy waiting on the active job count, a condition variable is
now used to wait for all active jobs to finish before freeing an epoch's
resources.
Diffstat (limited to 'include/framework')
| -rw-r--r-- | include/framework/DynamicExtension.h | 9 | ||||
| -rw-r--r-- | include/framework/scheduling/Epoch.h | 31 |
2 files changed, 36 insertions, 4 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index eb78d48..21d0261 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -79,6 +79,8 @@ public: } int erase(const R &rec) { + // FIXME: delete tagging will require a lot of extra work to get + // operating "correctly" in a concurrent environment. if constexpr (D == DeletePolicy::TAGGING) { BufView buffers = get_active_epoch()->get_buffer_view(); @@ -118,6 +120,7 @@ public: epoch->start_job(); auto t = epoch->get_buffer_view().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); epoch->end_job(); + return t; } @@ -271,11 +274,9 @@ private: * accumulate new active jobs. Eventually, this * number will hit zero and the function will * proceed. - * - * FIXME: this can be replaced with a cv, which - * is probably a superior solution in this case */ - while (epoch->get_active_job_num() > 0) + + while (!epoch->retirable()) ; /* diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index fe63c86..87463bd 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -62,6 +62,11 @@ public: void end_job() { m_active_jobs.fetch_add(-1); + + if (m_active_jobs.load() == 0) { + std::unique_lock<std::mutex> lk(m_cv_lock); + m_active_cv.notify_all(); + } } size_t get_active_job_num() { @@ -115,10 +120,35 @@ public: return epoch; } + /* + * + */ + bool retirable() { + /* if epoch is currently active, then it cannot be retired */ + if (m_active) { + return false; + } + + /* + * if the epoch has active jobs but is not itself active, + * wait for them to finish and return true. If there are + * not active jobs, return true immediately + */ + while (m_active_jobs > 0) { + std::unique_lock<std::mutex> lk(m_cv_lock); + m_active_cv.wait(lk); + } + + return true; + } + private: Structure *m_structure; std::vector<Buffer *> m_buffers; + std::condition_variable m_active_cv; + std::mutex m_cv_lock; + /* * The number of currently active jobs * (queries/merges) operating on this @@ -126,5 +156,6 @@ private: * when this number is 0. */ std::atomic<size_t> m_active_jobs; + bool m_active; }; } |