summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling/Task.h
blob: 4529b2e1878b1c4ff8535cc6bd1e1799f458cd7f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/*
 * include/framework/scheduling/Task.h
 *
 * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu>
 *
 * Distributed under the Modified BSD License.
 *
 * An abstraction to represent a job to be scheduled. Currently the
 * supported task types are queries and merges. Based on the current plan,
 * simple buffer inserts will likely also be made into a task at some
 * point.
 *
 */
#pragma once

#include <chrono>
#include <functional>
#include <future>
#include <condition_variable>

#include "framework/scheduling/Version.h"
#include "framework/scheduling/statistics.h"
#include "util/types.h"

namespace de {

enum class ReconstructionPriority {
  FLUSH = 0,
  CMPCT = 1,
  MAINT = 2  
};

template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
struct ReconstructionArgs {
  typedef typename ShardType::RECORD RecordType;
  std::shared_ptr<Version<ShardType, QueryType>> version;
  ReconstructionVector tasks;
  void *extension;
  ReconstructionPriority priority;
  size_t initial_version;
  long predicted_runtime;
};

template <ShardInterface S, QueryInterface<S> Q, typename DE> struct QueryArgs {
  std::promise<typename Q::ResultType> result_set;
  typename Q::Parameters query_parms;
  DE *extension;
};

typedef std::function<void(void *)> Job;

struct Task {
  Task(size_t size, size_t ts, Job job, void *args, size_t type = 0,
       SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr, std::condition_variable *cv=nullptr)
      : m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type),
        m_stats(stats), m_lk(lk), m_cv(cv) {}

  Job m_job;
  size_t m_size;
  size_t m_timestamp;
  void *m_args;
  size_t m_type;
  SchedulerStatistics *m_stats;
  std::mutex *m_lk;
  std::condition_variable *m_cv;

  friend bool operator<(const Task &self, const Task &other) {
    return self.m_timestamp < other.m_timestamp;
  }

  friend bool operator>(const Task &self, const Task &other) {
    return self.m_timestamp > other.m_timestamp;
  }

  void operator()(size_t thrd_id) {
    if (m_stats) {
      m_stats->job_begin(m_timestamp);
    }

    m_job(m_args);

    if (m_stats) {
      m_stats->job_complete(m_timestamp);
    }

    if (m_lk) {
      m_lk->unlock();
    }

    if (m_cv) {
      m_cv->notify_all();
    }
  }
};

} // namespace de