Diffstat (limited to 'include/framework')
-rw-r--r--  include/framework/DynamicExtension.h   |   5
-rw-r--r--  include/framework/ExtensionStructure.h | 109
-rw-r--r--  include/framework/InternalLevel.h      |  39
-rw-r--r--  include/framework/Scheduler.h          | 126
4 files changed, 221 insertions(+), 58 deletions(-)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 08e2243..6965965 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -301,5 +301,10 @@ private:
return processed_records;
}
};
+
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING>
+static void de_merge_callback(DynamicExtension<R, S, Q, L, D> extension, ExtensionStructure<R, S, Q> new_version) {
+ /* TODO: not yet implemented */
+}
}
diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h
index 9a5f6b3..2fb9cf0 100644
--- a/include/framework/ExtensionStructure.h
+++ b/include/framework/ExtensionStructure.h
@@ -33,8 +33,12 @@ struct MergeTask {
size_t m_size;
size_t m_timestamp;
- bool operator<(MergeTask &other) {
- return m_timestamp < other.m_timestamp;
+ friend bool operator<(const MergeTask &self, const MergeTask &other) {
+ return self.m_timestamp < other.m_timestamp;
+ }
+
+ friend bool operator>(const MergeTask &self, const MergeTask &other) {
+ return self.m_timestamp > other.m_timestamp;
}
};
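The second comparator exists for the Scheduler.h change further down, where the merge queue becomes a std::priority_queue with std::greater, i.e. a min-heap on m_timestamp so tasks are dequeued in the order they were scheduled. A minimal standalone sketch of that interaction (the field types are stand-ins for the framework's real ones):

#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <vector>

struct MergeTask {
    int m_source_level;
    int m_target_level;
    std::size_t m_size;
    std::size_t m_timestamp;

    friend bool operator<(const MergeTask &self, const MergeTask &other) {
        return self.m_timestamp < other.m_timestamp;
    }

    friend bool operator>(const MergeTask &self, const MergeTask &other) {
        return self.m_timestamp > other.m_timestamp;
    }
};

int main() {
    /* std::greater turns the default max-heap into a min-heap, so the
     * task with the smallest timestamp is always at the top. */
    std::priority_queue<MergeTask, std::vector<MergeTask>,
                        std::greater<MergeTask>> queue;

    queue.push(MergeTask{1, 2, 0, 7});
    queue.push(MergeTask{0, 1, 0, 3});

    assert(queue.top().m_timestamp == 3); /* earliest-scheduled task first */
    return 0;
}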
@@ -66,7 +70,7 @@ public:
ExtensionStructure<R, S, Q, L> *copy() {
auto new_struct = new ExtensionStructure<R, S, Q, L>(m_scale_factor, m_max_delete_prop, m_buffer_size);
for (size_t i=0; i<m_levels.size(); i++) {
- new_struct->m_levels.push_back(m_levels[i]);
+ new_struct->m_levels.push_back(m_levels[i]->clone());
}
return new_struct;
@@ -104,9 +108,8 @@ public:
assert(can_merge_with(0, buffer->get_record_count()));
merge_buffer_into_l0(buffer);
- enforce_delete_maximum(0);
-
buffer->truncate();
+
return true;
}
@@ -193,6 +196,11 @@ public:
return true;
}
+ bool validate_tombstone_proportion(level_index level) {
+ long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level);
+ return ts_prop <= (long double) m_max_delete_prop;
+ }
+
/*
* Return a reference to the underlying vector of levels within the
* structure.
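For concreteness, the arithmetic that validate_tombstone_proportion() performs above, as a tiny standalone program with made-up numbers (650 tombstones, a 12,000-record level capacity, m_max_delete_prop = 0.05):

#include <cstdio>

int main() {
    /* Hypothetical numbers: 650 tombstones in a level whose record
     * capacity is 12,000, with m_max_delete_prop = 0.05. */
    long double ts_prop = (long double) 650 / (long double) 12000;
    bool within_bound = ts_prop <= (long double) 0.05;

    /* Prints "0.0542 -> merge required" */
    std::printf("%.4Lf -> %s\n", ts_prop,
                within_bound ? "ok" : "merge required");
    return 0;
}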
@@ -250,6 +258,47 @@ public:
return std::move(merges);
}
+
+ /*
+ * Build the merge tasks required to free up space in source_level,
+ * growing the structure if no level can absorb the cascading merge.
+ */
+ std::vector<MergeTask> get_merge_tasks_from_level(size_t source_level) {
+ std::vector<MergeTask> merges;
+
+ level_index merge_base_level = find_mergable_level(source_level);
+ if (merge_base_level == -1) {
+ merge_base_level = grow();
+ }
+
+ for (level_index i=merge_base_level; i>source_level; i--) {
+ MergeTask task;
+ task.m_source_level = i - 1;
+ task.m_target_level = i;
+
+ /*
+ * The amount of storage required for the merge accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i-1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_merge_with(i, reccnt)) {
+ reccnt += m_levels[i]->get_record_count();
+ }
+ }
+ task.m_size = 2 * reccnt * sizeof(R);
+
+ merges.push_back(task);
+ }
+
+ return merges;
+ }
+
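The 2x multiplier in the task-size estimate above can be made concrete with hypothetical numbers: under LEVELING, both input levels and the merged output have to be held at once.

#include <cstddef>
#include <cstdio>

int main() {
    /* Hypothetical merge of level i-1 into level i under LEVELING. */
    std::size_t src_reccnt = 1000000;  /* records in level i-1       */
    std::size_t tgt_reccnt = 3000000;  /* records already in level i */
    std::size_t record_sz  = 16;       /* stand-in for sizeof(R)     */

    /* Old records are retained while the new ones are built, hence
     * the 2x multiplier from get_merge_tasks_from_level(). */
    std::size_t m_size = 2 * (src_reccnt + tgt_reccnt) * record_sz;

    std::printf("%zu bytes\n", m_size); /* 128000000 */
    return 0;
}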
/*
* Merge the level specified by incoming level into the level specified
* by base level. The two levels should be sequential--i.e. no levels
@@ -282,44 +331,18 @@ private:
* and act appropriately.
*/
inline level_index grow() {
- level_index new_idx;
-
+ level_index new_idx = m_levels.size();
size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
- new_idx = m_levels.size();
- if (new_idx > 0) {
- assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
- }
- m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
+ m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
return new_idx;
}
-
- /*
- * Merge the specified level down into the tree. The level index must be
- * non-negative (i.e., this function cannot be used to merge the buffer). This
- * routine will recursively perform any necessary merges to make room for the
- * specified level.
- */
- inline void merge_down(level_index idx) {
- level_index merge_base_level = find_mergable_level(idx);
- if (merge_base_level == -1) {
- merge_base_level = grow();
- }
-
- for (level_index i=merge_base_level; i>idx; i--) {
- merge_levels(i, i-1);
- enforce_delete_maximum(i);
- }
-
- return;
- }
-
/*
* Find the first level below the level indicated by idx that
* is capable of sustaining a merge operation and return its
* level index. If no such level exists, returns -1. Also
* returns -1 if idx==0, and no such level exists, to simplify
* the logic of the first merge.
*/
inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) {
@@ -342,7 +365,6 @@ private:
return -1;
}
-
inline void merge_buffer_into_l0(Buffer *buffer) {
assert(m_levels[0]);
if constexpr (L == LayoutPolicy::LEVELING) {
@@ -370,21 +392,6 @@ private:
}
/*
- * Check the tombstone proportion for the specified level and
- * if the limit is exceeded, forcibly merge levels until all
- * levels below idx are below the limit.
- */
- inline void enforce_delete_maximum(level_index idx) {
- long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
-
- if (ts_prop > (long double) m_max_delete_prop) {
- merge_down(idx);
- }
-
- return;
- }
-
- /*
* Assume that level "0" should be larger than the buffer. The buffer
* itself is index -1, which should return simply the buffer capacity.
*/
@@ -424,7 +431,7 @@ private:
return m_levels[idx]->get_shard_count() < m_scale_factor;
}
- // unreachable
+ /* unreachable */
assert(true);
}
};
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index b9866b8..e67ae45 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -34,6 +34,7 @@ public:
, m_shard_cnt(0)
, m_shards(shard_cap, nullptr)
, m_owns(shard_cap, true)
+ , m_pending_shard(nullptr)
{}
// Create a new memory level sharing the shards and repurposing it as previous level_no + 1
@@ -42,7 +43,9 @@ public:
: m_level_no(level->m_level_no + 1)
, m_shard_cnt(level->m_shard_cnt)
, m_shards(level->m_shards.size(), nullptr)
- , m_owns(level->m_owns.size(), true) {
+ , m_owns(level->m_owns.size(), true)
+ , m_pending_shard(nullptr)
+ {
assert(m_shard_cnt == 1 && m_shards.size() == 1);
for (size_t i=0; i<m_shards.size(); i++) {
@@ -55,6 +58,8 @@ public:
for (size_t i=0; i<m_shards.size(); i++) {
if (m_owns[i]) delete m_shards[i];
}
+
+ delete m_pending_shard;
}
// WARNING: for leveling only.
@@ -72,20 +77,45 @@ public:
}
void append_buffer(Buffer* buffer) {
- assert(m_shard_cnt < m_shards.size());
+ if (m_shard_cnt == m_shards.size()) {
+ assert(m_pending_shard == nullptr);
+ m_pending_shard = new S(buffer);
+ return;
+ }
+
m_shards[m_shard_cnt] = new S(buffer);
m_owns[m_shard_cnt] = true;
++m_shard_cnt;
}
void append_merged_shards(InternalLevel* level) {
- assert(m_shard_cnt < m_shards.size());
+ if (m_shard_cnt == m_shards.size()) {
+ m_pending_shard = new S(level->m_shards.data(), level->m_shard_cnt);
+ return;
+ }
+
m_shards[m_shard_cnt] = new S(level->m_shards.data(), level->m_shard_cnt);
m_owns[m_shard_cnt] = true;
++m_shard_cnt;
}
+
+ void finalize() {
+ if (m_pending_shard) {
+ for (size_t i=0; i<m_shards.size(); i++) {
+ if (m_owns[i]) {
+ delete m_shards[i];
+ m_owns[i] = false;
+ }
+ }
+
+ m_shards[0] = m_pending_shard;
+ m_owns[0] = true;
+ m_pending_shard = nullptr;
+ }
+ }
+
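The pending-shard mechanism lets a full level stage exactly one replacement shard and adopt it later via finalize(). A sketch of the intended call sequence (this assumes the framework's own headers and record/shard/query types R, S, Q; the constructor arguments and buffer names are illustrative, not from the patch):

/* Sketch only -- all values are hypothetical. */
InternalLevel<R, S, Q> level(/*level_no=*/1, /*shard_cap=*/1);

level.append_buffer(buffer_a);  /* slot 0 empty: shard built in place    */
level.append_buffer(buffer_b);  /* level full: staged in m_pending_shard */

/* Readers keep using the old shard in slot 0 until the swap point. */
level.finalize();               /* owned shards freed; pending -> slot 0 */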
Shard *get_merged_shard() {
if (m_shard_cnt == 0) {
return nullptr;
@@ -206,6 +236,9 @@ private:
size_t m_shard_size_cap;
std::vector<Shard*> m_shards;
+
+ Shard *m_pending_shard;
+
std::vector<bool> m_owns;
InternalLevel *clone() {
diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h
index 28ed8a9..534ce25 100644
--- a/include/framework/Scheduler.h
+++ b/include/framework/Scheduler.h
@@ -12,6 +12,8 @@
#include <vector>
#include <memory>
#include <queue>
+#include <thread>
+#include <condition_variable>
#include "util/types.h"
#include "framework/ShardInterface.h"
@@ -26,6 +28,7 @@ namespace de {
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
class Scheduler {
typedef ExtensionStructure<R, S, Q, L> Structure;
+ typedef MutableBuffer<R> Buffer;
public:
/*
* Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means
@@ -36,10 +39,26 @@ public:
, m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX)
, m_used_memory(0)
, m_used_threads(0)
- {}
+ , m_shutdown(false)
+ {
+ m_sched_thrd = std::thread(&Scheduler::run_scheduler, this);
+ }
+
+ ~Scheduler() {
+ m_shutdown = true;
+
+ m_cv.notify_all();
+ m_sched_thrd.join();
+ }
bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) {
/*
+ * temporary hack
+ */
+ pending_version = version;
+ pending_buffer = buffer;
+
+ /*
* Get list of individual level reconstructions that are necessary
* for completing the overall merge
*/
@@ -50,10 +69,30 @@ public:
* executes them sequentially in a blocking fashion)
*/
for (ssize_t i=merges.size()-1; i>=0; i--) {
- version->merge_levels(merges[i].m_target_level, merges[i].m_source_level);
+ merges[i].m_timestamp = m_timestamp.fetch_add(1);
+ m_merge_queue_lock.lock();
+ m_merge_queue.push(merges[i]);
+ m_merge_queue_lock.unlock();
}
- return version->merge_buffer(buffer);
+ MergeTask buffer_merge;
+ buffer_merge.m_source_level = -1;
+ buffer_merge.m_target_level = 0;
+ buffer_merge.m_size = buffer->get_record_count() * sizeof(R) * 2;
+ buffer_merge.m_timestamp = m_timestamp.fetch_add(1);
+ m_merge_queue_lock.lock();
+ m_merge_queue.push(buffer_merge);
+ m_merge_queue_lock.unlock();
+
+ m_cv.notify_all();
+ do {
+ std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock);
+ m_merge_cv.wait(merge_cv_lock);
+ } while (m_merge_queue.size() > 0);
+
+ assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0);
+
+ return true;
}
private:
@@ -62,15 +101,94 @@ private:
return ts;
}
+ void schedule_next_task() {
+ m_merge_queue_lock.lock();
+ auto task = m_merge_queue.top();
+ m_merge_queue.pop();
+ m_merge_queue_lock.unlock();
+
+ if (task.m_source_level == -1 && task.m_target_level == 0) {
+ run_buffer_merge(pending_buffer, pending_version);
+ } else {
+ run_merge(task, pending_version);
+ }
+
+ if (m_merge_queue.size() == 0) {
+ m_merge_cv.notify_all();
+ }
+ }
+
+ void run_merge(MergeTask task, Structure *version) {
+ version->merge_levels(task.m_target_level, task.m_source_level);
+ if (!version->validate_tombstone_proportion(task.m_target_level)) {
+ auto tasks = version->get_merge_tasks(task.m_target_level);
+ /*
+ * Schedule the merge tasks (FIXME: currently this just
+ * executes them sequentially in a blocking fashion)
+ */
+ for (ssize_t i=tasks.size()-1; i>=0; i--) {
+ tasks[i].m_timestamp = m_timestamp.fetch_add(1);
+ m_merge_queue_lock.lock();
+ m_merge_queue.push(tasks[i]);
+ m_merge_queue_lock.unlock();
+ }
+ }
+ }
+
+
+ void run_buffer_merge(Buffer *buffer, Structure *version) {
+ version->merge_buffer(buffer);
+ if (!version->validate_tombstone_proportion(0)) {
+ auto tasks = version->get_merge_tasks_from_level(0);
+
+ /*
+ * Schedule the merge tasks (FIXME: currently this just
+ * executes them sequentially in a blocking fashion)
+ */
+ for (ssize_t i=tasks.size()-1; i>=0; i--) {
+ tasks[i].m_timestamp = m_timestamp.fetch_add(1);
+ m_merge_queue_lock.lock();
+ m_merge_queue.push(tasks[i]);
+ m_merge_queue_lock.unlock();
+ }
+ }
+ }
+
+ void run_scheduler() {
+ do {
+ std::unique_lock<std::mutex> cv_lock(m_cv_lock);
+ m_cv.wait(cv_lock);
+
+ while (m_merge_queue.size() > 0 && m_used_threads < m_thread_cnt) {
+ schedule_next_task();
+ }
+ cv_lock.unlock();
+ } while(!m_shutdown);
+ }
+
size_t m_memory_budget;
size_t m_thread_cnt;
+ Buffer *pending_buffer;
+ Structure *pending_version;
+
alignas(64) std::atomic<size_t> m_used_memory;
alignas(64) std::atomic<size_t> m_used_threads;
alignas(64) std::atomic<size_t> m_timestamp;
- std::priority_queue<MergeTask> m_merge_queue;
+ std::priority_queue<MergeTask, std::vector<MergeTask>, std::greater<MergeTask>> m_merge_queue;
std::mutex m_merge_queue_lock;
+
+ std::mutex m_cv_lock;
+ std::condition_variable m_cv;
+
+ std::mutex m_merge_cv_lock;
+ std::condition_variable m_merge_cv;
+
+ std::thread m_sched_thrd;
+
+ std::atomic<bool> m_shutdown;
+
};
}
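Taken together, the patch moves merge execution onto a background thread while keeping schedule_merge() itself blocking. Roughly, the handshake this commit sets up looks like the following (a sketch assuming the framework's headers; parameter values are illustrative):

/* Sketch only. schedule_merge() enqueues one MergeTask per level plus a
 * sentinel task {m_source_level = -1, m_target_level = 0} for the buffer
 * merge, wakes the scheduler thread through m_cv, and then blocks on
 * m_merge_cv until the merge queue drains. */
Scheduler<R, S, Q, L> sched(/*memory_budget=*/0, /*thread_cnt=*/1);
bool ok = sched.schedule_merge(version, buffer);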