From 7c03d771475421c1d5a2bbc135242536af1a371c Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 25 Sep 2023 10:49:36 -0400 Subject: Re-structuring Project + scheduling updates This is a big one--probably should have split it apart, but I'm feeling lazy this morning. * Organized the mess of header files in include/framework by splitting them out into their own subdirectories, and renaming a few files to remove redundancies introduced by the directory structure. * Introduced a new framework/ShardRequirements.h header file for simpler shard development. This header simply contains the necessary includes from framework/* for creating shard files. This should help to remove structural dependencies from the framework file structure and shards, as well as centralizing the necessary framework files to make shard development easier. * Created a (currently dummy) SchedulerInterface, and make the scheduler implementation a template parameter of the dynamic extension for easier testing of various scheduling policies. There's still more work to be done to fully integrate the scheduler (queries, multiple buffers), but some more of the necessary framework code for this has been added as well. * Adjusted the Task interface setup for the scheduler. The task structures have been removed from ExtensionStructure and placed in their own header file. Additionally, I started experimenting with using std::variant, as opposed to inheritence, to implement subtype polymorphism on the Merge and Query tasks. The scheduler now has a general task queue that contains both, and std::variant, std::visit, and std::get are used to manipulate them without virtual functions. * Removed Alex.h, as it can't build anyway. There's a branch out there containing the Alex implementation stripped of the C++20 stuff. So there's no need to keep it here. --- include/framework/structure/ExtensionStructure.h | 428 +++++++++++++++++++++++ include/framework/structure/InternalLevel.h | 258 ++++++++++++++ include/framework/structure/MutableBuffer.h | 242 +++++++++++++ 3 files changed, 928 insertions(+) create mode 100644 include/framework/structure/ExtensionStructure.h create mode 100644 include/framework/structure/InternalLevel.h create mode 100644 include/framework/structure/MutableBuffer.h (limited to 'include/framework/structure') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h new file mode 100644 index 0000000..920e1c3 --- /dev/null +++ b/include/framework/structure/ExtensionStructure.h @@ -0,0 +1,428 @@ +/* + * include/framework/ExtensionStructure.h + * + * Copyright (C) 2023 Douglas Rumbaugh + * Dong Xie + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include +#include +#include +#include + +#include "framework/structure/MutableBuffer.h" +#include "framework/structure/InternalLevel.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" +#include "framework/interface/Record.h" + +#include "framework/util/Configuration.h" +#include "framework/scheduling/Task.h" + +#include "psu-util/timer.h" +#include "psu-ds/Alias.h" + +namespace de { + +template +class ExtensionStructure { + typedef S Shard; + typedef MutableBuffer Buffer; + +public: + ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop) + : m_scale_factor(scale_factor) + , m_max_delete_prop(max_delete_prop) + , m_buffer_size(buffer_size) + {} + + ~ExtensionStructure() = default; + + /* + * Create a shallow copy of this extension structure. The copy will share references to the + * same levels/shards as the original, but will have its own lists. As all of the shards are + * immutable (with the exception of deletes), the copy can be restructured with merges, etc., + * without affecting the original. + * + * NOTE: When using tagged deletes, a delete of a record in the original structure will affect + * the copy, so long as the copy retains a reference to the same shard as the original. This could + * cause synchronization problems under tagging with concurrency. Any deletes in this context will + * need to be forwarded to the appropriate structures manually. + */ + ExtensionStructure *copy() { + auto new_struct = new ExtensionStructure(m_scale_factor, m_max_delete_prop, m_buffer_size); + for (size_t i=0; im_levels.push_back(m_levels[i]->clone()); + } + + return new_struct; + } + + /* + * Search for a record matching the argument and mark it deleted by + * setting the delete bit in its wrapped header. Returns 1 if a matching + * record was found and deleted, and 0 if a matching record was not found. + * + * This function will stop after finding the first matching record. It is assumed + * that no duplicate records exist. In the case of duplicates, this function will + * still "work", but in the sense of "delete first match". + */ + int tagged_delete(const R &rec) { + for (auto level : m_levels) { + if (level && level->delete_record(rec)) { + return 1; + } + } + + /* + * If the record to be erased wasn't found, return 0. The + * DynamicExtension itself will then search the active + * Buffers. + */ + return 0; + } + + /* + * Merge the memory table down into the tree, completing any required other + * merges to make room for it. + */ + inline bool merge_buffer(Buffer *buffer) { + assert(can_merge_with(0, buffer->get_record_count())); + + merge_buffer_into_l0(buffer); + buffer->truncate(); + + return true; + } + + /* + * Return the total number of records (including tombstones) within all + * of the levels of the structure. + */ + size_t get_record_count() { + size_t cnt = 0; + + for (size_t i=0; iget_record_count(); + } + + return cnt; + } + + /* + * Return the total number of tombstones contained within all of the + * levels of the structure. + */ + size_t get_tombstone_cnt() { + size_t cnt = 0; + + for (size_t i=0; iget_tombstone_count(); + } + + return cnt; + } + + /* + * Return the number of levels within the structure. Note that not + * all of these levels are necessarily populated. + */ + size_t get_height() { + return m_levels.size(); + } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing the primary data structure and raw data. + */ + size_t get_memory_usage() { + size_t cnt = 0; + for (size_t i=0; iget_memory_usage(); + } + + return cnt; + } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing auxiliary data structures. This total does not + * include memory used for the main data structure, or raw data. + */ + size_t get_aux_memory_usage() { + size_t cnt = 0; + for (size_t i=0; iget_aux_memory_usage(); + } + } + + return cnt; + } + + /* + * Validate that no level in the structure exceeds its maximum tombstone capacity. This is + * used to trigger preemptive compactions at the end of the merge process. + */ + bool validate_tombstone_proportion() { + long double ts_prop; + for (size_t i=0; iget_tombstone_count() / (long double) calc_level_record_capacity(i); + if (ts_prop > (long double) m_max_delete_prop) { + return false; + } + } + } + + return true; + } + + bool validate_tombstone_proportion(level_index level) { + long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level); + return ts_prop <= (long double) m_max_delete_prop; + } + + /* + * Return a reference to the underlying vector of levels within the + * structure. + */ + std::vector>> &get_levels() { + return m_levels; + } + + /* + * + */ + std::vector get_merge_tasks(size_t buffer_reccnt) { + std::vector merges; + + /* + * The buffer -> L0 merge task is not included so if that + * can be done without any other change, just return an + * empty list. + */ + if (can_merge_with(0, buffer_reccnt)) { + return std::move(merges); + } + + level_index merge_base_level = find_mergable_level(0); + if (merge_base_level == -1) { + merge_base_level = grow(); + } + + for (level_index i=merge_base_level; i>0; i--) { + MergeTask task; + task.m_source_level = i - 1; + task.m_target_level = i; + task.m_type = TaskType::MERGE; + + /* + * The amount of storage required for the merge accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i-1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_merge_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } + task.m_size = 2* reccnt * sizeof(R); + + merges.push_back(task); + } + + return std::move(merges); + } + + + /* + * + */ + std::vector get_merge_tasks_from_level(size_t source_level) { + std::vector merges; + + level_index merge_base_level = find_mergable_level(source_level); + if (merge_base_level == -1) { + merge_base_level = grow(); + } + + for (level_index i=merge_base_level; i>source_level; i--) { + MergeTask task; + task.m_source_level = i - 1; + task.m_target_level = i; + + /* + * The amount of storage required for the merge accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i-1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_merge_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } + task.m_size = 2* reccnt * sizeof(R); + + merges.push_back(task); + } + + return std::move(merges); + } + + /* + * Merge the level specified by incoming level into the level specified + * by base level. The two levels should be sequential--i.e. no levels + * are skipped in the merge process--otherwise the tombstone ordering + * invariant may be violated by the merge operation. + */ + inline void merge_levels(level_index base_level, level_index incoming_level) { + // merging two memory levels + if constexpr (L == LayoutPolicy::LEVELING) { + auto tmp = m_levels[base_level]; + m_levels[base_level] = InternalLevel::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get()); + } else { + m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get()); + m_levels[base_level]->finalize(); + } + + m_levels[incoming_level] = std::shared_ptr>(new InternalLevel(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); + } + + +private: + size_t m_scale_factor; + double m_max_delete_prop; + size_t m_buffer_size; + + std::vector>> m_levels; + + /* + * Add a new level to the LSM Tree and return that level's index. Will + * automatically determine whether the level should be on memory or on disk, + * and act appropriately. + */ + inline level_index grow() { + level_index new_idx = m_levels.size(); + size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + m_levels.emplace_back(std::shared_ptr>(new InternalLevel(new_idx, new_shard_cnt))); + return new_idx; + } + + /* + * Find the first level below the level indicated by idx that + * is capable of sustaining a merge operation and return its + * level index. If no such level exists, returns -1. Also + * returns -1 if idx==0, and no such level exists, to skimplify + * the logic of the first merge. + */ + inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) { + + if (idx == 0 && m_levels.size() == 0) return -1; + + bool level_found = false; + bool disk_level; + level_index merge_level_idx; + + size_t incoming_rec_cnt = get_level_record_count(idx, buffer); + for (level_index i=idx+1; i(0, 1); + temp_level->append_buffer(buffer); + auto new_level = InternalLevel::merge_levels(old_level, temp_level); + + m_levels[0] = new_level; + delete temp_level; + } else { + m_levels[0]->append_buffer(buffer); + } + } + + /* + * Mark a given memory level as no-longer in use by the tree. For now this + * will just free the level. In future, this will be more complex as the + * level may not be able to immediately be deleted, depending upon who + * else is using it. + */ + inline void mark_as_unused(std::shared_ptr> level) { + level.reset(); + } + + /* + * Assume that level "0" should be larger than the buffer. The buffer + * itself is index -1, which should return simply the buffer capacity. + */ + inline size_t calc_level_record_capacity(level_index idx) { + return m_buffer_size * pow(m_scale_factor, idx+1); + } + + /* + * Returns the actual number of records present on a specified level. An + * index value of -1 indicates the memory table. Can optionally pass in + * a pointer to the memory table to use, if desired. Otherwise, there are + * no guarantees about which buffer will be accessed if level_index is -1. + */ + inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) { + if (buffer) { + return buffer->get_record_count(); + } + + return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; + } + + /* + * Determines if the specific level can merge with another record containing + * incoming_rec_cnt number of records. The provided level index should be + * non-negative (i.e., not refer to the buffer) and will be automatically + * translated into the appropriate index into either the disk or memory level + * vector. + */ + inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) { + if (idx>= m_levels.size() || !m_levels[idx]) { + return false; + } + + if (L == LayoutPolicy::LEVELING) { + return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx); + } else { + return m_levels[idx]->get_shard_count() < m_scale_factor; + } + + /* unreachable */ + assert(true); + } +}; + +} + diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h new file mode 100644 index 0000000..b9230f4 --- /dev/null +++ b/include/framework/structure/InternalLevel.h @@ -0,0 +1,258 @@ +/* + * include/framework/InternalLevel.h + * + * Copyright (C) 2023 Douglas Rumbaugh + * Dong Xie + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include +#include + +#include "util/types.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" +#include "framework/interface/Record.h" +#include "framework/structure/MutableBuffer.h" + +namespace de { +template +class InternalLevel; + + + +template +class InternalLevel { + typedef S Shard; + typedef MutableBuffer Buffer; +public: + InternalLevel(ssize_t level_no, size_t shard_cap) + : m_level_no(level_no) + , m_shard_cnt(0) + , m_shards(shard_cap, nullptr) + , m_owns(shard_cap, true) + , m_pending_shard(nullptr) + {} + + // Create a new memory level sharing the shards and repurposing it as previous level_no + 1 + // WARNING: for leveling only. + InternalLevel(InternalLevel* level) + : m_level_no(level->m_level_no + 1) + , m_shard_cnt(level->m_shard_cnt) + , m_shards(level->m_shards.size(), nullptr) + , m_owns(level->m_owns.size(), true) + , m_pending_shard(nullptr) + { + assert(m_shard_cnt == 1 && m_shards.size() == 1); + + for (size_t i=0; im_owns[i] = false; + m_shards[i] = level->m_shards[i]; + } + } + + ~InternalLevel() { + for (size_t i=0; i merge_levels(InternalLevel* base_level, InternalLevel* new_level) { + assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); + auto res = new InternalLevel(base_level->m_level_no, 1); + res->m_shard_cnt = 1; + Shard* shards[2]; + shards[0] = base_level->m_shards[0]; + shards[1] = new_level->m_shards[0]; + + res->m_shards[0] = new S(shards, 2); + return std::shared_ptr(res); + } + + void append_buffer(Buffer* buffer) { + if (m_shard_cnt == m_shards.size()) { + assert(m_pending_shard == nullptr); + m_pending_shard = new S(buffer); + return; + } + + m_shards[m_shard_cnt] = new S(buffer); + m_owns[m_shard_cnt] = true; + ++m_shard_cnt; + } + + void append_merged_shards(InternalLevel* level) { + if (m_shard_cnt == m_shards.size()) { + m_pending_shard = new S(level->m_shards.data(), level->m_shard_cnt); + return; + } + + m_shards[m_shard_cnt] = new S(level->m_shards.data(), level->m_shard_cnt); + m_owns[m_shard_cnt] = true; + + ++m_shard_cnt; + } + + + void finalize() { + if (m_pending_shard) { + for (size_t i=0; i> &shards, std::vector& shard_states, void *query_parms) { + for (size_t i=0; i= (ssize_t) shard_stop; i--) { + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec, true); + if (res && res->is_tombstone()) { + return true; + } + } + } + return false; + } + + bool delete_record(const R &rec) { + if (m_shard_cnt == 0) return false; + + for (size_t i = 0; i < m_shards.size(); ++i) { + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec); + if (res) { + res->set_delete(); + return true; + } + } + } + + return false; + } + + Shard* get_shard(size_t idx) { + return m_shards[idx]; + } + + size_t get_shard_count() { + return m_shard_cnt; + } + + size_t get_record_count() { + size_t cnt = 0; + for (size_t i=0; iget_record_count(); + } + + return cnt; + } + + size_t get_tombstone_count() { + size_t res = 0; + for (size_t i = 0; i < m_shard_cnt; ++i) { + res += m_shards[i]->get_tombstone_count(); + } + return res; + } + + size_t get_aux_memory_usage() { + size_t cnt = 0; + for (size_t i=0; iget_aux_memory_usage(); + } + + return cnt; + } + + size_t get_memory_usage() { + size_t cnt = 0; + for (size_t i=0; iget_memory_usage(); + } + } + + return cnt; + } + + double get_tombstone_prop() { + size_t tscnt = 0; + size_t reccnt = 0; + for (size_t i=0; iget_tombstone_count(); + reccnt += (*m_shards[i])->get_record_count(); + } + } + + return (double) tscnt / (double) (tscnt + reccnt); + } + +private: + ssize_t m_level_no; + + size_t m_shard_cnt; + size_t m_shard_size_cap; + + std::vector m_shards; + + Shard *m_pending_shard; + + std::vector m_owns; + + std::shared_ptr clone() { + auto new_level = std::make_shared(m_level_no, m_shards.size()); + for (size_t i=0; im_shards[i] = m_shards[i]; + new_level->m_owns[i] = true; + m_owns[i] = false; + } + + return new_level; + } +}; + +} diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h new file mode 100644 index 0000000..9f12175 --- /dev/null +++ b/include/framework/structure/MutableBuffer.h @@ -0,0 +1,242 @@ +/* + * include/framework/MutableBuffer.h + * + * Copyright (C) 2023 Douglas Rumbaugh + * Dong Xie + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "psu-util/alignment.h" +#include "util/bf_config.h" +#include "psu-ds/BloomFilter.h" +#include "psu-ds/Alias.h" +#include "psu-util/timer.h" +#include "framework/interface/Record.h" + +using psudb::CACHELINE_SIZE; + +namespace de { + +template +class MutableBuffer { +public: + MutableBuffer(size_t capacity, size_t max_tombstone_cap) + : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0) + , m_tombstonecnt(0), m_weight(0), m_max_weight(0) { + m_data = (Wrapped*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped)); + m_merge_data = (Wrapped*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped)); + m_tombstone_filter = nullptr; + if (max_tombstone_cap > 0) { + m_tombstone_filter = new psudb::BloomFilter(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS); + } + + m_refcnt.store(0); + m_deferred_truncate.store(false); + m_merging.store(false); + } + + ~MutableBuffer() { + assert(m_refcnt.load() == 0); + assert(m_merging.load() == false); + + if (m_data) free(m_data); + if (m_tombstone_filter) delete m_tombstone_filter; + if (m_merge_data) free(m_merge_data); + } + + template + int append(const R &rec, bool tombstone=false) { + if (tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0; + + int32_t pos = 0; + if ((pos = try_advance_tail()) == -1) return 0; + + Wrapped wrec; + wrec.rec = rec; + wrec.header = 0; + if (tombstone) wrec.set_tombstone(); + + m_data[pos] = wrec; + m_data[pos].header |= (pos << 2); + + if (tombstone) { + m_tombstonecnt.fetch_add(1); + if (m_tombstone_filter) m_tombstone_filter->insert(rec); + } + + if constexpr (WeightedRecordInterface) { + m_weight.fetch_add(rec.weight); + double old = m_max_weight.load(); + while (old < rec.weight) { + m_max_weight.compare_exchange_strong(old, rec.weight); + old = m_max_weight.load(); + } + } else { + m_weight.fetch_add(1); + } + + return 1; + } + + bool truncate() { + m_tombstonecnt.store(0); + m_reccnt.store(0); + m_weight.store(0); + m_max_weight.store(0); + if (m_tombstone_filter) m_tombstone_filter->clear(); + + return true; + } + + size_t get_record_count() { + return m_reccnt; + } + + size_t get_capacity() { + return m_cap; + } + + bool is_full() { + return m_reccnt == m_cap; + } + + size_t get_tombstone_count() { + return m_tombstonecnt.load(); + } + + bool delete_record(const R& rec) { + auto offset = 0; + while (offset < m_reccnt.load()) { + if (m_data[offset].rec == rec) { + m_data[offset].set_delete(); + return true; + } + offset++; + } + + return false; + } + + bool check_tombstone(const R& rec) { + if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false; + + auto offset = 0; + while (offset < m_reccnt.load()) { + if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) { + return true; + } + offset++;; + } + return false; + } + + size_t get_memory_usage() { + return m_cap * sizeof(R); + } + + size_t get_aux_memory_usage() { + return m_tombstone_filter->get_memory_usage(); + } + + size_t get_tombstone_capacity() { + return m_tombstone_cap; + } + + double get_total_weight() { + return m_weight.load(); + } + + Wrapped *get_data() { + return m_data; + } + + double get_max_weight() { + return m_max_weight; + } + + bool start_merge() { + if (m_merge_lock.try_lock()) { + /* there cannot already been an active merge */ + if (m_merging.load()) { + m_merge_lock.unlock(); + return false; + } + + m_merging.store(true); + memcpy(m_merge_data, m_data, sizeof(Wrapped) * m_reccnt.load()); + return true; + } + + /* lock could not be obtained */ + return false; + } + + bool finish_merge() { + m_merge_lock.unlock(); + return true; + } + + /* + * Concurrency-related operations + */ + bool take_reference() { + m_refcnt.fetch_add(1); + return true; + } + + bool release_reference() { + m_refcnt.fetch_add(-1); + + if (m_refcnt.load() == 0 && m_deferred_truncate.load()) { + assert(this->truncate()); + } + + return true; + } + + bool active_merge() { + return m_merging.load(); + } + +private: + int32_t try_advance_tail() { + size_t new_tail = m_reccnt.fetch_add(1); + + if (new_tail < m_cap) return new_tail; + else return -1; + } + + size_t m_cap; + size_t m_tombstone_cap; + + Wrapped* m_data; + Wrapped* m_merge_data; + + psudb::BloomFilter* m_tombstone_filter; + + alignas(64) std::atomic m_tombstonecnt; + alignas(64) std::atomic m_reccnt; + alignas(64) std::atomic m_weight; + alignas(64) std::atomic m_max_weight; + alignas(64) std::atomic m_merging; + alignas(64) std::atomic m_deferred_truncate; + alignas(64) std::atomic m_refcnt; + + alignas(64) std::mutex m_merge_lock; + alignas(64) std::mutex m_trunc_lock; + alignas(64) std::condition_variable m_trunc_signal; + +}; + +} -- cgit v1.2.3