 include/framework/DynamicExtension.h   |   5
 include/framework/ExtensionStructure.h | 109
 include/framework/InternalLevel.h      |  39
 include/framework/Scheduler.h          | 126
4 files changed, 221 insertions, 58 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 08e2243..6965965 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -301,5 +301,10 @@ private:
         return processed_records;
     }
 };
+
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING>
+static void de_merge_callback(DynamicExtension<R, S, Q, L, D> extension, ExtensionStructure<R, S, Q> new_version) {
+
+}
 }
diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h
index 9a5f6b3..2fb9cf0 100644
--- a/include/framework/ExtensionStructure.h
+++ b/include/framework/ExtensionStructure.h
@@ -33,8 +33,12 @@ struct MergeTask {
     size_t m_size;
     size_t m_timestamp;

-    bool operator<(MergeTask &other) {
-        return m_timestamp < other.m_timestamp;
+    friend bool operator<(const MergeTask &self, const MergeTask &other) {
+        return self.m_timestamp < other.m_timestamp;
+    }
+
+    friend bool operator>(const MergeTask &self, const MergeTask &other) {
+        return self.m_timestamp > other.m_timestamp;
     }
 };

@@ -66,7 +70,7 @@ public:
     ExtensionStructure<R, S, Q, L> *copy() {
         auto new_struct = new ExtensionStructure<R, S, Q, L>(m_scale_factor, m_max_delete_prop, m_buffer_size);
         for (size_t i=0; i<m_levels.size(); i++) {
-            new_struct->m_levels.push_back(m_levels[i]);
+            new_struct->m_levels.push_back(m_levels[i]->clone());
         }

         return new_struct;
@@ -104,9 +108,8 @@ public:
         assert(can_merge_with(0, buffer->get_record_count()));

         merge_buffer_into_l0(buffer);
-        enforce_delete_maximum(0);
-
         buffer->truncate();
+
         return true;
     }

@@ -193,6 +196,11 @@ public:
         return true;
     }

+    bool validate_tombstone_proportion(level_index level) {
+        long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level);
+        return ts_prop <= (long double) m_max_delete_prop;
+    }
+
     /*
      * Return a reference to the underlying vector of levels within the
      * structure.
@@ -250,6 +258,47 @@ public:
         return std::move(merges);
     }

+
+    /*
+     * Returns the merge tasks required to drain the given source level
+     * down the structure, growing the structure if necessary.
+     */
+    std::vector<MergeTask> get_merge_tasks_from_level(size_t source_level) {
+        std::vector<MergeTask> merges;
+
+        level_index merge_base_level = find_mergable_level(source_level);
+        if (merge_base_level == -1) {
+            merge_base_level = grow();
+        }
+
+        for (level_index i=merge_base_level; i>source_level; i--) {
+            MergeTask task;
+            task.m_source_level = i - 1;
+            task.m_target_level = i;
+
+            /*
+             * The amount of storage required for the merge accounts
+             * for the cost of storing the new records, along with the
+             * cost of retaining the old records during the process
+             * (hence the 2x multiplier).
+             *
+             * FIXME: currently does not account for the *actual* size
+             * of the shards, only the storage for the records
+             * themselves.
+             */
+            size_t reccnt = m_levels[i-1]->get_record_count();
+            if constexpr (L == LayoutPolicy::LEVELING) {
+                if (can_merge_with(i, reccnt)) {
+                    reccnt += m_levels[i]->get_record_count();
+                }
+            }
+            task.m_size = 2 * reccnt * sizeof(R);
+
+            merges.push_back(task);
+        }
+
+        return std::move(merges);
+    }
+
     /*
      * Merge the level specified by incoming level into the level specified
      * by base level. The two levels should be sequential--i.e. no levels
@@ -282,44 +331,18 @@ private:
      * and act appropriately.
      */
     inline level_index grow() {
-        level_index new_idx;
-
+        level_index new_idx = m_levels.size();
         size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-        new_idx = m_levels.size();
-        if (new_idx > 0) {
-            assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
-        }
-        m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
+        m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));

         return new_idx;
     }
-
-    /*
-     * Merge the specified level down into the tree. The level index must be
-     * non-negative (i.e., this function cannot be used to merge the buffer). This
-     * routine will recursively perform any necessary merges to make room for the
-     * specified level.
-     */
-    inline void merge_down(level_index idx) {
-        level_index merge_base_level = find_mergable_level(idx);
-        if (merge_base_level == -1) {
-            merge_base_level = grow();
-        }
-
-        for (level_index i=merge_base_level; i>idx; i--) {
-            merge_levels(i, i-1);
-            enforce_delete_maximum(i);
-        }
-
-        return;
-    }
-
     /*
      * Find the first level below the level indicated by idx that
      * is capable of sustaining a merge operation and return its
      * level index. If no such level exists, returns -1. Also
-     * returns -1 if idx==0, and no such level exists, to skimplify
+     * returns -1 if idx==0, and no such level exists, to simplify
      * the logic of the first merge.
      */
     inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) {
@@ -342,7 +365,6 @@ private:
         return -1;
     }

-
     inline void merge_buffer_into_l0(Buffer *buffer) {
         assert(m_levels[0]);
         if constexpr (L == LayoutPolicy::LEVELING) {
@@ -370,21 +392,6 @@ private:
     }

     /*
-     * Check the tombstone proportion for the specified level and
-     * if the limit is exceeded, forcibly merge levels until all
-     * levels below idx are below the limit.
-     */
-    inline void enforce_delete_maximum(level_index idx) {
-        long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
-
-        if (ts_prop > (long double) m_max_delete_prop) {
-            merge_down(idx);
-        }
-
-        return;
-    }
-
-    /*
      * Assume that level "0" should be larger than the buffer. The buffer
      * itself is index -1, which should return simply the buffer capacity.
      */
@@ -424,7 +431,7 @@ private:
             return m_levels[idx]->get_shard_count() < m_scale_factor;
         }

-        // unreachable
+        /* unreachable */
         assert(true);
     }
 };
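The comparator hunk at the top of ExtensionStructure.h is what makes the scheduler's task queue work: std::priority_queue is a max-heap by default, and pairing it with std::greater<MergeTask> requires a const-correct operator> (the old non-const member operator< is not callable on the queue's const elements). A minimal standalone sketch of the resulting oldest-first ordering; the field types here are simplified, so treat this as an illustration rather than the framework's own code:

#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <vector>

struct MergeTask {
    long        m_source_level;
    long        m_target_level;
    std::size_t m_size;
    std::size_t m_timestamp;

    friend bool operator>(const MergeTask &self, const MergeTask &other) {
        return self.m_timestamp > other.m_timestamp;
    }
};

int main() {
    // std::greater inverts the default max-heap, so the smallest (oldest)
    // timestamp is always at the top of the queue.
    std::priority_queue<MergeTask, std::vector<MergeTask>,
                        std::greater<MergeTask>> q;
    q.push(MergeTask{0, 1, 1024, 2});
    q.push(MergeTask{-1, 0, 512, 0});   // buffer merge, submitted first
    q.push(MergeTask{1, 2, 2048, 1});

    assert(q.top().m_timestamp == 0);   // tasks pop in submission order
    return 0;
}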
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index b9866b8..e67ae45 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -34,6 +34,7 @@ public:
     , m_shard_cnt(0)
     , m_shards(shard_cap, nullptr)
     , m_owns(shard_cap, true)
+    , m_pending_shard(nullptr)
     {}

     // Create a new memory level sharing the shards and repurposing it as previous level_no + 1
@@ -42,7 +43,9 @@ public:
         : m_level_no(level->m_level_no + 1)
         , m_shard_cnt(level->m_shard_cnt)
         , m_shards(level->m_shards.size(), nullptr)
-        , m_owns(level->m_owns.size(), true) {
+        , m_owns(level->m_owns.size(), true)
+        , m_pending_shard(nullptr)
+    {
         assert(m_shard_cnt == 1 && m_shards.size() == 1);

         for (size_t i=0; i<m_shards.size(); i++) {
@@ -55,6 +58,8 @@ public:
         for (size_t i=0; i<m_shards.size(); i++) {
             if (m_owns[i]) delete m_shards[i];
         }
+
+        delete m_pending_shard;
     }

     // WARNING: for leveling only.
@@ -72,20 +77,45 @@ public:
     }

     void append_buffer(Buffer* buffer) {
-        assert(m_shard_cnt < m_shards.size());
+        if (m_shard_cnt == m_shards.size()) {
+            assert(m_pending_shard == nullptr);
+            m_pending_shard = new S(buffer);
+            return;
+        }
+
         m_shards[m_shard_cnt] = new S(buffer);
         m_owns[m_shard_cnt] = true;
         ++m_shard_cnt;
     }

     void append_merged_shards(InternalLevel* level) {
-        assert(m_shard_cnt < m_shards.size());
+        if (m_shard_cnt == m_shards.size()) {
+            m_pending_shard = new S(level->m_shards.data(), level->m_shard_cnt);
+            return;
+        }
+
         m_shards[m_shard_cnt] = new S(level->m_shards.data(), level->m_shard_cnt);
         m_owns[m_shard_cnt] = true;
         ++m_shard_cnt;
     }

+
+    void finalize() {
+        if (m_pending_shard) {
+            for (size_t i=0; i<m_shards.size(); i++) {
+                if (m_owns[i]) {
+                    delete m_shards[i];
+                    m_owns[i] = false;
+                }
+            }
+
+            m_shards[0] = m_pending_shard;
+            m_owns[0] = true;
+            m_pending_shard = nullptr;
+        }
+    }
+
     Shard *get_merged_shard() {
         if (m_shard_cnt == 0) {
             return nullptr;
@@ -206,6 +236,9 @@ private:
     size_t m_shard_size_cap;

     std::vector<Shard*> m_shards;
+
+    Shard *m_pending_shard;
+
     std::vector<bool> m_owns;

     InternalLevel *clone() {
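The m_pending_shard machinery above changes what happens when a level is already at capacity: instead of tripping an assert, the freshly built shard is parked and only swapped in when finalize() is called, so the visible shard array stays stable while a merge is in flight. A toy model of that deferred-swap idiom (hypothetical ToyLevel/ToyShard types, not the framework's):

#include <cassert>
#include <cstddef>
#include <vector>

struct ToyShard { int payload; };

class ToyLevel {
public:
    explicit ToyLevel(std::size_t cap) : m_cap(cap), m_pending(nullptr) {}
    ~ToyLevel() { for (auto *s : m_shards) delete s; delete m_pending; }

    void append(ToyShard *s) {
        if (m_shards.size() == m_cap) {
            assert(m_pending == nullptr);  // only one deferred shard at a time
            m_pending = s;                 // park it; visible state unchanged
            return;
        }
        m_shards.push_back(s);
    }

    void finalize() {
        if (m_pending) {                        // swap in the pending shard,
            for (auto *s : m_shards) delete s;  // retiring the old ones
            m_shards.assign(1, m_pending);
            m_pending = nullptr;
        }
    }

private:
    std::vector<ToyShard*> m_shards;
    std::size_t m_cap;
    ToyShard *m_pending;
};

int main() {
    ToyLevel level(1);
    level.append(new ToyShard{1});   // fills the level
    level.append(new ToyShard{2});   // level full: deferred, not applied
    level.finalize();                // now shard 2 replaces shard 1
    return 0;
}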
diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h
index 28ed8a9..534ce25 100644
--- a/include/framework/Scheduler.h
+++ b/include/framework/Scheduler.h
@@ -12,6 +12,8 @@
 #include <vector>
 #include <memory>
 #include <queue>
+#include <thread>
+#include <condition_variable>

 #include "util/types.h"
 #include "framework/ShardInterface.h"
@@ -26,6 +28,7 @@ namespace de {
 template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
 class Scheduler {
     typedef ExtensionStructure<R, S, Q, L> Structure;
+    typedef MutableBuffer<R> Buffer;
 public:
     /*
      * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means
@@ -36,10 +39,26 @@ public:
         , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX)
         , m_used_memory(0)
         , m_used_threads(0)
-    {}
+        , m_shutdown(false)
+    {
+        m_sched_thrd = std::thread(&Scheduler::run_scheduler, this);
+    }
+
+    ~Scheduler() {
+        m_shutdown = true;
+
+        m_cv.notify_all();
+        m_sched_thrd.join();
+    }

     bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) {
         /*
+         * temporary hack
+         */
+        pending_version = version;
+        pending_buffer = buffer;
+
+        /*
          * Get list of individual level reconstructions that are necessary
          * for completing the overall merge
          */
@@ -50,10 +69,30 @@ public:
          * executes them sequentially in a blocking fashion)
          */
         for (ssize_t i=merges.size()-1; i>=0; i--) {
-            version->merge_levels(merges[i].m_target_level, merges[i].m_source_level);
+            merges[i].m_timestamp = m_timestamp.fetch_add(1);
+            m_merge_queue_lock.lock();
+            m_merge_queue.push(merges[i]);
+            m_merge_queue_lock.unlock();
         }

-        return version->merge_buffer(buffer);
+        MergeTask buffer_merge;
+        buffer_merge.m_source_level = -1;
+        buffer_merge.m_target_level = 0;
+        buffer_merge.m_size = buffer->get_record_count() * sizeof(R) * 2;
+        buffer_merge.m_timestamp = m_timestamp.fetch_add(1);
+        m_merge_queue_lock.lock();
+        m_merge_queue.push(buffer_merge);
+        m_merge_queue_lock.unlock();
+
+        m_cv.notify_all();
+        do {
+            std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock);
+            m_merge_cv.wait(merge_cv_lock);
+        } while (m_merge_queue.size() > 0);
+
+        assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0);
+
+        return true;
     }
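One caveat in the completion wait above: a plain wait() consumes only notifications that arrive while the caller is blocked, so if the scheduler thread drains the queue and calls m_merge_cv.notify_all() before schedule_merge() reaches its wait, the wakeup is lost and the caller can block indefinitely. The predicate overload of wait() rechecks the condition under the lock and closes that window; a standalone toy of the idiom (an editor's sketch, not part of the patch):

#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

static std::mutex cv_lock;
static std::condition_variable cv;
static std::queue<int> tasks;

void wait_for_drain() {
    std::unique_lock<std::mutex> lk(cv_lock);
    cv.wait(lk, [] { return tasks.empty(); });  // predicate form: no lost wakeup
}

void drain_worker() {
    for (;;) {
        std::lock_guard<std::mutex> lk(cv_lock);
        if (tasks.empty()) break;
        tasks.pop();
    }
    cv.notify_all();  // safe even if the waiter has not blocked yet
}

int main() {
    for (int i = 0; i < 3; i++) tasks.push(i);
    std::thread w(drain_worker);
    wait_for_drain();
    w.join();
    return 0;
}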
@@ -62,15 +101,94 @@ private:
         return ts;
     }

+    void schedule_next_task() {
+        m_merge_queue_lock.lock();
+        auto task = m_merge_queue.top();
+        m_merge_queue.pop();
+        m_merge_queue_lock.unlock();
+
+        if (task.m_source_level == -1 && task.m_target_level == 0) {
+            run_buffer_merge(pending_buffer, pending_version);
+        } else {
+            run_merge(task, pending_version);
+        }
+
+        if (m_merge_queue.size() == 0) {
+            m_merge_cv.notify_all();
+        }
+    }
+
+    void run_merge(MergeTask task, Structure *version) {
+        version->merge_levels(task.m_target_level, task.m_source_level);
+        if (!version->validate_tombstone_proportion(task.m_target_level)) {
+            auto tasks = version->get_merge_tasks(task.m_target_level);
+            /*
+             * Schedule the merge tasks (FIXME: currently this just
+             * executes them sequentially in a blocking fashion)
+             */
+            for (ssize_t i=tasks.size()-1; i>=0; i--) {
+                tasks[i].m_timestamp = m_timestamp.fetch_add(1);
+                m_merge_queue_lock.lock();
+                m_merge_queue.push(tasks[i]);
+                m_merge_queue_lock.unlock();
+            }
+        }
+    }
+
+
+    void run_buffer_merge(Buffer *buffer, Structure *version) {
+        version->merge_buffer(buffer);
+        if (!version->validate_tombstone_proportion(0)) {
+            auto tasks = version->get_merge_tasks_from_level(0);
+
+            /*
+             * Schedule the merge tasks (FIXME: currently this just
+             * executes them sequentially in a blocking fashion)
+             */
+            for (ssize_t i=tasks.size()-1; i>=0; i--) {
+                tasks[i].m_timestamp = m_timestamp.fetch_add(1);
+                m_merge_queue_lock.lock();
+                m_merge_queue.push(tasks[i]);
+                m_merge_queue_lock.unlock();
+            }
+        }
+    }
+
+    void run_scheduler() {
+        do {
+            std::unique_lock<std::mutex> cv_lock(m_cv_lock);
+            m_cv.wait(cv_lock);
+
+            while (m_merge_queue.size() > 0 && m_used_threads < m_thread_cnt) {
+                schedule_next_task();
+            }
+            cv_lock.unlock();
+        } while(!m_shutdown);
+    }
+
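For a sense of what validate_tombstone_proportion() enforces after each of these merges: it compares a level's tombstone count against that level's record capacity. Assuming the usual geometric sizing, capacity(i) = buffer_cap * scale_factor^(i+1), which the comment on calc_level_record_capacity() suggests but this diff does not show, the numbers work out as in this sketch:

#include <cmath>
#include <cstdio>

int main() {
    // Assumed capacity model: buffer_cap * scale_factor^(level + 1).
    double buffer_cap      = 1000;
    double scale_factor    = 8;
    double max_delete_prop = 0.05;

    for (int level = 0; level < 3; level++) {
        double capacity = buffer_cap * std::pow(scale_factor, level + 1);
        double ts_limit = capacity * max_delete_prop;
        // level 0: 8,000 records -> 400 tombstones; level 2: 512,000 -> 25,600
        std::printf("level %d: capacity %.0f, tombstone limit %.0f\n",
                    level, capacity, ts_limit);
    }
    return 0;
}

When the bound is exceeded, run_merge() and run_buffer_merge() above enqueue further merge tasks rather than merging inline, which is what replaces the removed enforce_delete_maximum()/merge_down() pair.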
     size_t m_memory_budget;
     size_t m_thread_cnt;

+    Buffer *pending_buffer;
+    Structure *pending_version;
+
     alignas(64) std::atomic<size_t> m_used_memory;
     alignas(64) std::atomic<size_t> m_used_threads;
     alignas(64) std::atomic<size_t> m_timestamp;

-    std::priority_queue<MergeTask> m_merge_queue;
+    std::priority_queue<MergeTask, std::vector<MergeTask>, std::greater<MergeTask>> m_merge_queue;
     std::mutex m_merge_queue_lock;

+    std::mutex m_cv_lock;
+    std::condition_variable m_cv;
+
+    std::mutex m_merge_cv_lock;
+    std::condition_variable m_merge_cv;
+
+    std::thread m_sched_thrd;
+
+    bool m_shutdown;
+
 };

 }
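Taken together, the Scheduler changes set up a single background thread draining a timestamp-ordered queue, with one condition variable (m_cv) to wake the scheduler and a second (m_merge_cv) to signal completion back to the caller. A condensed, self-contained sketch of that skeleton; the task type is a toy, and the shutdown flag is std::atomic<bool> here, whereas the patch uses a plain bool that is read and written from two threads:

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ToyScheduler {
public:
    ToyScheduler() : m_shutdown(false), m_thrd(&ToyScheduler::run, this) {}
    ~ToyScheduler() {
        m_shutdown = true;
        m_cv.notify_all();
        m_thrd.join();
    }

    void submit(std::size_t timestamp) {
        {
            std::lock_guard<std::mutex> lk(m_lock);
            m_queue.push(timestamp);
        }
        m_cv.notify_all();
    }

private:
    void run() {
        std::unique_lock<std::mutex> lk(m_lock);
        while (!m_shutdown) {
            m_cv.wait(lk, [this] { return m_shutdown || !m_queue.empty(); });
            // Tasks still queued at shutdown are drained once, then we exit.
            while (!m_queue.empty()) {
                std::size_t ts = m_queue.top();  // smallest timestamp first
                m_queue.pop();
                lk.unlock();                     // run the task off-lock
                std::printf("task %zu\n", ts);
                lk.lock();
            }
        }
    }

    std::priority_queue<std::size_t, std::vector<std::size_t>,
                        std::greater<std::size_t>> m_queue;
    std::mutex m_lock;
    std::condition_variable m_cv;
    std::atomic<bool> m_shutdown;
    std::thread m_thrd;
};

int main() {
    ToyScheduler sched;
    for (std::size_t i = 0; i < 4; i++) sched.submit(i);
    return 0;
}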