summaryrefslogtreecommitdiffstats
path: root/include
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-10-30 14:47:35 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-10-30 14:47:35 -0400
commit32aeedbaf6584eb71126cbe92cb42e93b65d69d3 (patch)
tree644a0bad4a67e0a7598de17ebbff78d958c4cbad /include
parent40b87b74f2bf4e93fdc9dabd6eab9175187fb63c (diff)
downloaddynamic-extension-32aeedbaf6584eb71126cbe92cb42e93b65d69d3.tar.gz
Epoch/DynamicExtension: added cv to epoch retirement check
Instead of busy waiting on the active job count, a condition variable is now used to wait for all active jobs to finish before freeing an epoch's resources.
Diffstat (limited to 'include')
-rw-r--r--include/framework/DynamicExtension.h9
-rw-r--r--include/framework/scheduling/Epoch.h31
2 files changed, 36 insertions, 4 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index eb78d48..21d0261 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -79,6 +79,8 @@ public:
}
int erase(const R &rec) {
+ // FIXME: delete tagging will require a lot of extra work to get
+ // operating "correctly" in a concurrent environment.
if constexpr (D == DeletePolicy::TAGGING) {
BufView buffers = get_active_epoch()->get_buffer_view();
@@ -118,6 +120,7 @@ public:
epoch->start_job();
auto t = epoch->get_buffer_view().get_tombstone_count() + epoch->get_structure()->get_tombstone_count();
epoch->end_job();
+
return t;
}
@@ -271,11 +274,9 @@ private:
* accumulate new active jobs. Eventually, this
* number will hit zero and the function will
* proceed.
- *
- * FIXME: this can be replaced with a cv, which
- * is probably a superior solution in this case
*/
- while (epoch->get_active_job_num() > 0)
+
+ while (!epoch->retirable())
;
/*
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index fe63c86..87463bd 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -62,6 +62,11 @@ public:
void end_job() {
m_active_jobs.fetch_add(-1);
+
+ if (m_active_jobs.load() == 0) {
+ std::unique_lock<std::mutex> lk(m_cv_lock);
+ m_active_cv.notify_all();
+ }
}
size_t get_active_job_num() {
@@ -115,10 +120,35 @@ public:
return epoch;
}
+ /*
+ *
+ */
+ bool retirable() {
+ /* if epoch is currently active, then it cannot be retired */
+ if (m_active) {
+ return false;
+ }
+
+ /*
+ * if the epoch has active jobs but is not itself active,
+ * wait for them to finish and return true. If there are
+ * not active jobs, return true immediately
+ */
+ while (m_active_jobs > 0) {
+ std::unique_lock<std::mutex> lk(m_cv_lock);
+ m_active_cv.wait(lk);
+ }
+
+ return true;
+ }
+
private:
Structure *m_structure;
std::vector<Buffer *> m_buffers;
+ std::condition_variable m_active_cv;
+ std::mutex m_cv_lock;
+
/*
* The number of currently active jobs
* (queries/merges) operating on this
@@ -126,5 +156,6 @@ private:
* when this number is 0.
*/
std::atomic<size_t> m_active_jobs;
+ bool m_active;
};
}