diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-09-18 13:05:44 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-09-18 13:05:44 -0400 |
| commit | abc8605a51537fc7b35bb0d9b1da6c724c5c6973 (patch) | |
| tree | b24509db6192b8e02e79237be986c5fcbb52667f /include/framework/Scheduler.h | |
| parent | 7f56949bc847b56da69c9eb3ebe081d6cf9f61c6 (diff) | |
| download | dynamic-extension-abc8605a51537fc7b35bb0d9b1da6c724c5c6973.tar.gz | |
Moved individual merge task execution into the scheduler
This change is made in anticipation of scheduling each task using a
specific thread, and required some modification to the interface of
ExtensionStructure. Namely,
1. ExtensionStructure now supports a get_merge_tasks() interface,
which returns a list of the individual level merges that would
need to be performed to complete a buffer flush of specified
size.
2. merge_levels and merge_buffer have been promoted to the public
interface, to allow their use within the scheduler.
3. merge_buffer has been modified to assume that the structure
already can support a direct flush of the buffer into L0, it is
now the responsibility of the caller to ensure that the necessary
merges have already been completed prior to calling this method.
Currently, preemptive tombstone compactions are non-functional, so some
unit tests are failing. This will be fixed when the thread scheduling
system is set up.
Diffstat (limited to 'include/framework/Scheduler.h')
| -rw-r--r-- | include/framework/Scheduler.h | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h index cd3f430..28ed8a9 100644 --- a/include/framework/Scheduler.h +++ b/include/framework/Scheduler.h @@ -23,19 +23,6 @@ namespace de { - -struct MergeTask { - level_index m_source_level; - level_index m_target_level; - size_t m_size; - size_t m_timestamp; - - bool operator<(MergeTask &other) { - return m_timestamp < other.m_timestamp; - } -}; - - template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L> class Scheduler { typedef ExtensionStructure<R, S, Q, L> Structure; @@ -52,7 +39,20 @@ public: {} bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) { - // FIXME: this is a non-concurrent implementation + /* + * Get list of individual level reconstructions that are necessary + * for completing the overall merge + */ + std::vector<MergeTask> merges = version->get_merge_tasks(buffer->get_record_count()); + + /* + * Schedule the merge tasks (FIXME: currently this just + * executes them sequentially in a blocking fashion) + */ + for (ssize_t i=merges.size()-1; i>=0; i--) { + version->merge_levels(merges[i].m_target_level, merges[i].m_source_level); + } + return version->merge_buffer(buffer); } |