summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling/Task.h
blob: 6b6f040217f320afe6247168b2f6ae3e9fe918b8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/*
 * include/framework/scheduling/Task.h
 *
 * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
 *
 * Distributed under the Modified BSD License.
 *
 * An abstraction to represent a job to be scheduled. Currently the
 * supported task types are queries and merges. Based on the current plan,
 * simple buffer inserts will likely also be made into a task at some
 * point.
 *
 */
#pragma once

#include <chrono>
#include <functional>
#include <future>

#include "framework/scheduling/Epoch.h"
#include "framework/scheduling/statistics.h"
#include "framework/util/Configuration.h"

namespace de {

template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
          LayoutPolicy L>
struct ReconstructionArgs {
  typedef typename ShardType::RECORD RecordType;
  Epoch<ShardType, QueryType, L> *epoch;
  ReconstructionVector merges;
  std::promise<bool> result;
  bool compaction;
  void *extension;
};

template <ShardInterface S, QueryInterface<S> Q, typename DE> struct QueryArgs {
  std::promise<std::vector<typename Q::ResultType>> result_set;
  typename Q::Parameters query_parms;
  DE *extension;
};

typedef std::function<void(void *)> Job;

struct Task {
  Task(size_t size, size_t ts, Job job, void *args, size_t type = 0,
       SchedulerStatistics *stats = nullptr)
      : m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type),
        m_stats(stats) {}

  Job m_job;
  size_t m_size;
  size_t m_timestamp;
  void *m_args;
  size_t m_type;
  SchedulerStatistics *m_stats;

  friend bool operator<(const Task &self, const Task &other) {
    return self.m_timestamp < other.m_timestamp;
  }

  friend bool operator>(const Task &self, const Task &other) {
    return self.m_timestamp > other.m_timestamp;
  }

  void operator()(size_t thrd_id) {
    auto start = std::chrono::high_resolution_clock::now();
    if (m_stats) {
      m_stats->job_begin(m_timestamp);
    }

    m_job(m_args);

    if (m_stats) {
      m_stats->job_complete(m_timestamp);
    }
    auto stop = std::chrono::high_resolution_clock::now();

    if (m_stats) {
      auto time =
          std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start)
              .count();
      m_stats->log_time_data(time, m_type);
    }
  }
};

} // namespace de