summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling/Task.h
blob: 008f232f84c3fdce22e5f5773a165e544d827e2f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/*
 * include/framework/scheduling/Task.h
 *
 * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> 
 *
 * Distributed under the Modified BSD License.
 *
 */
#pragma once

#include <future>
#include <functional>
#include <chrono>

#include "framework/util/Configuration.h"
#include "framework/scheduling/Epoch.h"
#include "framework/scheduling/statistics.h"

namespace de {

template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L>
struct ReconstructionArgs {
    Epoch<R, S, Q, L> *epoch;
    std::vector<ReconstructionTask> merges;
    std::promise<bool> result;
    bool compaction;
    void *extension;
};

template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L>
struct QueryArgs {
    std::promise<std::vector<R>> result_set;
    void *query_parms;
    void *extension;
};

typedef std::function<void(void*)> Job;

struct Task {
    Task(size_t size, size_t ts, Job job, void *args, size_t type=0, SchedulerStatistics *stats=nullptr) 
      : m_job(job)
      , m_size(size)
      , m_timestamp(ts)
      , m_args(args)
      , m_type(type)
      , m_stats(stats)
    {}

    Job m_job;
    size_t m_size;
    size_t m_timestamp;
    void *m_args;
    size_t m_type;
    SchedulerStatistics *m_stats;

    friend bool operator<(const Task &self, const Task &other) {
        return self.m_timestamp < other.m_timestamp;
    }

    friend bool operator>(const Task &self, const Task &other) {
        return self.m_timestamp > other.m_timestamp;
    }

    void operator()(size_t thrd_id) {
        auto start = std::chrono::high_resolution_clock::now();
        if (m_stats) {
            m_stats->job_begin(m_timestamp);
        }

        m_job(m_args);

        if (m_stats) {
            m_stats->job_complete(m_timestamp);
        }
        auto stop = std::chrono::high_resolution_clock::now();

        if (m_stats) {
            auto time = std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start).count();
            m_stats->log_time_data(time, m_type);
        }
    }
};

}