| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-09-13 16:22:03 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-09-13 16:22:03 -0400 |
| commit | eb8dbaa770a57557d67c817c2839c64f536a6ce4 (patch) | |
| tree | 77bbbb79fb70f79965e7f6fd75bb5f4799a6f120 | |
| parent | 076e104b8672924c3d80cd1da2fdb5ebee1766ac (diff) | |
| download | dynamic-extension-eb8dbaa770a57557d67c817c2839c64f536a6ce4.tar.gz | |
Began re-architecting the project for concurrency support
The project is now in a state where it builds, but it probably still has a lot
of bugs.
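At a high level, this commit splits the old monolithic `DynamicExtension` into an `ExtensionStructure` (the levels and shards), a `Scheduler` (merge execution), and per-epoch lists of buffers and structure versions. The sketch below illustrates the copy-merge-publish flow that the new `ExtensionStructure::copy()` is built for; the function name `merge_into_new_version` and the publish/retire step are assumptions about the intended direction, since in this commit the `Scheduler` still merges into the active version synchronously.

```cpp
#include "framework/ExtensionStructure.h"
#include "framework/MutableBuffer.h"

/*
 * Sketch only: run a merge against a private copy of the structure while
 * readers keep using the active version. Publishing/retiring versions is an
 * assumption about the intended design; this commit does not implement it.
 */
template <de::RecordInterface R, de::ShardInterface S, de::QueryInterface Q, de::LayoutPolicy L>
de::ExtensionStructure<R, S, Q, L> *
merge_into_new_version(de::ExtensionStructure<R, S, Q, L> *active,
                       de::MutableBuffer<R> *buffer) {
    /* shallow copy: shares shard references with the active version */
    auto next = active->copy();

    /* merge the buffer into the copy (merge_buffer also truncates the buffer) */
    next->merge_buffer(buffer);

    /*
     * A real implementation would atomically publish `next` as the new active
     * version and retire `active` once no reader still references it.
     */
    return next;
}
```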
| m--------- | external/psudb-common | 0 |
| -rw-r--r-- | include/framework/Configuration.h | 54 |
| -rw-r--r-- | include/framework/DynamicExtension.h | 372 |
| -rw-r--r-- | include/framework/ExtensionStructure.h | 374 |
| -rw-r--r-- | include/framework/InternalLevel.h | 8 |
| -rw-r--r-- | include/framework/MutableBuffer.h | 67 |
| -rw-r--r-- | include/framework/RecordInterface.h | 2 |
| -rw-r--r-- | include/framework/Scheduler.h | 76 |
| -rw-r--r-- | tests/internal_level_tests.cpp | 4 |
9 files changed, 647 insertions, 310 deletions
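Among the changed files, `MutableBuffer.h` gains the first pieces of the concurrency machinery: a reference count, a merge flag, and a shadow copy of the record array populated by `start_merge()`. A minimal usage sketch follows, assuming the interface shown in the diff below; the function names are illustrative, and the deferred-truncation handshake is not fully wired up in this commit.

```cpp
#include "framework/MutableBuffer.h"

/* A reader pins the buffer for the duration of its query. */
template <de::RecordInterface R>
void query_with_reference(de::MutableBuffer<R> *buffer) {
    buffer->take_reference();      /* truncation is deferred while references exist */
    /* ... execute the buffer portion of a query ... */
    buffer->release_reference();   /* the last release applies any deferred truncate */
}

/* A merge thread claims the buffer and works from its shadow copy. */
template <de::RecordInterface R>
void merge_from_buffer(de::MutableBuffer<R> *buffer) {
    if (!buffer->start_merge()) {  /* copies live records into the merge array */
        return;                    /* another merge already owns this buffer */
    }
    /* ... build a shard from the merge copy and hand it to the structure ... */
    buffer->finish_merge();        /* releases the merge lock */
}
```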
diff --git a/external/psudb-common b/external/psudb-common -Subproject e5a10e888d248638e48bf82da70fa356eef47ba +Subproject b436420bf4c9a574b3a8e54c1ab46f46e82240a diff --git a/include/framework/Configuration.h b/include/framework/Configuration.h new file mode 100644 index 0000000..eb9b93f --- /dev/null +++ b/include/framework/Configuration.h @@ -0,0 +1,54 @@ +/* + * include/framework/DynamicExtension.h + * + * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include <atomic> +#include <numeric> +#include <cstdio> +#include <vector> + +#include "psu-util/timer.h" +#include "psu-ds/Alias.h" + +namespace de { + +thread_local size_t sampling_attempts = 0; +thread_local size_t sampling_rejections = 0; +thread_local size_t deletion_rejections = 0; +thread_local size_t bounds_rejections = 0; +thread_local size_t tombstone_rejections = 0; +thread_local size_t buffer_rejections = 0; + +/* + * thread_local size_t various_sampling_times go here. + */ +thread_local size_t sample_range_time = 0; +thread_local size_t alias_time = 0; +thread_local size_t alias_query_time = 0; +thread_local size_t rejection_check_time = 0; +thread_local size_t buffer_sample_time = 0; +thread_local size_t memlevel_sample_time = 0; +thread_local size_t disklevel_sample_time = 0; +thread_local size_t sampling_bailouts = 0; + + +enum class LayoutPolicy { + LEVELING, + TEIRING +}; + +enum class DeletePolicy { + TOMBSTONE, + TAGGING +}; + +typedef ssize_t level_index; + +} diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 524024b..5e9bcee 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -19,62 +19,36 @@ #include "framework/ShardInterface.h" #include "framework/QueryInterface.h" #include "framework/RecordInterface.h" +#include "framework/ExtensionStructure.h" + +#include "framework/Configuration.h" +#include "framework/Scheduler.h" -#include "shard/WIRS.h" #include "psu-util/timer.h" #include "psu-ds/Alias.h" namespace de { -thread_local size_t sampling_attempts = 0; -thread_local size_t sampling_rejections = 0; -thread_local size_t deletion_rejections = 0; -thread_local size_t bounds_rejections = 0; -thread_local size_t tombstone_rejections = 0; -thread_local size_t buffer_rejections = 0; - -/* - * thread_local size_t various_sampling_times go here. 
- */ -thread_local size_t sample_range_time = 0; -thread_local size_t alias_time = 0; -thread_local size_t alias_query_time = 0; -thread_local size_t rejection_check_time = 0; -thread_local size_t buffer_sample_time = 0; -thread_local size_t memlevel_sample_time = 0; -thread_local size_t disklevel_sample_time = 0; -thread_local size_t sampling_bailouts = 0; - - -enum class LayoutPolicy { - LEVELING, - TEIRING -}; - -enum class DeletePolicy { - TOMBSTONE, - TAGGING -}; - -typedef ssize_t level_index; - template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING> class DynamicExtension { - //typedef typename S<R> Shard; typedef S Shard; typedef MutableBuffer<R> Buffer; - + typedef ExtensionStructure<R, S, Q, L> Structure; public: - DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop) - : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop), - m_buffer(new Buffer(buffer_cap, buffer_cap * max_delete_prop)) + DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop, size_t memory_budget=0, + size_t thread_cnt=16) + : m_scale_factor(scale_factor) + , m_max_delete_prop(max_delete_prop) + , m_sched(memory_budget, thread_cnt) { } ~DynamicExtension() { - delete m_buffer; + for (size_t i=0; i<m_buffers.size(); i++) { + delete m_buffers[i]; + } - for (size_t i=0; i<m_levels.size(); i++) { - delete m_levels[i]; + for (size_t i=0; i<m_versions.size(); i++) { + delete m_versions[i]; } } @@ -83,30 +57,30 @@ public: } int erase(const R &rec) { - Buffer *buffer; + Buffer *buffer = get_buffer(); if constexpr (D == DeletePolicy::TAGGING) { - auto buffer = get_buffer(); - - // Check the levels first. This assumes there aren't - // any undeleted duplicate records. - for (auto level : m_levels) { - if (level && level->delete_record(rec)) { - return 1; - } + if (get_active_version()->tagged_delete(rec)) { + return 1; } - // the buffer will take the longest amount of time, and - // probably has the lowest probability of having the record, - // so we'll check it last. + /* + * the buffer will take the longest amount of time, and + * probably has the lowest probability of having the record, + * so we'll check it last. 
+ */ return buffer->delete_record(rec); } + /* + * If tagging isn't used, then delete using a tombstone + */ return internal_append(rec, true); } std::vector<R> query(void *parms) { auto buffer = get_buffer(); + auto vers = get_active_version(); // Get the buffer query state auto buffer_state = Q::get_buffer_query_state(buffer, parms); @@ -115,7 +89,7 @@ public: std::vector<std::pair<ShardID, Shard*>> shards; std::vector<void*> states; - for (auto &level : m_levels) { + for (auto &level : vers->get_levels()) { level->get_query_states(shards, states, parms); } @@ -125,7 +99,7 @@ public: // Execute the query for the buffer auto buffer_results = Q::buffer_query(buffer, buffer_state, parms); - query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer)); + query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers)); if constexpr (Q::EARLY_ABORT) { if (query_results[0].size() > 0) { auto result = Q::merge(query_results, parms); @@ -141,7 +115,7 @@ public: // Execute the query for each shard for (size_t i=0; i<shards.size(); i++) { auto shard_results = Q::query(shards[i].second, states[i], parms); - query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer)); + query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer, vers)); if constexpr (Q::EARLY_ABORT) { if (query_results[i].size() > 0) { auto result = Q::merge(query_results, parms); @@ -170,75 +144,44 @@ public: size_t get_record_count() { size_t cnt = get_buffer()->get_record_count(); - - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) cnt += m_levels[i]->get_record_count(); - } - - return cnt; + return cnt + get_active_version()->get_record_count(); } size_t get_tombstone_cnt() { size_t cnt = get_buffer()->get_tombstone_count(); - - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count(); - } - - return cnt; + return cnt + get_active_version()->get_tombstone_cnt(); } size_t get_height() { - return m_levels.size(); + return get_active_version()->get_height(); } size_t get_memory_usage() { - size_t cnt = m_buffer->get_memory_usage(); - - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) cnt += m_levels[i]->get_memory_usage(); - } + auto vers = get_active_version(); + auto buffer = get_buffer(); - return cnt; + return vers.get_memory_usage() + buffer->get_memory_usage(); } size_t get_aux_memory_usage() { - size_t cnt = m_buffer->get_aux_memory_usage(); - - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) { - cnt += m_levels[i]->get_aux_memory_usage(); - } - } - - return cnt; - } - - bool validate_tombstone_proportion() { - long double ts_prop; - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) { - ts_prop = (long double) m_levels[i]->get_tombstone_count() / (long double) calc_level_record_capacity(i); - if (ts_prop > (long double) m_max_delete_prop) { - return false; - } - } - } + auto vers = get_active_version(); + auto buffer = get_buffer(); - return true; + return vers.get_aux_memory_usage() + buffer->get_aux_memory_usage(); } size_t get_buffer_capacity() { - return m_buffer->get_capacity(); + return get_height()->get_capacity(); } Shard *create_static_structure() { + auto vers = get_active_version(); std::vector<Shard *> shards; - if (m_levels.size() > 0) { - for (int i=m_levels.size() - 1; i>= 0; i--) { - if (m_levels[i]) { - shards.emplace_back(m_levels[i]->get_merged_shard()); + if (vers->get_levels().size() > 0) { + for (int 
i=vers->get_levels().size() - 1; i>= 0; i--) { + if (vers->get_levels()[i]) { + shards.emplace_back(vers->get_levels()[i]->get_merged_shard()); } } } @@ -263,16 +206,32 @@ public: return flattened; } + /* + * Mostly exposed for unit-testing purposes. Verifies that the current + * active version of the ExtensionStructure doesn't violate the maximum + * tombstone proportion invariant. + */ + bool validate_tombstone_proportion() { + return get_active_version()->validate_tombstone_proportion(); + } + private: - Buffer *m_buffer; + Scheduler<R, S, Q, L> m_sched; + + std::vector<Buffer *> m_buffers; + std::vector<Structure *> m_versions; + + std::atomic<size_t> m_current_epoch; size_t m_scale_factor; double m_max_delete_prop; - std::vector<InternalLevel<R, S, Q> *> m_levels; - Buffer *get_buffer() { - return m_buffer; + return m_buffers[0]; + } + + Structure *get_active_version() { + return m_versions[0]; } int internal_append(const R &rec, bool ts) { @@ -281,13 +240,14 @@ private: ; if (buffer->is_full()) { - merge_buffer(); + auto vers = get_active_version(); + m_sched.schedule_merge(vers, buffer); } return buffer->append(rec, ts); } - std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer) { + std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer, Structure *vers) { if constexpr (!Q::SKIP_DELETE_FILTER) { return records; } @@ -322,12 +282,12 @@ private: if (shid != INVALID_SHID) { for (size_t lvl=0; lvl<=shid.level_idx; lvl++) { - if (m_levels[lvl]->check_tombstone(0, rec.rec)) { + if (vers->get_levels()[lvl]->check_tombstone(0, rec.rec)) { continue; } } - if (m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) { + if (vers->get_levels()[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) { continue; } } @@ -337,198 +297,6 @@ private: return processed_records; } - - /* - * Add a new level to the LSM Tree and return that level's index. Will - * automatically determine whether the level should be on memory or on disk, - * and act appropriately. - */ - inline level_index grow() { - level_index new_idx; - - size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - new_idx = m_levels.size(); - if (new_idx > 0) { - assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); - } - m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)); - - return new_idx; - } - - - // Merge the memory table down into the tree, completing any required other - // merges to make room for it. - inline void merge_buffer() { - auto buffer = get_buffer(); - - if (!can_merge_with(0, buffer->get_record_count())) { - merge_down(0); - } - - merge_buffer_into_l0(buffer); - enforce_delete_maximum(0); - - buffer->truncate(); - return; - } - - /* - * Merge the specified level down into the tree. The level index must be - * non-negative (i.e., this function cannot be used to merge the buffer). This - * routine will recursively perform any necessary merges to make room for the - * specified level. - */ - inline void merge_down(level_index idx) { - level_index merge_base_level = find_mergable_level(idx); - if (merge_base_level == -1) { - merge_base_level = grow(); - } - - for (level_index i=merge_base_level; i>idx; i--) { - merge_levels(i, i-1); - enforce_delete_maximum(i); - } - - return; - } - - /* - * Find the first level below the level indicated by idx that - * is capable of sustaining a merge operation and return its - * level index. 
If no such level exists, returns -1. Also - * returns -1 if idx==0, and no such level exists, to simplify - * the logic of the first merge. - */ - inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) { - - if (idx == 0 && m_levels.size() == 0) return -1; - - bool level_found = false; - bool disk_level; - level_index merge_level_idx; - - size_t incoming_rec_cnt = get_level_record_count(idx, buffer); - for (level_index i=idx+1; i<m_levels.size(); i++) { - if (can_merge_with(i, incoming_rec_cnt)) { - return i; - } - - incoming_rec_cnt = get_level_record_count(i); - } - - return -1; - } - - /* - * Merge the level specified by incoming level into the level specified - * by base level. The two levels should be sequential--i.e. no levels - * are skipped in the merge process--otherwise the tombstone ordering - * invariant may be violated by the merge operation. - */ - inline void merge_levels(level_index base_level, level_index incoming_level) { - // merging two memory levels - if constexpr (L == LayoutPolicy::LEVELING) { - auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); - mark_as_unused(tmp); - } else { - m_levels[base_level]->append_merged_shards(m_levels[incoming_level]); - } - - mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor); - } - - - inline void merge_buffer_into_l0(Buffer *buffer) { - assert(m_levels[0]); - if constexpr (L == LayoutPolicy::LEVELING) { - // FIXME: Kludgey implementation due to interface constraints. - auto old_level = m_levels[0]; - auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); - temp_level->append_buffer(buffer); - auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level); - - m_levels[0] = new_level; - delete temp_level; - mark_as_unused(old_level); - } else { - m_levels[0]->append_buffer(buffer); - } - } - - /* - * Mark a given memory level as no-longer in use by the tree. For now this - * will just free the level. In future, this will be more complex as the - * level may not be able to immediately be deleted, depending upon who - * else is using it. - */ - inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) { - delete level; - } - - /* - * Check the tombstone proportion for the specified level and - * if the limit is exceeded, forcibly merge levels until all - * levels below idx are below the limit. - */ - inline void enforce_delete_maximum(level_index idx) { - long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx); - - if (ts_prop > (long double) m_max_delete_prop) { - merge_down(idx); - } - - return; - } - - /* - * Assume that level "0" should be larger than the buffer. The buffer - * itself is index -1, which should return simply the buffer capacity. - */ - inline size_t calc_level_record_capacity(level_index idx) { - return get_buffer()->get_capacity() * pow(m_scale_factor, idx+1); - } - - /* - * Returns the actual number of records present on a specified level. An - * index value of -1 indicates the memory table. Can optionally pass in - * a pointer to the memory table to use, if desired. Otherwise, there are - * no guarantees about which buffer will be accessed if level_index is -1. 
- */ - inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) { - - assert(idx >= -1); - if (idx == -1) { - return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count(); - } - - return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; - } - - /* - * Determines if the specific level can merge with another record containing - * incoming_rec_cnt number of records. The provided level index should be - * non-negative (i.e., not refer to the buffer) and will be automatically - * translated into the appropriate index into either the disk or memory level - * vector. - */ - inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) { - if (idx>= m_levels.size() || !m_levels[idx]) { - return false; - } - - if (L == LayoutPolicy::LEVELING) { - return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx); - } else { - return m_levels[idx]->get_shard_count() < m_scale_factor; - } - - // unreachable - assert(true); - } }; - } diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h new file mode 100644 index 0000000..1e756db --- /dev/null +++ b/include/framework/ExtensionStructure.h @@ -0,0 +1,374 @@ +/* + * include/framework/ExtensionStructure.h + * + * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include <atomic> +#include <numeric> +#include <cstdio> +#include <vector> + +#include "framework/MutableBuffer.h" +#include "framework/InternalLevel.h" +#include "framework/ShardInterface.h" +#include "framework/QueryInterface.h" +#include "framework/RecordInterface.h" + +#include "framework/Configuration.h" + +#include "psu-util/timer.h" +#include "psu-ds/Alias.h" + +namespace de { + +template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING> +class ExtensionStructure { + typedef S Shard; + typedef MutableBuffer<R> Buffer; + +public: + ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop) + : m_scale_factor(scale_factor) + , m_max_delete_prop(max_delete_prop) + , m_buffer_size(buffer_size) + {} + + ~ExtensionStructure() = default; + + /* + * Create a shallow copy of this extension structure. The copy will share references to the + * same levels/shards as the original, but will have its own lists. As all of the shards are + * immutable (with the exception of deletes), the copy can be restructured with merges, etc., + * without affecting the original. + * + * NOTE: When using tagged deletes, a delete of a record in the original structure will affect + * the copy, so long as the copy retains a reference to the same shard as the original. This could + * cause synchronization problems under tagging with concurrency. Any deletes in this context will + * need to be forwarded to the appropriate structures manually. + */ + ExtensionStructure<R, S, Q, L> *copy() { + auto new_struct = new ExtensionStructure<R, S, Q, L>(m_scale_factor, m_max_delete_prop, m_buffer_size); + for (size_t i=0; i<m_levels.size(); i++) { + new_struct->m_levels.push_back(m_levels[i]); + } + + return new_struct; + } + + /* + * Search for a record matching the argument and mark it deleted by + * setting the delete bit in its wrapped header. Returns 1 if a matching + * record was found and deleted, and 0 if a matching record was not found. + * + * This function will stop after finding the first matching record. 
It is assumed + * that no duplicate records exist. In the case of duplicates, this function will + * still "work", but in the sense of "delete first match". + */ + int tagged_delete(const R &rec) { + for (auto level : m_levels) { + if (level && level->delete_record(rec)) { + return 1; + } + } + + /* + * If the record to be erased wasn't found, return 0. The + * DynamicExtension itself will then search the active + * Buffers. + */ + return 0; + } + + /* + * Merge the memory table down into the tree, completing any required other + * merges to make room for it. + */ + inline bool merge_buffer(Buffer *buffer) { + if (!can_merge_with(0, buffer->get_record_count())) { + merge_down(0); + } + + merge_buffer_into_l0(buffer); + enforce_delete_maximum(0); + + buffer->truncate(); + return true; + } + + /* + * Return the total number of records (including tombstones) within all + * of the levels of the structure. + */ + size_t get_record_count() { + size_t cnt = 0; + + for (size_t i=0; i<m_levels.size(); i++) { + if (m_levels[i]) cnt += m_levels[i]->get_record_count(); + } + + return cnt; + } + + /* + * Return the total number of tombstones contained within all of the + * levels of the structure. + */ + size_t get_tombstone_cnt() { + size_t cnt = 0; + + for (size_t i=0; i<m_levels.size(); i++) { + if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count(); + } + + return cnt; + } + + /* + * Return the number of levels within the structure. Note that not + * all of these levels are necessarily populated. + */ + size_t get_height() { + return m_levels.size(); + } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing the primary data structure and raw data. + */ + size_t get_memory_usage() { + size_t cnt = 0; + for (size_t i=0; i<m_levels.size(); i++) { + if (m_levels[i]) cnt += m_levels[i]->get_memory_usage(); + } + + return cnt; + } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing auxiliary data structures. This total does not + * include memory used for the main data structure, or raw data. + */ + size_t get_aux_memory_usage() { + size_t cnt = 0; + for (size_t i=0; i<m_levels.size(); i++) { + if (m_levels[i]) { + cnt += m_levels[i]->get_aux_memory_usage(); + } + } + + return cnt; + } + + /* + * Validate that no level in the structure exceeds its maximum tombstone capacity. This is + * used to trigger preemptive compactions at the end of the merge process. + */ + bool validate_tombstone_proportion() { + long double ts_prop; + for (size_t i=0; i<m_levels.size(); i++) { + if (m_levels[i]) { + ts_prop = (long double) m_levels[i]->get_tombstone_count() / (long double) calc_level_record_capacity(i); + if (ts_prop > (long double) m_max_delete_prop) { + return false; + } + } + } + + return true; + } + + /* + * Return a reference to the underlying vector of levels within the + * structure. + */ + std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> &get_levels() { + return m_levels; + } + +private: + size_t m_scale_factor; + double m_max_delete_prop; + size_t m_buffer_size; + + std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels; + + /* + * Add a new level to the LSM Tree and return that level's index. Will + * automatically determine whether the level should be on memory or on disk, + * and act appropriately. + */ + inline level_index grow() { + level_index new_idx; + + size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 
1 : m_scale_factor; + new_idx = m_levels.size(); + if (new_idx > 0) { + assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); + } + m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt))); + + return new_idx; + } + + + /* + * Merge the specified level down into the tree. The level index must be + * non-negative (i.e., this function cannot be used to merge the buffer). This + * routine will recursively perform any necessary merges to make room for the + * specified level. + */ + inline void merge_down(level_index idx) { + level_index merge_base_level = find_mergable_level(idx); + if (merge_base_level == -1) { + merge_base_level = grow(); + } + + for (level_index i=merge_base_level; i>idx; i--) { + merge_levels(i, i-1); + enforce_delete_maximum(i); + } + + return; + } + + /* + * Find the first level below the level indicated by idx that + * is capable of sustaining a merge operation and return its + * level index. If no such level exists, returns -1. Also + * returns -1 if idx==0, and no such level exists, to simplify + * the logic of the first merge. + */ + inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) { + + if (idx == 0 && m_levels.size() == 0) return -1; + + bool level_found = false; + bool disk_level; + level_index merge_level_idx; + + size_t incoming_rec_cnt = get_level_record_count(idx, buffer); + for (level_index i=idx+1; i<m_levels.size(); i++) { + if (can_merge_with(i, incoming_rec_cnt)) { + return i; + } + + incoming_rec_cnt = get_level_record_count(i); + } + + return -1; + } + + /* + * Merge the level specified by incoming level into the level specified + * by base level. The two levels should be sequential--i.e. no levels + * are skipped in the merge process--otherwise the tombstone ordering + * invariant may be violated by the merge operation. + */ + inline void merge_levels(level_index base_level, level_index incoming_level) { + // merging two memory levels + if constexpr (L == LayoutPolicy::LEVELING) { + auto tmp = m_levels[base_level]; + m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get()); + } else { + m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get()); + } + + m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); + } + + + inline void merge_buffer_into_l0(Buffer *buffer) { + assert(m_levels[0]); + if constexpr (L == LayoutPolicy::LEVELING) { + // FIXME: Kludgey implementation due to interface constraints. + auto old_level = m_levels[0].get(); + auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); + temp_level->append_buffer(buffer); + auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level); + + m_levels[0] = new_level; + delete temp_level; + } else { + m_levels[0]->append_buffer(buffer); + } + } + + /* + * Mark a given memory level as no-longer in use by the tree. For now this + * will just free the level. In future, this will be more complex as the + * level may not be able to immediately be deleted, depending upon who + * else is using it. + */ + inline void mark_as_unused(std::shared_ptr<InternalLevel<R, Shard, Q>> level) { + level.reset(); + } + + /* + * Check the tombstone proportion for the specified level and + * if the limit is exceeded, forcibly merge levels until all + * levels below idx are below the limit. 
+ */ + inline void enforce_delete_maximum(level_index idx) { + long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx); + + if (ts_prop > (long double) m_max_delete_prop) { + merge_down(idx); + } + + return; + } + + /* + * Assume that level "0" should be larger than the buffer. The buffer + * itself is index -1, which should return simply the buffer capacity. + */ + inline size_t calc_level_record_capacity(level_index idx) { + return m_buffer_size * pow(m_scale_factor, idx+1); + } + + /* + * Returns the actual number of records present on a specified level. An + * index value of -1 indicates the memory table. Can optionally pass in + * a pointer to the memory table to use, if desired. Otherwise, there are + * no guarantees about which buffer will be accessed if level_index is -1. + */ + inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) { + if (buffer) { + return buffer->get_record_count(); + } + + return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; + } + + /* + * Determines if the specific level can merge with another record containing + * incoming_rec_cnt number of records. The provided level index should be + * non-negative (i.e., not refer to the buffer) and will be automatically + * translated into the appropriate index into either the disk or memory level + * vector. + */ + inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) { + if (idx>= m_levels.size() || !m_levels[idx]) { + return false; + } + + if (L == LayoutPolicy::LEVELING) { + return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx); + } else { + return m_levels[idx]->get_shard_count() < m_scale_factor; + } + + // unreachable + assert(true); + } +}; + +} + diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index ec8ffc4..983ec6a 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -19,6 +19,10 @@ #include "framework/MutableBuffer.h" namespace de { +template <RecordInterface R, ShardInterface S, QueryInterface Q> +class InternalLevel; + + template <RecordInterface R, ShardInterface S, QueryInterface Q> class InternalLevel { @@ -55,7 +59,7 @@ public: // WARNING: for leveling only. // assuming the base level is the level new level is merging into. (base_level is larger.) 
- static InternalLevel* merge_levels(InternalLevel* base_level, InternalLevel* new_level) { + static std::shared_ptr<InternalLevel> merge_levels(InternalLevel* base_level, InternalLevel* new_level) { assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); auto res = new InternalLevel(base_level->m_level_no, 1); res->m_shard_cnt = 1; @@ -64,7 +68,7 @@ public: shards[1] = new_level->m_shards[0]; res->m_shards[0] = new S(shards, 2); - return res; + return std::shared_ptr<InternalLevel>(res); } void append_buffer(Buffer* buffer) { diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h index b79fc02..cadecb6 100644 --- a/include/framework/MutableBuffer.h +++ b/include/framework/MutableBuffer.h @@ -11,6 +11,7 @@ #include <cstdlib> #include <atomic> +#include <condition_variable> #include <cassert> #include <numeric> #include <algorithm> @@ -33,16 +34,22 @@ public: MutableBuffer(size_t capacity, size_t max_tombstone_cap) : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0) , m_tombstonecnt(0), m_weight(0), m_max_weight(0) { - auto len = capacity * sizeof(Wrapped<R>); - size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE)); - m_data = (Wrapped<R>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); + m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>)); + m_merge_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>)); m_tombstone_filter = nullptr; if (max_tombstone_cap > 0) { m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS); } + + m_refcnt.store(0); + m_deferred_truncate.store(false); + m_merging.store(false); } ~MutableBuffer() { + assert(m_refcnt.load() == 0); + assert(m_merging.load() == false); + if (m_data) free(m_data); if (m_tombstone_filter) delete m_tombstone_filter; } @@ -157,6 +164,50 @@ public: return m_max_weight; } + bool start_merge() { + if (m_merge_lock.try_lock()) { + /* there cannot already been an active merge */ + if (m_merging.load()) { + m_merge_lock.unlock(); + return false; + } + + m_merging.store(true); + memcpy(m_merge_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load()); + return true; + } + + /* lock could not be obtained */ + return false; + } + + bool finish_merge() { + m_merge_lock.unlock(); + return true; + } + + /* + * Concurrency-related operations + */ + bool take_reference() { + m_refcnt.fetch_add(1); + return true; + } + + bool release_reference() { + m_refcnt.fetch_add(-1); + + if (m_refcnt.load() == 0 && m_deferred_truncate.load()) { + assert(this->truncate()); + } + + return true; + } + + bool active_merge() { + return m_merging.load(); + } + private: int32_t try_advance_tail() { size_t new_tail = m_reccnt.fetch_add(1); @@ -169,12 +220,22 @@ private: size_t m_tombstone_cap; Wrapped<R>* m_data; + Wrapped<R>* m_merge_data; + psudb::BloomFilter<R>* m_tombstone_filter; alignas(64) std::atomic<size_t> m_tombstonecnt; alignas(64) std::atomic<uint32_t> m_reccnt; alignas(64) std::atomic<double> m_weight; alignas(64) std::atomic<double> m_max_weight; + alignas(64) std::atomic<bool> m_merging; + alignas(64) std::atomic<bool> m_deferred_truncate; + alignas(64) std::atomic<size_t> m_refcnt; + + alignas(64) std::mutex m_merge_lock; + alignas(64) std::mutex m_trunc_lock; + alignas(64) std::condition_variable m_trunc_signal; + }; } diff --git a/include/framework/RecordInterface.h b/include/framework/RecordInterface.h index 
f78918c..1ef1984 100644 --- a/include/framework/RecordInterface.h +++ b/include/framework/RecordInterface.h @@ -207,7 +207,7 @@ struct EuclidPoint{ template<RecordInterface R> struct RecordHash { size_t operator()(R const &rec) const { - return psudb::hash_bytes((char *) &rec, sizeof(R)); + return psudb::hash_bytes((std::byte *) &rec, sizeof(R)); } }; diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h new file mode 100644 index 0000000..cd3f430 --- /dev/null +++ b/include/framework/Scheduler.h @@ -0,0 +1,76 @@ +/* + * include/framework/Scheduler.h + * + * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include <vector> +#include <memory> +#include <queue> + +#include "util/types.h" +#include "framework/ShardInterface.h" +#include "framework/QueryInterface.h" +#include "framework/RecordInterface.h" +#include "framework/MutableBuffer.h" +#include "framework/Configuration.h" +#include "framework/ExtensionStructure.h" + +namespace de { + + +struct MergeTask { + level_index m_source_level; + level_index m_target_level; + size_t m_size; + size_t m_timestamp; + + bool operator<(MergeTask &other) { + return m_timestamp < other.m_timestamp; + } +}; + + +template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L> +class Scheduler { + typedef ExtensionStructure<R, S, Q, L> Structure; +public: + /* + * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means + * unlimited. + */ + Scheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX) + , m_used_memory(0) + , m_used_threads(0) + {} + + bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) { + // FIXME: this is a non-concurrent implementation + return version->merge_buffer(buffer); + } + +private: + size_t get_timestamp() { + auto ts = m_timestamp.fetch_add(1); + return ts; + } + + size_t m_memory_budget; + size_t m_thread_cnt; + + alignas(64) std::atomic<size_t> m_used_memory; + alignas(64) std::atomic<size_t> m_used_threads; + alignas(64) std::atomic<size_t> m_timestamp; + + std::priority_queue<MergeTask> m_merge_queue; + std::mutex m_merge_queue_lock; +}; + +} diff --git a/tests/internal_level_tests.cpp b/tests/internal_level_tests.cpp index 9deb485..b0dfacb 100644 --- a/tests/internal_level_tests.cpp +++ b/tests/internal_level_tests.cpp @@ -37,11 +37,11 @@ START_TEST(t_memlevel_merge) ck_assert_int_eq(merging_level->get_record_count(), 100); auto old_level = base_level; - base_level = ILevel::merge_levels(old_level, merging_level); + auto new_level = ILevel::merge_levels(old_level, merging_level); delete old_level; delete merging_level; - ck_assert_int_eq(base_level->get_record_count(), 200); + ck_assert_int_eq(new_level->get_record_count(), 200); delete base_level; delete tbl1; |
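The switch to `std::shared_ptr` for levels (visible in `InternalLevel::merge_levels()` and the updated unit test above) is what makes the shallow `ExtensionStructure::copy()` safe: two structure versions can reference the same immutable level, which is freed only once the last version releases it. A small illustration using the framework's type names; the function itself is hypothetical.

```cpp
#include <memory>
#include <vector>

#include "framework/InternalLevel.h"

/*
 * Two structure versions holding the same immutable level. Clearing one
 * version's level list does not free the level while the other still
 * references it.
 */
template <de::RecordInterface R, de::ShardInterface S, de::QueryInterface Q>
void share_level_between_versions() {
    using Level = de::InternalLevel<R, S, Q>;

    auto level = std::make_shared<Level>(0, 1);  /* level 0, room for one shard */

    std::vector<std::shared_ptr<Level>> active_version {level};
    std::vector<std::shared_ptr<Level>> next_version   {level};  /* shallow copy */

    active_version.clear();  /* retiring the old version... */
    /* ...leaves `level` alive, because next_version still references it */
}
```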