summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling/Task.h
diff options
context:
space:
mode:
authorDouglas B. Rumbaugh <dbr4@psu.edu>2024-02-09 14:06:59 -0500
committerGitHub <noreply@github.com>2024-02-09 14:06:59 -0500
commitbc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30 (patch)
tree66333c55feb0ea8875a50e6dc07c8535d241bf1c /include/framework/scheduling/Task.h
parent076e104b8672924c3d80cd1da2fdb5ebee1766ac (diff)
parent46885246313358a3b606eca139b20280e96db10e (diff)
downloaddynamic-extension-bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30.tar.gz
Merge pull request #1 from dbrumbaugh/new-buffer
Initial Concurrency Implementation
Diffstat (limited to 'include/framework/scheduling/Task.h')
-rw-r--r--include/framework/scheduling/Task.h89
1 files changed, 89 insertions, 0 deletions
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
new file mode 100644
index 0000000..d5d4266
--- /dev/null
+++ b/include/framework/scheduling/Task.h
@@ -0,0 +1,89 @@
+/*
+ * include/framework/scheduling/Task.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * An abstraction to represent a job to be scheduled. Currently the
+ * supported task types are queries and merges. Based on the current plan,
+ * simple buffer inserts will likely also be made into a task at some
+ * point.
+ *
+ */
+#pragma once
+
+#include <future>
+#include <functional>
+#include <chrono>
+
+#include "framework/util/Configuration.h"
+#include "framework/scheduling/Epoch.h"
+#include "framework/scheduling/statistics.h"
+
+namespace de {
+
+template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L>
+struct ReconstructionArgs {
+ Epoch<R, S, Q, L> *epoch;
+ std::vector<ReconstructionTask> merges;
+ std::promise<bool> result;
+ bool compaction;
+ void *extension;
+};
+
+template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L>
+struct QueryArgs {
+ std::promise<std::vector<R>> result_set;
+ void *query_parms;
+ void *extension;
+};
+
+typedef std::function<void(void*)> Job;
+
+struct Task {
+ Task(size_t size, size_t ts, Job job, void *args, size_t type=0, SchedulerStatistics *stats=nullptr)
+ : m_job(job)
+ , m_size(size)
+ , m_timestamp(ts)
+ , m_args(args)
+ , m_type(type)
+ , m_stats(stats)
+ {}
+
+ Job m_job;
+ size_t m_size;
+ size_t m_timestamp;
+ void *m_args;
+ size_t m_type;
+ SchedulerStatistics *m_stats;
+
+ friend bool operator<(const Task &self, const Task &other) {
+ return self.m_timestamp < other.m_timestamp;
+ }
+
+ friend bool operator>(const Task &self, const Task &other) {
+ return self.m_timestamp > other.m_timestamp;
+ }
+
+ void operator()(size_t thrd_id) {
+ auto start = std::chrono::high_resolution_clock::now();
+ if (m_stats) {
+ m_stats->job_begin(m_timestamp);
+ }
+
+ m_job(m_args);
+
+ if (m_stats) {
+ m_stats->job_complete(m_timestamp);
+ }
+ auto stop = std::chrono::high_resolution_clock::now();
+
+ if (m_stats) {
+ auto time = std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start).count();
+ m_stats->log_time_data(time, m_type);
+ }
+ }
+};
+
+}