From eb8dbaa770a57557d67c817c2839c64f536a6ce4 Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Wed, 13 Sep 2023 16:22:03 -0400
Subject: Began re-architecting the project for concurrency support

The project is now in a state where it builds, but it probably has a lot of bugs still.
---
 include/framework/DynamicExtension.h | 372 +++++++----------------------------
 1 file changed, 70 insertions(+), 302 deletions(-)

diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 524024b..5e9bcee 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -19,62 +19,36 @@
 #include "framework/ShardInterface.h"
 #include "framework/QueryInterface.h"
 #include "framework/RecordInterface.h"
+#include "framework/ExtensionStructure.h"
+
+#include "framework/Configuration.h"
+#include "framework/Scheduler.h"
 
-#include "shard/WIRS.h"
 #include "psu-util/timer.h"
 #include "psu-ds/Alias.h"
 
 namespace de {
 
-thread_local size_t sampling_attempts = 0;
-thread_local size_t sampling_rejections = 0;
-thread_local size_t deletion_rejections = 0;
-thread_local size_t bounds_rejections = 0;
-thread_local size_t tombstone_rejections = 0;
-thread_local size_t buffer_rejections = 0;
-
-/*
- * thread_local size_t various_sampling_times go here.
- */
-thread_local size_t sample_range_time = 0;
-thread_local size_t alias_time = 0;
-thread_local size_t alias_query_time = 0;
-thread_local size_t rejection_check_time = 0;
-thread_local size_t buffer_sample_time = 0;
-thread_local size_t memlevel_sample_time = 0;
-thread_local size_t disklevel_sample_time = 0;
-thread_local size_t sampling_bailouts = 0;
-
-
-enum class LayoutPolicy {
-    LEVELING,
-    TEIRING
-};
-
-enum class DeletePolicy {
-    TOMBSTONE,
-    TAGGING
-};
-
-typedef ssize_t level_index;
-
 template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING>
 class DynamicExtension {
-    //typedef typename S Shard;
     typedef S Shard;
     typedef MutableBuffer<R> Buffer;
-
+    typedef ExtensionStructure<R, S, Q, L> Structure;
 public:
-    DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop)
-        : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop),
-          m_buffer(new Buffer(buffer_cap, buffer_cap * max_delete_prop))
+    DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop, size_t memory_budget=0,
+                     size_t thread_cnt=16)
+        : m_scale_factor(scale_factor)
+        , m_max_delete_prop(max_delete_prop)
+        , m_sched(memory_budget, thread_cnt)
     { }
 
     ~DynamicExtension() {
-        delete m_buffer;
+        for (size_t i=0; i<m_buffers.size(); i++) {
+            delete m_buffers[i];
+        }
+
+        for (size_t i=0; i<m_versions.size(); i++) {
+            delete m_versions[i];
+        }
     }
 
     int insert(const R &rec) {
         return internal_append(rec, false);
     }
 
     int erase(const R &rec) {
         Buffer *buffer = get_buffer();
 
         if constexpr (D == DeletePolicy::TAGGING) {
-            for (auto level : m_levels) {
-                if (level && level->delete_record(rec)) {
-                    return 1;
-                }
+            if (get_active_version()->tagged_delete(rec)) {
+                return 1;
             }
 
-            // the buffer will take the longest amount of time, and
-            // probably has the lowest probability of having the record,
-            // so we'll check it last.
+            /*
+             * the buffer will take the longest amount of time, and
+             * probably has the lowest probability of having the record,
+             * so we'll check it last.
+             */
             return buffer->delete_record(rec);
         }
 
+        /*
+         * If tagging isn't used, then delete using a tombstone
+         */
         return internal_append(rec, true);
     }
 
     std::vector<R> query(void *parms) {
         auto buffer = get_buffer();
+        auto vers = get_active_version();
 
         // Get the buffer query state
         auto buffer_state = Q::get_buffer_query_state(buffer, parms);
@@ -115,7 +89,7 @@ public:
         std::vector<std::pair<ShardID, Shard *>> shards;
         std::vector<void *> states;
 
-        for (auto &level : m_levels) {
+        for (auto &level : vers->get_levels()) {
             level->get_query_states(shards, states, parms);
         }
 
@@ -125,7 +99,7 @@ public:
 
         // Execute the query for the buffer
         auto buffer_results = Q::buffer_query(buffer, buffer_state, parms);
-        query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer));
+        query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers));
         if constexpr (Q::EARLY_ABORT) {
             if (query_results[0].size() > 0) {
                 auto result = Q::merge(query_results, parms);
@@ -141,7 +115,7 @@ public:
         // Execute the query for each shard
         for (size_t i=0; i<shards.size(); i++) {
             auto shard_results = Q::query(shards[i].second, states[i], parms);
-            query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer));
+            query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer, vers));
             if constexpr (Q::EARLY_ABORT) {
                 if (query_results[i+1].size() > 0) {
                     auto result = Q::merge(query_results, parms);
@@ -170,75 +144,44 @@ public:
 
     size_t get_record_count() {
         size_t cnt = get_buffer()->get_record_count();
-
-        for (size_t i=0; i<m_levels.size(); i++) {
-            if (m_levels[i]) cnt += m_levels[i]->get_record_count();
-        }
-
-        return cnt;
+        return cnt + get_active_version()->get_record_count();
     }
 
     size_t get_tombstone_cnt() {
         size_t cnt = get_buffer()->get_tombstone_count();
-
-        for (size_t i=0; i<m_levels.size(); i++) {
-            if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count();
-        }
-
-        return cnt;
+        return cnt + get_active_version()->get_tombstone_cnt();
    }
 
     size_t get_height() {
-        return m_levels.size();
+        return get_active_version()->get_height();
     }
 
     size_t get_memory_usage() {
-        size_t cnt = m_buffer->get_memory_usage();
-
-        for (size_t i=0; i<m_levels.size(); i++) {
-            if (m_levels[i]) cnt += m_levels[i]->get_memory_usage();
-        }
+        auto vers = get_active_version();
+        auto buffer = get_buffer();
 
-        return cnt;
+        return vers->get_memory_usage() + buffer->get_memory_usage();
     }
 
     size_t get_aux_memory_usage() {
-        size_t cnt = m_buffer->get_aux_memory_usage();
-
-        for (size_t i=0; i<m_levels.size(); i++) {
-            if (m_levels[i]) {
-                cnt += m_levels[i]->get_aux_memory_usage();
-            }
-        }
-
-        return cnt;
-    }
-
-    bool validate_tombstone_proportion() {
-        long double ts_prop;
-        for (size_t i=0; i<m_levels.size(); i++) {
-            if (m_levels[i]) {
-                ts_prop = (long double) m_levels[i]->get_tombstone_count() / (long double) calc_level_record_capacity(i);
-                if (ts_prop > (long double) m_max_delete_prop) {
-                    return false;
-                }
-            }
-        }
+        auto vers = get_active_version();
+        auto buffer = get_buffer();
 
-        return true;
+        return vers->get_aux_memory_usage() + buffer->get_aux_memory_usage();
     }
 
     size_t get_buffer_capacity() {
-        return m_buffer->get_capacity();
+        return get_buffer()->get_capacity();
     }
 
     Shard *create_static_structure() {
+        auto vers = get_active_version();
         std::vector<Shard *> shards;
 
-        if (m_levels.size() > 0) {
-            for (int i=m_levels.size() - 1; i>= 0; i--) {
-                if (m_levels[i]) {
-                    shards.emplace_back(m_levels[i]->get_merged_shard());
+        if (vers->get_levels().size() > 0) {
+            for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
+                if (vers->get_levels()[i]) {
+                    shards.emplace_back(vers->get_levels()[i]->get_merged_shard());
                 }
             }
         }
@@ -263,16 +206,32 @@ public:
         return flattened;
     }
 
+    /*
+     * Mostly exposed for unit-testing purposes. Verifies that the current
+     * active version of the ExtensionStructure doesn't violate the maximum
+     * tombstone proportion invariant.
+     */
+    bool validate_tombstone_proportion() {
+        return get_active_version()->validate_tombstone_proportion();
+    }
+
 private:
-    Buffer *m_buffer;
+    Scheduler m_sched;
+
+    std::vector<Buffer *> m_buffers;
+    std::vector<Structure *> m_versions;
+
+    std::atomic<size_t> m_current_epoch;
 
     size_t m_scale_factor;
     double m_max_delete_prop;
-    std::vector<InternalLevel<R, S, Q> *> m_levels;
-
     Buffer *get_buffer() {
-        return m_buffer;
+        return m_buffers[0];
+    }
+
+    Structure *get_active_version() {
+        return m_versions[0];
     }
 
     int internal_append(const R &rec, bool ts) {
@@ -281,13 +240,14 @@ private:
         Buffer *buffer;
         while (!(buffer = get_buffer()))
             ;
 
         if (buffer->is_full()) {
-            merge_buffer();
+            auto vers = get_active_version();
+            m_sched.schedule_merge(vers, buffer);
         }
 
         return buffer->append(rec, ts);
     }
 
-    std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer) {
+    std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer, Structure *vers) {
         if constexpr (!Q::SKIP_DELETE_FILTER) {
             return records;
         }
@@ -322,12 +282,12 @@ private:
 
         if (shid != INVALID_SHID) {
             for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
-                if (m_levels[lvl]->check_tombstone(0, rec.rec)) {
+                if (vers->get_levels()[lvl]->check_tombstone(0, rec.rec)) {
                     continue;
                 }
             }
 
-            if (m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) {
+            if (vers->get_levels()[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) {
                 continue;
             }
         }
@@ -337,198 +297,6 @@ private:
 
         return processed_records;
     }
-
-    /*
-     * Add a new level to the LSM Tree and return that level's index. Will
-     * automatically determine whether the level should be on memory or on disk,
-     * and act appropriately.
-     */
-    inline level_index grow() {
-        level_index new_idx;
-
-        size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-        new_idx = m_levels.size();
-        if (new_idx > 0) {
-            assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
-        }
-        m_levels.emplace_back(new InternalLevel<R, S, Q>(new_idx, new_shard_cnt));
-
-        return new_idx;
-    }
-
-
-    // Merge the memory table down into the tree, completing any required other
-    // merges to make room for it.
-    inline void merge_buffer() {
-        auto buffer = get_buffer();
-
-        if (!can_merge_with(0, buffer->get_record_count())) {
-            merge_down(0);
-        }
-
-        merge_buffer_into_l0(buffer);
-        enforce_delete_maximum(0);
-
-        buffer->truncate();
-        return;
-    }
-
-    /*
-     * Merge the specified level down into the tree. The level index must be
-     * non-negative (i.e., this function cannot be used to merge the buffer). This
-     * routine will recursively perform any necessary merges to make room for the
-     * specified level.
-     */
-    inline void merge_down(level_index idx) {
-        level_index merge_base_level = find_mergable_level(idx);
-        if (merge_base_level == -1) {
-            merge_base_level = grow();
-        }
-
-        for (level_index i=merge_base_level; i>idx; i--) {
-            merge_levels(i, i-1);
-            enforce_delete_maximum(i);
-        }
-
-        return;
-    }
-
-    /*
-     * Find the first level below the level indicated by idx that
-     * is capable of sustaining a merge operation and return its
-     * level index. If no such level exists, returns -1. Also
-     * returns -1 if idx==0, and no such level exists, to simplify
-     * the logic of the first merge.
-     */
-    inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) {
-
-        if (idx == 0 && m_levels.size() == 0) return -1;
-
-        bool level_found = false;
-        bool disk_level;
-        level_index merge_level_idx;
-
-        size_t incoming_rec_cnt = get_level_record_count(idx, buffer);
-        for (level_index i=idx+1; i<m_levels.size(); i++) {
-            if (can_merge_with(i, incoming_rec_cnt)) {
-                return i;
-            }
-
-            incoming_rec_cnt = get_level_record_count(i);
-        }
-
-        return -1;
-    }
-
-    /*
-     * Merge the level specified by incoming_level into the level specified
-     * by base_level. The two levels should be sequential--i.e., no levels
-     * are skipped in the merge process--otherwise the tombstone ordering
-     * invariant may be violated.
-     */
-    inline void merge_levels(level_index base_level, level_index incoming_level) {
-        if constexpr (L == LayoutPolicy::LEVELING) {
-            auto tmp = m_levels[base_level];
-            m_levels[base_level] = InternalLevel<R, S, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
-            mark_as_unused(tmp);
-        } else {
-            m_levels[base_level]->append_merged_shards(m_levels[incoming_level]);
-        }
-
-        mark_as_unused(m_levels[incoming_level]);
-        m_levels[incoming_level] = new InternalLevel<R, S, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor);
-    }
-
-    inline void merge_buffer_into_l0(Buffer *buffer) {
-        assert(m_levels[0]);
-        if constexpr (L == LayoutPolicy::LEVELING) {
-            // FIXME: Kludgey implementation due to interface constraints.
-            auto old_level = m_levels[0];
-            auto temp_level = new InternalLevel<R, S, Q>(0, 1);
-            temp_level->append_buffer(buffer);
-            auto new_level = InternalLevel<R, S, Q>::merge_levels(old_level, temp_level);
-
-            m_levels[0] = new_level;
-            delete temp_level;
-            mark_as_unused(old_level);
-        } else {
-            m_levels[0]->append_buffer(buffer);
-        }
-    }
-
-    /*
-     * Mark a given memory level as no-longer in use by the tree. For now this
-     * will just free the level. In future, this will be more complex as the
-     * level may not be able to immediately be deleted, depending upon who
-     * else is using it.
-     */
-    inline void mark_as_unused(InternalLevel<R, S, Q> *level) {
-        delete level;
-    }
-
-    /*
-     * Check the tombstone proportion for the specified level and
-     * if the limit is exceeded, forcibly merge levels until all
-     * levels below idx are below the limit.
-     */
-    inline void enforce_delete_maximum(level_index idx) {
-        long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
-
-        if (ts_prop > (long double) m_max_delete_prop) {
-            merge_down(idx);
-        }
-
-        return;
-    }
-
-    /*
-     * Assume that level "0" should be larger than the buffer. The buffer
-     * itself is index -1, which should return simply the buffer capacity.
-     */
-    inline size_t calc_level_record_capacity(level_index idx) {
-        return get_buffer()->get_capacity() * pow(m_scale_factor, idx+1);
-    }
-
-    /*
-     * Returns the actual number of records present on a specified level. An
-     * index value of -1 indicates the memory table. Can optionally pass in
-     * a pointer to the memory table to use, if desired. Otherwise, there are
-     * no guarantees about which buffer will be accessed if level_index is -1.
-     */
-    inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) {
-
-        assert(idx >= -1);
-        if (idx == -1) {
-            return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count();
-        }
-
-        return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
-    }
-
-    /*
-     * Determines if the specific level can merge with another record containing
-     * incoming_rec_cnt number of records. The provided level index should be
-     * non-negative (i.e., not refer to the buffer) and will be automatically
-     * translated into the appropriate index into either the disk or memory level
-     * vector.
-     */
-    inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) {
-        if (idx>= m_levels.size() || !m_levels[idx]) {
-            return false;
-        }
-
-        if (L == LayoutPolicy::LEVELING) {
-            return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
-        } else {
-            return m_levels[idx]->get_shard_count() < m_scale_factor;
-        }
-
-        // unreachable
-        assert(true);
-    }
 };
-
 }
 
-- 
cgit v1.2.3
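
A minimal usage sketch of the post-commit interface, for illustration only (not part of the patch): MyRecord, MyShard, and MyQuery are hypothetical stand-ins for types satisfying the RecordInterface, ShardInterface, and QueryInterface concepts, and the last two constructor arguments may be omitted to take the defaults shown above.

    #include "framework/DynamicExtension.h"

    // MyRecord, MyShard, and MyQuery are assumed, concept-conforming types;
    // they are not provided by this commit.
    typedef de::DynamicExtension<MyRecord, MyShard, MyQuery> Index;

    Index idx(1000,  /* buffer capacity, in records */
              2,     /* scale factor between levels */
              0.05,  /* maximum tombstone proportion */
              0,     /* scheduler memory budget (constructor default) */
              16);   /* scheduler thread count (constructor default) */

    idx.insert(MyRecord{1, 100});      // appends through the mutable buffer; a full
                                       // buffer is handed to m_sched.schedule_merge()
    idx.erase(MyRecord{1, 100});       // tagged delete or tombstone, depending on D
    auto matches = idx.query(&parms);  // parms is a Q-specific parameter struct; results
                                       // are merged across buffer and shards, with
                                       // deleted records filtered out

Note the design direction visible in the diff: the single m_buffer and the private level-manipulation helpers move behind ExtensionStructure and Scheduler, with get_buffer()/get_active_version() currently returning the first entry of m_buffers/m_versions as a placeholder for epoch-based version selection via m_current_epoch.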