diff options
| author | Douglas B. Rumbaugh <dbr4@psu.edu> | 2024-02-09 14:06:59 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-02-09 14:06:59 -0500 |
| commit | bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30 (patch) | |
| tree | 66333c55feb0ea8875a50e6dc07c8535d241bf1c /include/framework/scheduling/Task.h | |
| parent | 076e104b8672924c3d80cd1da2fdb5ebee1766ac (diff) | |
| parent | 46885246313358a3b606eca139b20280e96db10e (diff) | |
| download | dynamic-extension-bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30.tar.gz | |
Merge pull request #1 from dbrumbaugh/new-buffer
Initial Concurrency Implementation
Diffstat (limited to 'include/framework/scheduling/Task.h')
| -rw-r--r-- | include/framework/scheduling/Task.h | 89 |
1 files changed, 89 insertions, 0 deletions
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h new file mode 100644 index 0000000..d5d4266 --- /dev/null +++ b/include/framework/scheduling/Task.h @@ -0,0 +1,89 @@ +/* + * include/framework/scheduling/Task.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + * An abstraction to represent a job to be scheduled. Currently the + * supported task types are queries and merges. Based on the current plan, + * simple buffer inserts will likely also be made into a task at some + * point. + * + */ +#pragma once + +#include <future> +#include <functional> +#include <chrono> + +#include "framework/util/Configuration.h" +#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/statistics.h" + +namespace de { + +template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L> +struct ReconstructionArgs { + Epoch<R, S, Q, L> *epoch; + std::vector<ReconstructionTask> merges; + std::promise<bool> result; + bool compaction; + void *extension; +}; + +template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L> +struct QueryArgs { + std::promise<std::vector<R>> result_set; + void *query_parms; + void *extension; +}; + +typedef std::function<void(void*)> Job; + +struct Task { + Task(size_t size, size_t ts, Job job, void *args, size_t type=0, SchedulerStatistics *stats=nullptr) + : m_job(job) + , m_size(size) + , m_timestamp(ts) + , m_args(args) + , m_type(type) + , m_stats(stats) + {} + + Job m_job; + size_t m_size; + size_t m_timestamp; + void *m_args; + size_t m_type; + SchedulerStatistics *m_stats; + + friend bool operator<(const Task &self, const Task &other) { + return self.m_timestamp < other.m_timestamp; + } + + friend bool operator>(const Task &self, const Task &other) { + return self.m_timestamp > other.m_timestamp; + } + + void operator()(size_t thrd_id) { + auto start = std::chrono::high_resolution_clock::now(); + if (m_stats) { + m_stats->job_begin(m_timestamp); + } + + m_job(m_args); + + if (m_stats) { + m_stats->job_complete(m_timestamp); + } + auto stop = std::chrono::high_resolution_clock::now(); + + if (m_stats) { + auto time = std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start).count(); + m_stats->log_time_data(time, m_type); + } + } +}; + +} |