summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-10-20 17:00:42 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-10-20 17:00:42 -0400
commit7ecfb22c32b7986ed1a2439c1abbeed298e4153a (patch)
treed7207b5755ce987068620b71f9b4af9a52982c0d /include/framework/scheduling
parent1a47cbd7978dcad7ed0b2f2af3f933137eedbfa3 (diff)
downloaddynamic-extension-7ecfb22c32b7986ed1a2439c1abbeed298e4153a.tar.gz
Initial pass w/ new scheduler setup
currently there's a race condition of some type to sort out.
Diffstat (limited to 'include/framework/scheduling')
-rw-r--r--include/framework/scheduling/SerialScheduler.h191
-rw-r--r--include/framework/scheduling/Task.h112
2 files changed, 57 insertions, 246 deletions
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
index c43e930..5d6e5c2 100644
--- a/include/framework/scheduling/SerialScheduler.h
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -29,198 +29,67 @@
namespace de {
-template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
class SerialScheduler {
- typedef ExtensionStructure<R, S, Q, L> Structure;
- typedef MutableBuffer<R> Buffer;
public:
- /*
- * A simple "scheduler" that runs tasks serially, in a FIFO manner. Incoming concurrent
- * requests will wait for their turn, and only one task will be active in the system at
- * a time. The scheduler will spin up a second thread for running itself, but all tasks
- * will be single-threaded.
- *
- * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means
- * unlimited.
- *
- * Note that the SerialScheduler object is non-concurrent, and so will ignore the
- * thread_cnt argument. It will obey the memory_budget, however a failure due to
- * memory constraints will be irrecoverable, as there is no way to free up memory
- * or block particular tasks until memory becomes available.
- */
- SerialScheduler(size_t memory_budget, size_t thread_cnt)
- : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
- , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX)
- , m_used_memory(0)
- , m_used_threads(0)
- , m_shutdown(false)
- {
- m_sched_thrd = std::thread(&SerialScheduler::run_scheduler, this);
+ SerialScheduler(size_t memory_budget, size_t thread_cnt)
+ : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
+ , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX)
+ , m_used_memory(0)
+ , m_used_thrds(0)
+ , m_shutdown(false)
+ {
+ m_sched_thrd = std::thread(&SerialScheduler::run, this);
}
~SerialScheduler() {
- m_shutdown = true;
+ shutdown();
m_cv.notify_all();
m_sched_thrd.join();
}
- bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) {
- pending_version = version;
- pending_buffer = buffer;
-
- /*
- * Get list of individual level reconstructions that are necessary
- * for completing the overall merge
- */
- std::vector<MergeTask> merges = version->get_merge_tasks(buffer->get_record_count());
-
- /*
- * Schedule the merge tasks
- */
- for (ssize_t i=0; i<merges.size(); i++) {
- merges[i].m_timestamp = m_timestamp.fetch_add(1);
- m_task_queue.push(merges[i]);
- }
-
- auto t = MergeTask(-1, 0, buffer->get_record_count() * sizeof(R) * 2, m_timestamp.fetch_add(1));
- m_task_queue.push(t);
-
- m_cv.notify_all();
- do {
- std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock);
- m_merge_cv.wait(merge_cv_lock);
- } while (m_task_queue.size() > 0);
-
- assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0);
-
- return true;
+ void schedule_job(std::function<void(void*)> job, size_t size, void *args) {
+ size_t ts = m_counter.fetch_add(1);
+ m_task_queue.push(Task(size, ts, job, args));
}
- bool schedule_query() {
- return true;
+ void shutdown() {
+ m_shutdown = true;
}
private:
- size_t get_timestamp() {
- auto ts = m_timestamp.fetch_add(1);
- return ts;
- }
-
- void schedule_merge(MergeTask task) {
- if (task.m_source_level == -1 && task.m_target_level == 0) {
- run_buffer_merge(pending_buffer, pending_version);
- } else {
- run_merge(task, pending_version);
- }
- }
-
-
- void schedule_query(QueryTask task) {
-
- }
-
- void schedule_next_task() {
- auto task = m_task_queue.pop();
-
- auto type = std::visit(GetTaskType{}, task);
-
- switch (type) {
- case TaskType::MERGE:
- schedule_merge(std::get<MergeTask>(task));
- break;
- case TaskType::QUERY:
- schedule_query(std::get<QueryTask>(task));
- break;
- default: assert(false);
- }
-
- if (m_task_queue.size() == 0) {
- m_merge_cv.notify_all();
- }
- }
+ psudb::LockedPriorityQueue<Task> m_task_queue;
+ size_t m_memory_budget;
+ size_t m_thrd_cnt;
- void run_merge(MergeTask task, Structure *version) {
- version->merge_levels(task.m_target_level, task.m_source_level);
-
- if (!version->validate_tombstone_proportion(task.m_target_level)) {
- auto tasks = version->get_merge_tasks(task.m_target_level);
- /*
- * Schedule the merge tasks
- */
- std::promise<void> trigger_prom;
- tasks[tasks.size() - 1].make_dependent_on(trigger_prom);
- tasks[tasks.size() - 1].m_timestamp = m_timestamp.fetch_add(1);
- m_task_queue.push(tasks[tasks.size() - 1]);
-
- for (ssize_t i=tasks.size()-2; i>=0; i--) {
- tasks[i].make_dependent_on(tasks[i+1]);
- tasks[i].m_timestamp = m_timestamp.fetch_add(1);
- m_task_queue.push(tasks[i]);
- }
+ bool m_shutdown;
- /*
- * Block the completion of any task until all have been
- * scheduled. Probably not strictly necessary, but due to
- * interface constraints with the way promises are used,
- * a dummy promise needs to be set up for the first job
- * anyway. It's easiest to just release it here.
- */
- trigger_prom.set_value();
- }
- }
+ std::atomic<size_t> m_counter;
+ std::mutex m_cv_lock;
+ std::condition_variable m_cv;
+ std::thread m_sched_thrd;
- void run_buffer_merge(Buffer *buffer, Structure *version) {
- version->merge_buffer(buffer);
- if (!version->validate_tombstone_proportion(0)) {
- auto tasks = version->get_merge_tasks_from_level(0);
+ std::atomic<size_t> m_used_thrds;
+ std::atomic<size_t> m_used_memory;
- /*
- * Schedule the merge tasks (FIXME: currently this just
- * executes them sequentially in a blocking fashion)
- */
- for (ssize_t i=tasks.size()-1; i>=0; i--) {
- tasks[i].m_timestamp = m_timestamp.fetch_add(1);
- m_task_queue.push(tasks[i]);
- }
- }
+ void schedule_next() {
+ auto t = m_task_queue.pop();
+ t();
}
- void run_scheduler() {
+ void run() {
do {
std::unique_lock<std::mutex> cv_lock(m_cv_lock);
m_cv.wait(cv_lock);
- while (m_task_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) {
- schedule_next_task();
+ while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) {
+ schedule_next();
}
cv_lock.unlock();
} while(!m_shutdown);
}
-
- size_t m_memory_budget;
- size_t m_thread_cnt;
-
- Buffer *pending_buffer;
- Structure *pending_version;
-
- alignas(64) std::atomic<size_t> m_used_memory;
- alignas(64) std::atomic<size_t> m_used_threads;
- alignas(64) std::atomic<size_t> m_timestamp;
-
- psudb::LockedPriorityQueue<Task, std::vector<Task>, std::greater<Task>> m_task_queue;
-
- std::mutex m_cv_lock;
- std::condition_variable m_cv;
-
- std::mutex m_merge_cv_lock;
- std::condition_variable m_merge_cv;
-
- std::thread m_sched_thrd;
-
- bool m_shutdown;
};
}
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 3c1b158..518159d 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -5,111 +5,53 @@
#include <variant>
#include <future>
+#include <functional>
#include "framework/util/Configuration.h"
namespace de {
-enum class TaskType {
- MERGE,
- QUERY
+struct MergeArgs {
+ void *version;
+ void *buffer;
+ std::vector<MergeTask> merges;
+ std::promise<bool> result;
};
-struct TaskDependency {
- std::promise<void> prom;
- std::future<void> fut;
+template <typename R>
+struct QueryArgs {
+ void *version;
+ void *buffer;
+ std::promise<std::vector<R>> result_set;
+ void *query_parms;
};
-struct MergeTask {
- level_index m_source_level;
- level_index m_target_level;
- size_t m_timestamp;
- size_t m_size;
- TaskType m_type;
- std::unique_ptr<TaskDependency> m_dep;
-
- MergeTask() = default;
-
- MergeTask(level_index source, level_index target, size_t size, size_t timestamp)
- : m_source_level(source)
- , m_target_level(target)
- , m_timestamp(timestamp)
- , m_size(size)
- , m_type(TaskType::MERGE)
- , m_dep(std::make_unique<TaskDependency>()){}
-
+typedef std::function<void(void*)> Job;
- MergeTask(MergeTask &t)
- : m_source_level(t.m_source_level)
- , m_target_level(t.m_target_level)
- , m_timestamp(t.m_timestamp)
- , m_size(t.m_size)
- , m_type(TaskType::MERGE)
- , m_dep(std::move(t.m_dep))
+struct Task {
+ Task(size_t size, size_t ts, Job job, void *args)
+ : m_job(job)
+ , m_size(size)
+ , m_timestamp(ts)
+ , m_args(args)
{}
-
- TaskType get_type() const {
- return m_type;
- }
-
- void make_dependent_on(MergeTask &task) {
- m_dep->fut = task.m_dep->prom.get_future();
- }
-
- void make_dependent_on(TaskDependency *dep) {
- m_dep->fut = dep->prom.get_future();
- }
-
- friend bool operator<(const MergeTask &self, const MergeTask &other) {
- return self.m_timestamp < other.m_timestamp;
- }
-
- friend bool operator>(const MergeTask &self, const MergeTask &other) {
- return self.m_timestamp > other.m_timestamp;
- }
-
-};
-
-struct QueryTask {
- size_t m_timestamp;
+ Job m_job;
size_t m_size;
- TaskType m_type;
- std::unique_ptr<TaskDependency> m_dep;
-
- QueryTask(QueryTask &t)
- : m_timestamp(t.m_timestamp)
- , m_size(t.m_size)
- , m_type(t.m_type)
- , m_dep(std::move(t.m_dep))
- {}
-
- TaskType get_type() const {
- return m_type;
- }
-
- void SetDependency(QueryTask &task) {
- m_dep->fut = task.m_dep->prom.get_future();
- }
-
- void SetDependency(TaskDependency *dep) {
- m_dep->fut = dep->prom.get_future();
- }
+ size_t m_timestamp;
+ void *m_args;
- friend bool operator<(const QueryTask &self, const QueryTask &other) {
+ friend bool operator<(const Task &self, const Task &other) {
return self.m_timestamp < other.m_timestamp;
}
- friend bool operator>(const QueryTask &self, const QueryTask &other) {
+ friend bool operator>(const Task &self, const Task &other) {
return self.m_timestamp > other.m_timestamp;
}
-};
-struct GetTaskType {
- TaskType operator()(const MergeTask &t) { return t.get_type(); }
- TaskType operator()(const QueryTask &t) { return t.get_type(); }
+ void operator()() {
+ m_job(m_args);
+ }
};
-typedef std::variant<MergeTask, QueryTask> Task;
-
}