summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/scheduling')
-rw-r--r--include/framework/scheduling/Epoch.h45
-rw-r--r--include/framework/scheduling/FIFOScheduler.h13
-rw-r--r--include/framework/scheduling/SerialScheduler.h2
-rw-r--r--include/framework/scheduling/Task.h2
4 files changed, 56 insertions, 6 deletions
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index f4aefe9..0ebbde9 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -25,6 +25,7 @@ public:
Epoch(size_t number=0)
: m_buffers()
, m_structure(nullptr)
+ , m_active_merge(false)
, m_active_jobs(0)
, m_active(true)
, m_epoch_number(number)
@@ -34,6 +35,7 @@ public:
: m_buffers()
, m_structure(structure)
, m_active_jobs(0)
+ , m_active_merge(false)
, m_active(true)
, m_epoch_number(number)
{
@@ -54,11 +56,25 @@ public:
}
}
- void add_buffer(Buffer *buf) {
+ Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) {
assert(buf);
+ /*
+ * if a current buffer is specified, only add the
+ * new buffer if the active buffer is the current,
+ * otherwise just return the active buffer (poor man's
+ * CAS).
+ */
+ if (cur_buf) {
+ auto active_buf = get_active_buffer();
+ if (active_buf != cur_buf) {
+ return active_buf;
+ }
+ }
+
buf->take_reference();
m_buffers.push_back(buf);
+ return buf;
}
void start_job() {
@@ -137,6 +153,31 @@ public:
return epoch;
}
+ /*
+ * Check if a merge can be started from this Epoch.
+ * At present, without concurrent merging, this simply
+ * checks if there is currently a scheduled merge based
+ * on this Epoch. If there is, returns false. If there
+ * isn't, return true and set a flag indicating that
+ * there is an active merge.
+ */
+ bool prepare_merge() {
+ auto old = m_active_merge.load();
+ if (old) {
+ return false;
+ }
+
+ // FIXME: this needs cleaned up
+ while (!m_active_merge.compare_exchange_strong(old, true)) {
+ old = m_active_merge.load();
+ if (old) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
void set_inactive() {
m_active = false;
}
@@ -170,6 +211,8 @@ private:
std::condition_variable m_active_cv;
std::mutex m_cv_lock;
+ std::atomic<bool> m_active_merge;
+
/*
* The number of currently active jobs
* (queries/merges) operating on this
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 91a72b3..1521eb6 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -24,20 +24,26 @@
#include "framework/structure/ExtensionStructure.h"
#include "framework/scheduling/Task.h"
+#include "ctpl/ctpl.h"
#include "psu-ds/LockedPriorityQueue.h"
namespace de {
+
class FIFOScheduler {
+private:
+ static const size_t DEFAULT_MAX_THREADS = 8;
+
public:
FIFOScheduler(size_t memory_budget, size_t thread_cnt)
: m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
- , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX)
+ , m_thrd_cnt((thread_cnt) ? thread_cnt: DEFAULT_MAX_THREADS)
, m_used_memory(0)
, m_used_thrds(0)
, m_shutdown(false)
{
m_sched_thrd = std::thread(&FIFOScheduler::run, this);
+ m_thrd_pool.resize(m_thrd_cnt);
}
~FIFOScheduler() {
@@ -72,6 +78,7 @@ private:
std::condition_variable m_cv;
std::thread m_sched_thrd;
+ ctpl::thread_pool m_thrd_pool;
std::atomic<size_t> m_used_thrds;
std::atomic<size_t> m_used_memory;
@@ -79,7 +86,7 @@ private:
void schedule_next() {
assert(m_task_queue.size() > 0);
auto t = m_task_queue.pop();
- t();
+ m_thrd_pool.push(t);
}
void run() {
@@ -87,7 +94,7 @@ private:
std::unique_lock<std::mutex> cv_lock(m_cv_lock);
m_cv.wait(cv_lock);
- while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) {
+ while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) {
schedule_next();
}
} while(!m_shutdown.load());
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
index 9c767e8..93611d1 100644
--- a/include/framework/scheduling/SerialScheduler.h
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -47,7 +47,7 @@ public:
void schedule_job(std::function<void(void*)> job, size_t size, void *args) {
size_t ts = m_counter++;
auto t = Task(size, ts, job, args);
- t();
+ t(0);
}
void shutdown() {
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 228665f..6dfd7df 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -54,7 +54,7 @@ struct Task {
return self.m_timestamp > other.m_timestamp;
}
- void operator()() {
+ void operator()(size_t thrd_id) {
m_job(m_args);
}
};