1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
/*
* include/framework/scheduling/Task.h
*
* Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
* An abstraction to represent a job to be scheduled. Currently the
* supported task types are queries and merges. Based on the current plan,
* simple buffer inserts will likely also be made into a task at some
* point.
*
*/
#pragma once
#include <chrono>
#include <functional>
#include <future>
#include <utility>

#include "framework/scheduling/Epoch.h"
#include "framework/scheduling/statistics.h"
#include "framework/util/Configuration.h"
namespace de {
/*
 * Argument bundle for a reconstruction (merge) job. A pointer to one of
 * these is what travels through the opaque `void *` argument of a Job.
 *
 * NOTE(review): this struct has no constructor or destructor, so it never
 * allocates or frees `epoch` or `extension` itself; who owns them is not
 * visible here -- confirm against the code that builds and consumes it.
 */
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
          LayoutPolicy L>
struct ReconstructionArgs {
  /* Record type exposed by the shard implementation. */
  using RecordType = typename ShardType::RECORD;

  /* Epoch the reconstruction is associated with. */
  Epoch<ShardType, QueryType, L> *epoch;

  /* The reconstruction steps to carry out. */
  ReconstructionVector merges;

  /*
   * Promise side of the job's boolean outcome; presumably fulfilled by
   * the job function once it finishes -- confirm against the job body.
   */
  std::promise<bool> result;

  /* Flag presumably selecting a compaction rather than an ordinary merge
   * -- confirm against the consumer. */
  bool compaction;

  /* Opaque pointer handed through to the job; the pointee type is not
   * expressed here. */
  void *extension;
};
/*
 * Argument bundle for a query job.
 *
 * S  - shard type (must satisfy ShardInterface)
 * Q  - query type (must satisfy QueryInterface<S>); supplies the
 *      ResultType and Parameters member types used below
 * DE - type of the object `extension` points to
 *
 * NOTE(review): the struct declares no constructor or destructor, so it
 * never frees `extension` itself; ownership is decided by the caller --
 * confirm.
 */
template <ShardInterface S, QueryInterface<S> Q, typename DE> struct QueryArgs {
/* Promise side of the query's result; presumably fulfilled by the job
 * function -- confirm against the job body. */
std::promise<typename Q::ResultType> result_set;
/* Parameters of the query, stored by value. */
typename Q::Parameters query_parms;
/* Typed pointer to the DE instance associated with this query. */
DE *extension;
};
/*
 * Callable type stored by Task: takes one opaque argument pointer and
 * returns nothing.
 */
using Job = std::function<void(void *)>;
/*
 * A single schedulable unit of work: a type-erased job function, the
 * opaque argument pointer it is invoked with, and the bookkeeping values
 * (size, timestamp, type) stored alongside it.
 *
 * Task declares no destructor and never frees m_args or m_stats; it only
 * stores the raw pointers it was given.
 */
struct Task {
  /*
   * size  - stored in m_size; not read anywhere inside this struct
   * ts    - stored in m_timestamp; the ordering key for operator< and
   *         operator>, and the value passed to job_begin / job_complete
   * job   - callable invoked as job(args) by operator()
   * args  - opaque pointer passed to the job unchanged
   * type  - value passed to log_time_data together with the elapsed time
   * stats - optional statistics sink; nullptr disables all reporting
   *
   * `job` is a by-value sink parameter, so it is moved (not copied) into
   * m_job to avoid a second std::function copy.
   */
  Task(size_t size, size_t ts, Job job, void *args, size_t type = 0,
       SchedulerStatistics *stats = nullptr)
      : m_job(std::move(job)), m_size(size), m_timestamp(ts), m_args(args),
        m_type(type), m_stats(stats) {}

  Job m_job;
  size_t m_size;
  size_t m_timestamp;
  void *m_args;
  size_t m_type;
  SchedulerStatistics *m_stats;

  /* Tasks are ordered by timestamp only; no other member participates. */
  friend bool operator<(const Task &self, const Task &other) {
    return self.m_timestamp < other.m_timestamp;
  }

  friend bool operator>(const Task &self, const Task &other) {
    return self.m_timestamp > other.m_timestamp;
  }

  /*
   * Runs the job with its stored argument pointer. When a statistics
   * object was supplied, reports job_begin before the job, job_complete
   * after it, and then logs the elapsed wall-clock time in nanoseconds
   * together with m_type.
   *
   * The measured interval starts before job_begin and ends after
   * job_complete, so it includes the cost of both of those calls.
   *
   * No exception is caught here: if the job throws, the exception
   * propagates and neither job_complete nor log_time_data is called.
   *
   * thrd_id is accepted but not used by this function.
   */
  void operator()([[maybe_unused]] size_t thrd_id) {
    const auto start = std::chrono::high_resolution_clock::now();

    if (m_stats) {
      m_stats->job_begin(m_timestamp);
    }

    m_job(m_args);

    if (m_stats) {
      m_stats->job_complete(m_timestamp);
    }

    const auto stop = std::chrono::high_resolution_clock::now();

    if (m_stats) {
      const auto elapsed_ns =
          std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start)
              .count();
      m_stats->log_time_data(elapsed_ns, m_type);
    }
  }
};
} // namespace de
|