diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-10-20 17:00:42 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-10-20 17:00:42 -0400 |
| commit | 7ecfb22c32b7986ed1a2439c1abbeed298e4153a (patch) | |
| tree | d7207b5755ce987068620b71f9b4af9a52982c0d /include/framework/scheduling | |
| parent | 1a47cbd7978dcad7ed0b2f2af3f933137eedbfa3 (diff) | |
| download | dynamic-extension-7ecfb22c32b7986ed1a2439c1abbeed298e4153a.tar.gz | |
Initial pass with new scheduler setup
Currently there is a race condition of some kind that still needs to be sorted out.
Diffstat (limited to 'include/framework/scheduling')
| -rw-r--r-- | include/framework/scheduling/SerialScheduler.h | 191 | ||||
| -rw-r--r-- | include/framework/scheduling/Task.h | 112 |
2 files changed, 57 insertions, 246 deletions
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index c43e930..5d6e5c2 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -29,198 +29,67 @@ namespace de { -template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L> class SerialScheduler { - typedef ExtensionStructure<R, S, Q, L> Structure; - typedef MutableBuffer<R> Buffer; public: - /* - * A simple "scheduler" that runs tasks serially, in a FIFO manner. Incoming concurrent - * requests will wait for their turn, and only one task will be active in the system at - * a time. The scheduler will spin up a second thread for running itself, but all tasks - * will be single-threaded. - * - * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means - * unlimited. - * - * Note that the SerialScheduler object is non-concurrent, and so will ignore the - * thread_cnt argument. It will obey the memory_budget, however a failure due to - * memory constraints will be irrecoverable, as there is no way to free up memory - * or block particular tasks until memory becomes available. - */ - SerialScheduler(size_t memory_budget, size_t thread_cnt) - : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX) - , m_used_memory(0) - , m_used_threads(0) - , m_shutdown(false) - { - m_sched_thrd = std::thread(&SerialScheduler::run_scheduler, this); + SerialScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thrd_cnt((thread_cnt) ? 
thread_cnt: UINT64_MAX) + , m_used_memory(0) + , m_used_thrds(0) + , m_shutdown(false) + { + m_sched_thrd = std::thread(&SerialScheduler::run, this); } ~SerialScheduler() { - m_shutdown = true; + shutdown(); m_cv.notify_all(); m_sched_thrd.join(); } - bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) { - pending_version = version; - pending_buffer = buffer; - - /* - * Get list of individual level reconstructions that are necessary - * for completing the overall merge - */ - std::vector<MergeTask> merges = version->get_merge_tasks(buffer->get_record_count()); - - /* - * Schedule the merge tasks - */ - for (ssize_t i=0; i<merges.size(); i++) { - merges[i].m_timestamp = m_timestamp.fetch_add(1); - m_task_queue.push(merges[i]); - } - - auto t = MergeTask(-1, 0, buffer->get_record_count() * sizeof(R) * 2, m_timestamp.fetch_add(1)); - m_task_queue.push(t); - - m_cv.notify_all(); - do { - std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock); - m_merge_cv.wait(merge_cv_lock); - } while (m_task_queue.size() > 0); - - assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0); - - return true; + void schedule_job(std::function<void(void*)> job, size_t size, void *args) { + size_t ts = m_counter.fetch_add(1); + m_task_queue.push(Task(size, ts, job, args)); } - bool schedule_query() { - return true; + void shutdown() { + m_shutdown = true; } private: - size_t get_timestamp() { - auto ts = m_timestamp.fetch_add(1); - return ts; - } - - void schedule_merge(MergeTask task) { - if (task.m_source_level == -1 && task.m_target_level == 0) { - run_buffer_merge(pending_buffer, pending_version); - } else { - run_merge(task, pending_version); - } - } - - - void schedule_query(QueryTask task) { - - } - - void schedule_next_task() { - auto task = m_task_queue.pop(); - - auto type = std::visit(GetTaskType{}, task); - - switch (type) { - case TaskType::MERGE: - schedule_merge(std::get<MergeTask>(task)); - break; - case 
TaskType::QUERY: - schedule_query(std::get<QueryTask>(task)); - break; - default: assert(false); - } - - if (m_task_queue.size() == 0) { - m_merge_cv.notify_all(); - } - } + psudb::LockedPriorityQueue<Task> m_task_queue; + size_t m_memory_budget; + size_t m_thrd_cnt; - void run_merge(MergeTask task, Structure *version) { - version->merge_levels(task.m_target_level, task.m_source_level); - - if (!version->validate_tombstone_proportion(task.m_target_level)) { - auto tasks = version->get_merge_tasks(task.m_target_level); - /* - * Schedule the merge tasks - */ - std::promise<void> trigger_prom; - tasks[tasks.size() - 1].make_dependent_on(trigger_prom); - tasks[tasks.size() - 1].m_timestamp = m_timestamp.fetch_add(1); - m_task_queue.push(tasks[tasks.size() - 1]); - - for (ssize_t i=tasks.size()-2; i>=0; i--) { - tasks[i].make_dependent_on(tasks[i+1]); - tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_task_queue.push(tasks[i]); - } + bool m_shutdown; - /* - * Block the completion of any task until all have been - * scheduled. Probably not strictly necessary, but due to - * interface constraints with the way promises are used, - * a dummy promise needs to be set up for the first job - * anyway. It's easiest to just release it here. 
- */ - trigger_prom.set_value(); - } - } + std::atomic<size_t> m_counter; + std::mutex m_cv_lock; + std::condition_variable m_cv; + std::thread m_sched_thrd; - void run_buffer_merge(Buffer *buffer, Structure *version) { - version->merge_buffer(buffer); - if (!version->validate_tombstone_proportion(0)) { - auto tasks = version->get_merge_tasks_from_level(0); + std::atomic<size_t> m_used_thrds; + std::atomic<size_t> m_used_memory; - /* - * Schedule the merge tasks (FIXME: currently this just - * executes them sequentially in a blocking fashion) - */ - for (ssize_t i=tasks.size()-1; i>=0; i--) { - tasks[i].m_timestamp = m_timestamp.fetch_add(1); - m_task_queue.push(tasks[i]); - } - } + void schedule_next() { + auto t = m_task_queue.pop(); + t(); } - void run_scheduler() { + void run() { do { std::unique_lock<std::mutex> cv_lock(m_cv_lock); m_cv.wait(cv_lock); - while (m_task_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) { - schedule_next_task(); + while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) { + schedule_next(); } cv_lock.unlock(); } while(!m_shutdown); } - - size_t m_memory_budget; - size_t m_thread_cnt; - - Buffer *pending_buffer; - Structure *pending_version; - - alignas(64) std::atomic<size_t> m_used_memory; - alignas(64) std::atomic<size_t> m_used_threads; - alignas(64) std::atomic<size_t> m_timestamp; - - psudb::LockedPriorityQueue<Task, std::vector<Task>, std::greater<Task>> m_task_queue; - - std::mutex m_cv_lock; - std::condition_variable m_cv; - - std::mutex m_merge_cv_lock; - std::condition_variable m_merge_cv; - - std::thread m_sched_thrd; - - bool m_shutdown; }; } diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 3c1b158..518159d 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -5,111 +5,53 @@ #include <variant> #include <future> +#include <functional> #include "framework/util/Configuration.h" namespace de { -enum class TaskType { - 
MERGE, - QUERY +struct MergeArgs { + void *version; + void *buffer; + std::vector<MergeTask> merges; + std::promise<bool> result; }; -struct TaskDependency { - std::promise<void> prom; - std::future<void> fut; +template <typename R> +struct QueryArgs { + void *version; + void *buffer; + std::promise<std::vector<R>> result_set; + void *query_parms; }; -struct MergeTask { - level_index m_source_level; - level_index m_target_level; - size_t m_timestamp; - size_t m_size; - TaskType m_type; - std::unique_ptr<TaskDependency> m_dep; - - MergeTask() = default; - - MergeTask(level_index source, level_index target, size_t size, size_t timestamp) - : m_source_level(source) - , m_target_level(target) - , m_timestamp(timestamp) - , m_size(size) - , m_type(TaskType::MERGE) - , m_dep(std::make_unique<TaskDependency>()){} - +typedef std::function<void(void*)> Job; - MergeTask(MergeTask &t) - : m_source_level(t.m_source_level) - , m_target_level(t.m_target_level) - , m_timestamp(t.m_timestamp) - , m_size(t.m_size) - , m_type(TaskType::MERGE) - , m_dep(std::move(t.m_dep)) +struct Task { + Task(size_t size, size_t ts, Job job, void *args) + : m_job(job) + , m_size(size) + , m_timestamp(ts) + , m_args(args) {} - - TaskType get_type() const { - return m_type; - } - - void make_dependent_on(MergeTask &task) { - m_dep->fut = task.m_dep->prom.get_future(); - } - - void make_dependent_on(TaskDependency *dep) { - m_dep->fut = dep->prom.get_future(); - } - - friend bool operator<(const MergeTask &self, const MergeTask &other) { - return self.m_timestamp < other.m_timestamp; - } - - friend bool operator>(const MergeTask &self, const MergeTask &other) { - return self.m_timestamp > other.m_timestamp; - } - -}; - -struct QueryTask { - size_t m_timestamp; + Job m_job; size_t m_size; - TaskType m_type; - std::unique_ptr<TaskDependency> m_dep; - - QueryTask(QueryTask &t) - : m_timestamp(t.m_timestamp) - , m_size(t.m_size) - , m_type(t.m_type) - , m_dep(std::move(t.m_dep)) - {} - - TaskType 
get_type() const { - return m_type; - } - - void SetDependency(QueryTask &task) { - m_dep->fut = task.m_dep->prom.get_future(); - } - - void SetDependency(TaskDependency *dep) { - m_dep->fut = dep->prom.get_future(); - } + size_t m_timestamp; + void *m_args; - friend bool operator<(const QueryTask &self, const QueryTask &other) { + friend bool operator<(const Task &self, const Task &other) { return self.m_timestamp < other.m_timestamp; } - friend bool operator>(const QueryTask &self, const QueryTask &other) { + friend bool operator>(const Task &self, const Task &other) { return self.m_timestamp > other.m_timestamp; } -}; -struct GetTaskType { - TaskType operator()(const MergeTask &t) { return t.get_type(); } - TaskType operator()(const QueryTask &t) { return t.get_type(); } + void operator()() { + m_job(m_args); + } }; -typedef std::variant<MergeTask, QueryTask> Task; - } |