path: root/include/framework
author     Douglas Rumbaugh <dbr4@psu.edu>   2023-10-20 17:00:42 -0400
committer  Douglas Rumbaugh <dbr4@psu.edu>   2023-10-20 17:00:42 -0400
commit     7ecfb22c32b7986ed1a2439c1abbeed298e4153a (patch)
tree       d7207b5755ce987068620b71f9b4af9a52982c0d /include/framework
parent     1a47cbd7978dcad7ed0b2f2af3f933137eedbfa3 (diff)
download   dynamic-extension-7ecfb22c32b7986ed1a2439c1abbeed298e4153a.tar.gz
Initial pass w/ new scheduler setup
Currently there's a race condition of some kind still to sort out.
Diffstat (limited to 'include/framework')
-rw-r--r--   include/framework/DynamicExtension.h              184
-rw-r--r--   include/framework/interface/Scheduler.h            17
-rw-r--r--   include/framework/scheduling/SerialScheduler.h     191
-rw-r--r--   include/framework/scheduling/Task.h                112
-rw-r--r--   include/framework/structure/ExtensionStructure.h    21
-rw-r--r--   include/framework/structure/MutableBuffer.h         11
-rw-r--r--   include/framework/util/Configuration.h               1
7 files changed, 196 insertions, 341 deletions
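
The core pattern this commit introduces: queries and merges are packaged as heap-allocated argument structs carrying a std::promise, handed to the scheduler as type-erased jobs (de::Job, i.e. std::function<void(void*)>), while the caller keeps the matching std::future. The following is a minimal self-contained sketch of that handoff; ExampleQueryArgs and example_query_job are illustrative stand-ins for the QueryArgs<R> and async_query added below, and the job runs inline rather than on a scheduler thread.

#include <functional>
#include <future>
#include <vector>

// Illustrative stand-in for QueryArgs<R>: the worker fulfils the promise,
// the caller blocks on the corresponding future.
struct ExampleQueryArgs {
    void *query_parms = nullptr;               // opaque, query-type specific
    std::promise<std::vector<int>> result_set;
};

// Type-erased job signature, mirroring de::Job = std::function<void(void*)>.
using ExampleJob = std::function<void(void *)>;

// Stand-in for async_query(): unpack the arguments, produce results, publish
// them through the promise, then free the argument struct (the job owns it).
static void example_query_job(void *arguments) {
    auto *args = static_cast<ExampleQueryArgs *>(arguments);
    std::vector<int> results {1, 2, 3};        // pretend query output
    args->result_set.set_value(std::move(results));
    delete args;
}

int main() {
    auto *args = new ExampleQueryArgs();
    std::future<std::vector<int>> fut = args->result_set.get_future();

    ExampleJob job = example_query_job;
    job(args);                                 // on a scheduler thread in the framework

    return fut.get().size() == 3 ? 0 : 1;      // the caller waits on the future
}
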
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 3a460aa..fc7922c 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -31,7 +31,8 @@
namespace de {
-template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler<R, S, Q, L>>
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING,
+ DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler>
class DynamicExtension {
typedef S Shard;
typedef MutableBuffer<R> Buffer;
@@ -83,68 +84,8 @@ public:
return internal_append(rec, true);
}
- std::vector<R> query(void *parms) {
- auto buffer = get_buffer();
- auto vers = get_active_version();
-
- // Get the buffer query state
- auto buffer_state = Q::get_buffer_query_state(buffer, parms);
-
- // Get the shard query states
- std::vector<std::pair<ShardID, Shard*>> shards;
- std::vector<void*> states;
-
- for (auto &level : vers->get_levels()) {
- level->get_query_states(shards, states, parms);
- }
-
- Q::process_query_states(parms, states, buffer_state);
-
- std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
-
- // Execute the query for the buffer
- auto buffer_results = Q::buffer_query(buffer, buffer_state, parms);
- query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers));
- if constexpr (Q::EARLY_ABORT) {
- if (query_results[0].size() > 0) {
- auto result = Q::merge(query_results, parms);
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(states[i]);
- }
-
- Q::delete_buffer_query_state(buffer_state);
- return result;
- }
- }
-
- // Execute the query for each shard
- for (size_t i=0; i<shards.size(); i++) {
- auto shard_results = Q::query(shards[i].second, states[i], parms);
- query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer, vers));
- if constexpr (Q::EARLY_ABORT) {
- if (query_results[i].size() > 0) {
- auto result = Q::merge(query_results, parms);
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(states[i]);
- }
-
- Q::delete_buffer_query_state(buffer_state);
-
- return result;
- }
- }
- }
-
- // Merge the results together
- auto result = Q::merge(query_results, parms);
-
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(states[i]);
- }
-
- Q::delete_buffer_query_state(buffer_state);
-
- return result;
+ std::future<std::vector<R>> query(void *parms) {
+ return schedule_query(get_active_version(), get_buffer(), parms);
}
size_t get_record_count() {
@@ -239,6 +180,112 @@ private:
return m_versions[0];
}
+ static void merge(void *arguments) {
+ MergeArgs *args = (MergeArgs *) arguments;
+
+ Structure *vers = (Structure *) args->version;
+ Buffer *buff = (Buffer *) args->buffer;
+
+ for (ssize_t i=args->merges.size() - 1; i>=0; i--) {
+ vers->merge_levels(args->merges[i].second, args->merges[i].first);
+ }
+
+ vers->merge_buffer(buff);
+
+ args->result.set_value(true);
+ delete args;
+ }
+
+ static void async_query(void *arguments) {
+ QueryArgs<R> *args = (QueryArgs<R> *) arguments;
+
+ auto buffer = (Buffer *) args->buffer;
+ auto vers = (Structure *) args->version;
+ void *parms = args->query_parms;
+
+ // Get the buffer query state
+ auto buffer_state = Q::get_buffer_query_state(buffer, parms);
+
+ // Get the shard query states
+ std::vector<std::pair<ShardID, Shard*>> shards;
+ std::vector<void*> states;
+
+ for (auto &level : vers->get_levels()) {
+ level->get_query_states(shards, states, parms);
+ }
+
+ Q::process_query_states(parms, states, buffer_state);
+
+ std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
+
+ // Execute the query for the buffer
+ auto buffer_results = Q::buffer_query(buffer, buffer_state, parms);
+ query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers));
+ if constexpr (Q::EARLY_ABORT) {
+ if (query_results[0].size() > 0) {
+ auto result = Q::merge(query_results, parms);
+ for (size_t i=0; i<states.size(); i++) {
+ Q::delete_query_state(states[i]);
+ }
+
+ Q::delete_buffer_query_state(buffer_state);
+ return result;
+ }
+ }
+
+ // Execute the query for each shard
+ for (size_t i=0; i<shards.size(); i++) {
+ auto shard_results = Q::query(shards[i].second, states[i], parms);
+ query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer, vers));
+ if constexpr (Q::EARLY_ABORT) {
+ if (query_results[i+1].size() > 0) {
+ auto result = Q::merge(query_results, parms);
+ for (size_t i=0; i<states.size(); i++) {
+ Q::delete_query_state(states[i]);
+ }
+
+ Q::delete_buffer_query_state(buffer_state);
+
+ return result;
+ }
+ }
+ }
+
+ // Merge the results together
+ auto result = Q::merge(query_results, parms);
+
+ for (size_t i=0; i<states.size(); i++) {
+ Q::delete_query_state(states[i]);
+ }
+
+ Q::delete_buffer_query_state(buffer_state);
+
+ args->result_set.set_value(std::move(result));
+ delete args;
+ }
+
+ std::future<bool> schedule_merge(Structure *version, Buffer *buffer) {
+ MergeArgs *args = new MergeArgs();
+ args->merges = version->get_merge_tasks(buffer->get_record_count());
+ args->buffer = buffer;
+ args->version = version;
+
+ m_sched.schedule_job(merge, 0, args);
+
+ return args->result.get_future();
+ }
+
+ std::future<std::vector<R>> schedule_query(Structure *version, Buffer *buffer, void *query_parms) {
+ QueryArgs<R> *args = new QueryArgs<R>();
+ args->buffer = buffer;
+ args->version = version;
+ args->query_parms = query_parms;
+
+ m_sched.schedule_job(async_query, 0, args);
+
+ return args->result_set.get_future();
+ }
+
int internal_append(const R &rec, bool ts) {
Buffer *buffer;
while (!(buffer = get_buffer()))
@@ -246,13 +293,15 @@ private:
if (buffer->is_full()) {
auto vers = get_active_version();
- m_sched.schedule_merge(vers, buffer);
+ auto res = schedule_merge(vers, buffer);
+ res.get();
}
+
return buffer->append(rec, ts);
}
- std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer, Structure *vers) {
+ static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer, Structure *vers) {
if constexpr (!Q::SKIP_DELETE_FILTER) {
return records;
}
@@ -303,10 +352,5 @@ private:
return processed_records;
}
};
-
-template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING>
-static void de_merge_callback(DynamicExtension<R, S, Q, L, D> extension, ExtensionStructure<R, S, Q> new_version) {
-
-}
}
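
MergeTask is reduced in this commit to a bare std::pair<level_index, level_index> (see Configuration.h at the end of this diff), built as {i-1, i}, so .first holds the source level and .second the target level; the merge() job above walks the task vector back to front and calls merge_levels(task.second, task.first). A small self-contained trace of that convention, with a stand-in merge_levels that only prints its arguments:

#include <cstdio>
#include <sys/types.h>   // ssize_t, as used by the framework
#include <utility>
#include <vector>

typedef ssize_t level_index;
typedef std::pair<level_index, level_index> MergeTask;   // {source, target}

// Stand-in for ExtensionStructure::merge_levels(target, source).
static void merge_levels(level_index target, level_index source) {
    std::printf("merge level %zd into level %zd\n", source, target);
}

int main() {
    // get_merge_tasks() emits tasks from the merge base level down to level 1,
    // e.g. with a base level of 3:
    std::vector<MergeTask> merges { {2, 3}, {1, 2}, {0, 1} };

    // The merge() job iterates back to front, passing target then source.
    for (ssize_t i = merges.size() - 1; i >= 0; i--) {
        merge_levels(merges[i].second, merges[i].first);
    }
    return 0;
}
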
diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h
index 1445e90..e8ffd08 100644
--- a/include/framework/interface/Scheduler.h
+++ b/include/framework/interface/Scheduler.h
@@ -12,20 +12,11 @@
#include <concepts>
#include "framework/interface/Record.h"
#include "util/types.h"
+#include "framework/scheduling/Task.h"
template <typename S>
-concept SchedulerInterface = requires(S s, size_t i, void *vp) {
+concept SchedulerInterface = requires(S s, size_t i, void *vp, de::Job j) {
{S(i, i)};
-// {s.schedule_merge(vp, vp)};
-
-/*
- {q.get_query_state(p, p)} -> std::convertible_to<void*>;
- {q.get_buffer_query_state(p, p)};
- {q.query(p, p)};
- {q.buffer_query(p, p)};
- {q.merge()};
- {q.delete_query_state(p)};
-*/
- //{Q::get_query_state(p, p)} -> std::convertible_to<void*>;
- //{Q::get_buffer_query_state(p, p)} -> std::convertible_to<void*>;
+ {s.schedule_job(j, i, vp)} -> std::convertible_to<void>;
+ {s.shutdown()};
};
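
The slimmed-down concept now asks only for a (memory_budget, thread_cnt) constructor, a schedule_job(Job, size_t, void*) member, and shutdown(). Below is a self-contained sketch of a type that satisfies it; the Job alias and the concept body are restated locally so the example compiles on its own, and InlineScheduler is a toy that runs each job on the calling thread, not anything from the framework.

#include <concepts>
#include <cstddef>
#include <functional>

// Local restatement of de::Job and the revised SchedulerInterface.
using Job = std::function<void(void *)>;

template <typename S>
concept SchedulerInterface = requires(S s, size_t i, void *vp, Job j) {
    { S(i, i) };
    { s.schedule_job(j, i, vp) } -> std::convertible_to<void>;
    { s.shutdown() };
};

// A toy scheduler: every job runs immediately on the calling thread.
class InlineScheduler {
public:
    InlineScheduler(size_t /*memory_budget*/, size_t /*thread_cnt*/) {}

    void schedule_job(Job job, size_t /*size*/, void *args) {
        job(args);   // no queue, no worker thread
    }

    void shutdown() {}
};

static_assert(SchedulerInterface<InlineScheduler>);

int main() {
    InlineScheduler sched(0, 0);
    int x = 0;
    sched.schedule_job([](void *p) { ++*static_cast<int *>(p); }, 0, &x);
    sched.shutdown();
    return (x == 1) ? 0 : 1;
}
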
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
index c43e930..5d6e5c2 100644
--- a/include/framework/scheduling/SerialScheduler.h
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -29,198 +29,67 @@
namespace de {
-template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
class SerialScheduler {
- typedef ExtensionStructure<R, S, Q, L> Structure;
- typedef MutableBuffer<R> Buffer;
public:
- /*
- * A simple "scheduler" that runs tasks serially, in a FIFO manner. Incoming concurrent
- * requests will wait for their turn, and only one task will be active in the system at
- * a time. The scheduler will spin up a second thread for running itself, but all tasks
- * will be single-threaded.
- *
- * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means
- * unlimited.
- *
- * Note that the SerialScheduler object is non-concurrent, and so will ignore the
- * thread_cnt argument. It will obey the memory_budget, however a failure due to
- * memory constraints will be irrecoverable, as there is no way to free up memory
- * or block particular tasks until memory becomes available.
- */
- SerialScheduler(size_t memory_budget, size_t thread_cnt)
- : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
- , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX)
- , m_used_memory(0)
- , m_used_threads(0)
- , m_shutdown(false)
- {
- m_sched_thrd = std::thread(&SerialScheduler::run_scheduler, this);
+ SerialScheduler(size_t memory_budget, size_t thread_cnt)
+ : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
+ , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX)
+ , m_used_memory(0)
+ , m_used_thrds(0)
+ , m_shutdown(false)
+ {
+ m_sched_thrd = std::thread(&SerialScheduler::run, this);
}
~SerialScheduler() {
- m_shutdown = true;
+ shutdown();
m_cv.notify_all();
m_sched_thrd.join();
}
- bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) {
- pending_version = version;
- pending_buffer = buffer;
-
- /*
- * Get list of individual level reconstructions that are necessary
- * for completing the overall merge
- */
- std::vector<MergeTask> merges = version->get_merge_tasks(buffer->get_record_count());
-
- /*
- * Schedule the merge tasks
- */
- for (ssize_t i=0; i<merges.size(); i++) {
- merges[i].m_timestamp = m_timestamp.fetch_add(1);
- m_task_queue.push(merges[i]);
- }
-
- auto t = MergeTask(-1, 0, buffer->get_record_count() * sizeof(R) * 2, m_timestamp.fetch_add(1));
- m_task_queue.push(t);
-
- m_cv.notify_all();
- do {
- std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock);
- m_merge_cv.wait(merge_cv_lock);
- } while (m_task_queue.size() > 0);
-
- assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0);
-
- return true;
+ void schedule_job(std::function<void(void*)> job, size_t size, void *args) {
+ size_t ts = m_counter.fetch_add(1);
+ m_task_queue.push(Task(size, ts, job, args));
}
- bool schedule_query() {
- return true;
+ void shutdown() {
+ m_shutdown = true;
}
private:
- size_t get_timestamp() {
- auto ts = m_timestamp.fetch_add(1);
- return ts;
- }
-
- void schedule_merge(MergeTask task) {
- if (task.m_source_level == -1 && task.m_target_level == 0) {
- run_buffer_merge(pending_buffer, pending_version);
- } else {
- run_merge(task, pending_version);
- }
- }
-
-
- void schedule_query(QueryTask task) {
-
- }
-
- void schedule_next_task() {
- auto task = m_task_queue.pop();
-
- auto type = std::visit(GetTaskType{}, task);
-
- switch (type) {
- case TaskType::MERGE:
- schedule_merge(std::get<MergeTask>(task));
- break;
- case TaskType::QUERY:
- schedule_query(std::get<QueryTask>(task));
- break;
- default: assert(false);
- }
-
- if (m_task_queue.size() == 0) {
- m_merge_cv.notify_all();
- }
- }
+ psudb::LockedPriorityQueue<Task> m_task_queue;
+ size_t m_memory_budget;
+ size_t m_thrd_cnt;
- void run_merge(MergeTask task, Structure *version) {
- version->merge_levels(task.m_target_level, task.m_source_level);
-
- if (!version->validate_tombstone_proportion(task.m_target_level)) {
- auto tasks = version->get_merge_tasks(task.m_target_level);
- /*
- * Schedule the merge tasks
- */
- std::promise<void> trigger_prom;
- tasks[tasks.size() - 1].make_dependent_on(trigger_prom);
- tasks[tasks.size() - 1].m_timestamp = m_timestamp.fetch_add(1);
- m_task_queue.push(tasks[tasks.size() - 1]);
-
- for (ssize_t i=tasks.size()-2; i>=0; i--) {
- tasks[i].make_dependent_on(tasks[i+1]);
- tasks[i].m_timestamp = m_timestamp.fetch_add(1);
- m_task_queue.push(tasks[i]);
- }
+ bool m_shutdown;
- /*
- * Block the completion of any task until all have been
- * scheduled. Probably not strictly necessary, but due to
- * interface constraints with the way promises are used,
- * a dummy promise needs to be set up for the first job
- * anyway. It's easiest to just release it here.
- */
- trigger_prom.set_value();
- }
- }
+ std::atomic<size_t> m_counter;
+ std::mutex m_cv_lock;
+ std::condition_variable m_cv;
+ std::thread m_sched_thrd;
- void run_buffer_merge(Buffer *buffer, Structure *version) {
- version->merge_buffer(buffer);
- if (!version->validate_tombstone_proportion(0)) {
- auto tasks = version->get_merge_tasks_from_level(0);
+ std::atomic<size_t> m_used_thrds;
+ std::atomic<size_t> m_used_memory;
- /*
- * Schedule the merge tasks (FIXME: currently this just
- * executes them sequentially in a blocking fashion)
- */
- for (ssize_t i=tasks.size()-1; i>=0; i--) {
- tasks[i].m_timestamp = m_timestamp.fetch_add(1);
- m_task_queue.push(tasks[i]);
- }
- }
+ void schedule_next() {
+ auto t = m_task_queue.pop();
+ t();
}
- void run_scheduler() {
+ void run() {
do {
std::unique_lock<std::mutex> cv_lock(m_cv_lock);
m_cv.wait(cv_lock);
- while (m_task_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) {
- schedule_next_task();
+ while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) {
+ schedule_next();
}
cv_lock.unlock();
} while(!m_shutdown);
}
-
- size_t m_memory_budget;
- size_t m_thread_cnt;
-
- Buffer *pending_buffer;
- Structure *pending_version;
-
- alignas(64) std::atomic<size_t> m_used_memory;
- alignas(64) std::atomic<size_t> m_used_threads;
- alignas(64) std::atomic<size_t> m_timestamp;
-
- psudb::LockedPriorityQueue<Task, std::vector<Task>, std::greater<Task>> m_task_queue;
-
- std::mutex m_cv_lock;
- std::condition_variable m_cv;
-
- std::mutex m_merge_cv_lock;
- std::condition_variable m_merge_cv;
-
- std::thread m_sched_thrd;
-
- bool m_shutdown;
};
}
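
The rewritten SerialScheduler keeps the single background thread: run() sleeps on a condition variable and drains the task queue whenever it wakes. The sketch below illustrates that loop but is not the committed code — it substitutes a plain std::queue for psudb::LockedPriorityQueue, notifies the worker on every push, and uses a predicated wait so wakeups cannot be lost; memory accounting and priority ordering are left out.

#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

using Job = std::function<void(void *)>;

class MiniSerialScheduler {
public:
    MiniSerialScheduler() : m_shutdown(false), m_thrd(&MiniSerialScheduler::run, this) {}

    ~MiniSerialScheduler() {
        shutdown();
        m_thrd.join();
    }

    void schedule_job(Job job, size_t /*size*/, void *args) {
        {
            std::lock_guard<std::mutex> lk(m_lock);
            m_queue.push([job, args] { job(args); });
        }
        m_cv.notify_one();   // wake the worker for every new task
    }

    void shutdown() {
        {
            std::lock_guard<std::mutex> lk(m_lock);
            m_shutdown = true;
        }
        m_cv.notify_all();
    }

private:
    void run() {
        std::unique_lock<std::mutex> lk(m_lock);
        for (;;) {
            // Predicated wait: no lost wakeup if a task lands before we sleep.
            m_cv.wait(lk, [this] { return m_shutdown || !m_queue.empty(); });
            while (!m_queue.empty()) {
                auto task = std::move(m_queue.front());
                m_queue.pop();
                lk.unlock();
                task();          // run the job outside the queue lock
                lk.lock();
            }
            if (m_shutdown) return;
        }
    }

    std::queue<std::function<void()>> m_queue;
    std::mutex m_lock;
    std::condition_variable m_cv;
    bool m_shutdown;
    std::thread m_thrd;
};

int main() {
    int counter = 0;
    {
        MiniSerialScheduler sched;
        sched.schedule_job([](void *p) { ++*static_cast<int *>(p); }, 0, &counter);
    }   // the destructor shuts down and joins, so the job has run by here
    return (counter == 1) ? 0 : 1;
}
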
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 3c1b158..518159d 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -5,111 +5,53 @@
#include <variant>
#include <future>
+#include <functional>
#include "framework/util/Configuration.h"
namespace de {
-enum class TaskType {
- MERGE,
- QUERY
+struct MergeArgs {
+ void *version;
+ void *buffer;
+ std::vector<MergeTask> merges;
+ std::promise<bool> result;
};
-struct TaskDependency {
- std::promise<void> prom;
- std::future<void> fut;
+template <typename R>
+struct QueryArgs {
+ void *version;
+ void *buffer;
+ std::promise<std::vector<R>> result_set;
+ void *query_parms;
};
-struct MergeTask {
- level_index m_source_level;
- level_index m_target_level;
- size_t m_timestamp;
- size_t m_size;
- TaskType m_type;
- std::unique_ptr<TaskDependency> m_dep;
-
- MergeTask() = default;
-
- MergeTask(level_index source, level_index target, size_t size, size_t timestamp)
- : m_source_level(source)
- , m_target_level(target)
- , m_timestamp(timestamp)
- , m_size(size)
- , m_type(TaskType::MERGE)
- , m_dep(std::make_unique<TaskDependency>()){}
-
+typedef std::function<void(void*)> Job;
- MergeTask(MergeTask &t)
- : m_source_level(t.m_source_level)
- , m_target_level(t.m_target_level)
- , m_timestamp(t.m_timestamp)
- , m_size(t.m_size)
- , m_type(TaskType::MERGE)
- , m_dep(std::move(t.m_dep))
+struct Task {
+ Task(size_t size, size_t ts, Job job, void *args)
+ : m_job(job)
+ , m_size(size)
+ , m_timestamp(ts)
+ , m_args(args)
{}
-
- TaskType get_type() const {
- return m_type;
- }
-
- void make_dependent_on(MergeTask &task) {
- m_dep->fut = task.m_dep->prom.get_future();
- }
-
- void make_dependent_on(TaskDependency *dep) {
- m_dep->fut = dep->prom.get_future();
- }
-
- friend bool operator<(const MergeTask &self, const MergeTask &other) {
- return self.m_timestamp < other.m_timestamp;
- }
-
- friend bool operator>(const MergeTask &self, const MergeTask &other) {
- return self.m_timestamp > other.m_timestamp;
- }
-
-};
-
-struct QueryTask {
- size_t m_timestamp;
+ Job m_job;
size_t m_size;
- TaskType m_type;
- std::unique_ptr<TaskDependency> m_dep;
-
- QueryTask(QueryTask &t)
- : m_timestamp(t.m_timestamp)
- , m_size(t.m_size)
- , m_type(t.m_type)
- , m_dep(std::move(t.m_dep))
- {}
-
- TaskType get_type() const {
- return m_type;
- }
-
- void SetDependency(QueryTask &task) {
- m_dep->fut = task.m_dep->prom.get_future();
- }
-
- void SetDependency(TaskDependency *dep) {
- m_dep->fut = dep->prom.get_future();
- }
+ size_t m_timestamp;
+ void *m_args;
- friend bool operator<(const QueryTask &self, const QueryTask &other) {
+ friend bool operator<(const Task &self, const Task &other) {
return self.m_timestamp < other.m_timestamp;
}
- friend bool operator>(const QueryTask &self, const QueryTask &other) {
+ friend bool operator>(const Task &self, const Task &other) {
return self.m_timestamp > other.m_timestamp;
}
-};
-struct GetTaskType {
- TaskType operator()(const MergeTask &t) { return t.get_type(); }
- TaskType operator()(const QueryTask &t) { return t.get_type(); }
+ void operator()() {
+ m_job(m_args);
+ }
};
-typedef std::variant<MergeTask, QueryTask> Task;
-
}
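
Task now carries a type-erased Job plus its arguments and orders itself by timestamp, so an oldest-first (FIFO) drain falls out of a min-heap keyed on the timestamp — as the pre-rewrite scheduler did with std::greater<Task>. A self-contained sketch using std::priority_queue in place of psudb::LockedPriorityQueue:

#include <cstdio>
#include <functional>
#include <queue>
#include <vector>

using Job = std::function<void(void *)>;

struct Task {
    Task(size_t size, size_t ts, Job job, void *args)
        : m_job(job), m_size(size), m_timestamp(ts), m_args(args) {}

    Job m_job;
    size_t m_size;
    size_t m_timestamp;
    void *m_args;

    friend bool operator<(const Task &a, const Task &b) { return a.m_timestamp < b.m_timestamp; }
    friend bool operator>(const Task &a, const Task &b) { return a.m_timestamp > b.m_timestamp; }

    void operator()() { m_job(m_args); }
};

int main() {
    // std::greater turns the default max-heap into a min-heap on m_timestamp,
    // so tasks come back out in the order they were scheduled.
    std::priority_queue<Task, std::vector<Task>, std::greater<Task>> queue;

    int ids[] = {1, 2, 3};
    for (size_t ts = 0; ts < 3; ts++) {
        queue.push(Task(0, ts, [](void *p) { std::printf("task %d\n", *static_cast<int *>(p)); },
                        &ids[ts]));
    }

    while (!queue.empty()) {
        Task t = queue.top();   // copy; top() returns a const reference
        queue.pop();
        t();                    // prints task 1, task 2, task 3
    }
    return 0;
}
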
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 920e1c3..8344518 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -93,7 +93,10 @@ public:
inline bool merge_buffer(Buffer *buffer) {
assert(can_merge_with(0, buffer->get_record_count()));
+ buffer->start_merge();
merge_buffer_into_l0(buffer);
+ buffer->finish_merge();
+
buffer->truncate();
return true;
@@ -216,10 +219,7 @@ public:
}
for (level_index i=merge_base_level; i>0; i--) {
- MergeTask task;
- task.m_source_level = i - 1;
- task.m_target_level = i;
- task.m_type = TaskType::MERGE;
+ MergeTask task = {i-1, i};
/*
* The amount of storage required for the merge accounts
@@ -237,7 +237,7 @@ public:
reccnt += m_levels[i]->get_record_count();
}
}
- task.m_size = 2* reccnt * sizeof(R);
+ //task.m_size = 2* reccnt * sizeof(R);
merges.push_back(task);
}
@@ -249,7 +249,7 @@ public:
/*
*
*/
- std::vector<MergeTask> get_merge_tasks_from_level(size_t source_level) {
+ std::vector<MergeTask> get_merge_tasks_from_level(level_index source_level) {
std::vector<MergeTask> merges;
level_index merge_base_level = find_mergable_level(source_level);
@@ -258,10 +258,7 @@ public:
}
for (level_index i=merge_base_level; i>source_level; i--) {
- MergeTask task;
- task.m_source_level = i - 1;
- task.m_target_level = i;
-
+ MergeTask task = {i - 1, i};
/*
* The amount of storage required for the merge accounts
* for the cost of storing the new records, along with the
@@ -278,12 +275,12 @@ public:
reccnt += m_levels[i]->get_record_count();
}
}
- task.m_size = 2* reccnt * sizeof(R);
+// task.m_size = 2* reccnt * sizeof(R);
merges.push_back(task);
}
- return std::move(merges);
+ return merges;
}
/*
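
Dropping the std::move on the return is correct: returning a local by value already permits copy elision (or, failing that, an implicit move), whereas wrapping the return value in std::move suppresses NRVO. In miniature:

#include <vector>

std::vector<int> make_tasks() {
    std::vector<int> merges {1, 2, 3};
    return merges;                 // NRVO, or an implicit move if elision fails
    // return std::move(merges);   // would force a move and block elision
}

int main() {
    return make_tasks().size() == 3 ? 0 : 1;
}
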
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 9f12175..804ca5e 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -90,12 +90,23 @@ public:
}
bool truncate() {
+
+ while (active_merge() || m_refcnt.load() > 0)
+ ;
+
+ m_merge_lock.lock();
+
+ while (m_refcnt > 0)
+ ;
+
m_tombstonecnt.store(0);
m_reccnt.store(0);
m_weight.store(0);
m_max_weight.store(0);
if (m_tombstone_filter) m_tombstone_filter->clear();
+ m_merge_lock.unlock();
+
return true;
}
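
truncate() now waits for any in-flight merge and for outstanding readers (tracked by m_refcnt) to drain before resetting the buffer counters under m_merge_lock. A minimal stand-in for that wait-then-reset shape, with illustrative names and only the reference-count wait (the committed version additionally checks active_merge() and re-checks the count after taking the lock):

#include <atomic>
#include <mutex>
#include <thread>

// Illustrative stand-in for the MutableBuffer fields touched by truncate().
struct MiniBuffer {
    std::atomic<size_t> m_refcnt{0};        // outstanding readers
    std::atomic<size_t> m_reccnt{5};        // record count to reset
    std::atomic<size_t> m_tombstonecnt{1};
    std::mutex m_merge_lock;

    bool truncate() {
        // Busy-wait until no reader holds a reference to the buffer contents.
        while (m_refcnt.load() > 0) {
            std::this_thread::yield();
        }

        std::lock_guard<std::mutex> lock(m_merge_lock);
        m_reccnt.store(0);
        m_tombstonecnt.store(0);
        return true;
    }
};

int main() {
    MiniBuffer buf;
    buf.truncate();
    return (buf.m_reccnt.load() == 0) ? 0 : 1;
}
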
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index eb9b93f..9d8248f 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -50,5 +50,6 @@ enum class DeletePolicy {
};
typedef ssize_t level_index;
+typedef std::pair<level_index, level_index> MergeTask;
}