summaryrefslogtreecommitdiffstats
path: root/include/framework/Scheduler.h
blob: 28ed8a9a685b90f9987f705e42a68f43b8764486 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/*
 * include/framework/Scheduler.h
 *
 * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> 
 *                    Dong Xie <dongx@psu.edu>
 *
 * All rights reserved. Published under the Modified BSD License.
 *
 */
#pragma once

#include <vector>
#include <memory>
#include <queue>

#include "util/types.h"
#include "framework/ShardInterface.h"
#include "framework/QueryInterface.h"
#include "framework/RecordInterface.h"
#include "framework/MutableBuffer.h"
#include "framework/Configuration.h"
#include "framework/ExtensionStructure.h"

namespace de {

template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
class Scheduler {
    typedef ExtensionStructure<R, S, Q, L> Structure;
public:
    /*
     * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means 
     * unlimited.
     */
    Scheduler(size_t memory_budget, size_t thread_cnt) 
        : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
        , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX)
        , m_used_memory(0)
        , m_used_threads(0)
    {}

    bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) {
        /*
         * Get list of individual level reconstructions that are necessary
         * for completing the overall merge
         */
        std::vector<MergeTask> merges = version->get_merge_tasks(buffer->get_record_count());

        /*
         * Schedule the merge tasks (FIXME: currently this just 
         * executes them sequentially in a blocking fashion)
         */
        for (ssize_t i=merges.size()-1; i>=0; i--) {
            version->merge_levels(merges[i].m_target_level, merges[i].m_source_level);
        }

        return version->merge_buffer(buffer);
    }

private:
    size_t get_timestamp() {
        auto ts = m_timestamp.fetch_add(1);
        return ts;
    }

    size_t m_memory_budget;
    size_t m_thread_cnt;

    alignas(64) std::atomic<size_t> m_used_memory;
    alignas(64) std::atomic<size_t> m_used_threads;
    alignas(64) std::atomic<size_t> m_timestamp;

    std::priority_queue<MergeTask> m_merge_queue;
    std::mutex m_merge_queue_lock;
};

}