diff options
Diffstat (limited to 'include/framework/structure')
| -rw-r--r-- | include/framework/structure/BufferView.h | 170 | ||||
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 495 | ||||
| -rw-r--r-- | include/framework/structure/InternalLevel.h | 271 | ||||
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 313 |
4 files changed, 1249 insertions, 0 deletions
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h new file mode 100644 index 0000000..9e0872b --- /dev/null +++ b/include/framework/structure/BufferView.h @@ -0,0 +1,170 @@ +/* + * include/framework/structure/BufferView.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + * TODO: This file is very poorly commented. + */ +#pragma once + +#include <cstdlib> +#include <cassert> +#include <functional> +#include <utility> + +#include "psu-util/alignment.h" +#include "psu-ds/BloomFilter.h" +#include "framework/interface/Record.h" + +namespace de { + +typedef std::_Bind<void (*(void*, long unsigned int))(void*, long unsigned int)> ReleaseFunction; + +template <RecordInterface R> +class BufferView { +public: + BufferView() = default; + + /* + * the BufferView's lifetime is tightly linked to buffer versioning, and so + * copying and assignment are disabled. + */ + BufferView(const BufferView&) = delete; + BufferView &operator=(BufferView &) = delete; + + BufferView(BufferView &&other) + : m_data(std::exchange(other.m_data, nullptr)) + , m_release(std::move(other.m_release)) + , m_head(std::exchange(other.m_head, 0)) + , m_tail(std::exchange(other.m_tail, 0)) + , m_start(std::exchange(other.m_start, 0)) + , m_stop(std::exchange(other.m_stop, 0)) + , m_cap(std::exchange(other.m_cap, 0)) + , m_approx_ts_cnt(std::exchange(other.m_approx_ts_cnt, 0)) + , m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr)) + , m_active(std::exchange(other.m_active, false)) {} + + BufferView &operator=(BufferView &&other) = delete; + + + BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail, size_t tombstone_cnt, psudb::BloomFilter<R> *filter, + ReleaseFunction release) + : m_data(buffer) + , m_release(release) + , m_head(head) + , m_tail(tail) + , m_start(m_head % cap) + , m_stop(m_tail % cap) + , m_cap(cap) + , m_approx_ts_cnt(tombstone_cnt) + , m_tombstone_filter(filter) + , m_active(true) {} + + ~BufferView() { + if (m_active) { + m_release(); + } + } + + bool check_tombstone(const R& rec) { + if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false; + + for (size_t i=0; i<get_record_count(); i++) { + if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) { + return true; + } + } + + return false; + } + + bool delete_record(const R& rec) { + if (m_start < m_stop) { + for (size_t i=m_start; i<m_stop; i++) { + if (m_data[i].rec == rec) { + m_data[i].set_delete(); + return true; + } + } + } else { + for (size_t i=m_start; i<m_cap; i++) { + if (m_data[i].rec == rec) { + m_data[i].set_delete(); + return true; + } + } + + for (size_t i=0; i<m_stop; i++) { + if (m_data[i].rec == rec) { + m_data[i].set_delete(); + return true; + } + + } + + } + + return false; + } + + size_t get_record_count() { + return m_tail - m_head; + } + + /* + * NOTE: This function returns an upper bound on the number + * of tombstones within the view. There may be less than + * this, due to synchronization issues during view creation. + */ + size_t get_tombstone_count() { + return m_approx_ts_cnt; + } + + Wrapped<R> *get(size_t i) { + assert(i < get_record_count()); + return m_data + to_idx(i); + } + + void copy_to_buffer(psudb::byte *buffer) { + /* check if the region to be copied circles back to start. If so, do it in two steps */ + if (m_start > m_stop) { + size_t split_idx = m_cap - m_start; + + memcpy(buffer, (std::byte*) (m_data + m_start), split_idx* sizeof(Wrapped<R>)); + memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte*) m_data, m_stop * sizeof(Wrapped<R>)); + } else { + memcpy(buffer, (std::byte*) (m_data + m_start), get_record_count() * sizeof(Wrapped<R>)); + } + } + + size_t get_tail() { + return m_tail; + } + + size_t get_head() { + return m_head; + } + +private: + Wrapped<R>* m_data; + ReleaseFunction m_release; + size_t m_head; + size_t m_tail; + size_t m_start; + size_t m_stop; + size_t m_cap; + size_t m_approx_ts_cnt; + psudb::BloomFilter<R> *m_tombstone_filter; + bool m_active; + + size_t to_idx(size_t i) { + size_t idx = (m_start + i >= m_cap) ? i = (m_cap - m_start) + : m_start + i; + assert(idx < m_cap); + return idx; + } +}; + +} diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h new file mode 100644 index 0000000..4802bc1 --- /dev/null +++ b/include/framework/structure/ExtensionStructure.h @@ -0,0 +1,495 @@ +/* + * include/framework/structure/ExtensionStructure.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * Distributed under the Modified BSD License. + * + */ +#pragma once + +#include <atomic> +#include <cstdio> +#include <vector> + +#include "framework/structure/BufferView.h" +#include "framework/structure/InternalLevel.h" + +#include "framework/util/Configuration.h" + +#include "psu-util/timer.h" + +namespace de { + +template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L=LayoutPolicy::TEIRING> +class ExtensionStructure { + typedef S Shard; + typedef BufferView<R> BuffView; + +public: + ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop) + : m_scale_factor(scale_factor) + , m_max_delete_prop(max_delete_prop) + , m_buffer_size(buffer_size) + {} + + ~ExtensionStructure() = default; + + /* + * Create a shallow copy of this extension structure. The copy will share + * references to the same levels/shards as the original, but will have its + * own lists. As all of the shards are immutable (with the exception of + * deletes), the copy can be restructured with reconstructions and flushes + * without affecting the original. The copied structure will be returned + * with a reference count of 0; generally you will want to immediately call + * take_reference() on it. + * + * NOTE: When using tagged deletes, a delete of a record in the original + * structure will affect the copy, so long as the copy retains a reference + * to the same shard as the original. This could cause synchronization + * problems under tagging with concurrency. Any deletes in this context will + * need to be forwarded to the appropriate structures manually. + */ + ExtensionStructure<R, S, Q, L> *copy() { + auto new_struct = new ExtensionStructure<R, S, Q, L>(m_buffer_size, m_scale_factor, + m_max_delete_prop); + for (size_t i=0; i<m_levels.size(); i++) { + new_struct->m_levels.push_back(m_levels[i]->clone()); + } + + new_struct->m_refcnt = 0; + + return new_struct; + } + + /* + * Search for a record matching the argument and mark it deleted by + * setting the delete bit in its wrapped header. Returns 1 if a matching + * record was found and deleted, and 0 if a matching record was not found. + * + * This function will stop after finding the first matching record. It is + * assumed that no duplicate records exist. In the case of duplicates, this + * function will still "work", but in the sense of "delete first match". + */ + int tagged_delete(const R &rec) { + for (auto level : m_levels) { + if (level && level->delete_record(rec)) { + return 1; + } + } + + /* + * If the record to be erased wasn't found, return 0. The + * DynamicExtension itself will then search the active + * Buffers. + */ + return 0; + } + + /* + * Flush a buffer into the extension structure, performing any necessary + * reconstructions to free up room in L0. + * + * FIXME: arguably, this should be a method attached to the buffer that + * takes a structure as input. + */ + inline bool flush_buffer(BuffView buffer) { + assert(can_reconstruct_with(0, buffer.get_record_count())); + + flush_buffer_into_l0(std::move(buffer)); + + return true; + } + + /* + * Return the total number of records (including tombstones) within all + * of the levels of the structure. + */ + size_t get_record_count() { + size_t cnt = 0; + + for (size_t i=0; i<m_levels.size(); i++) { + if (m_levels[i]) cnt += m_levels[i]->get_record_count(); + } + + return cnt; + } + + /* + * Return the total number of tombstones contained within all of the + * levels of the structure. + */ + size_t get_tombstone_count() { + size_t cnt = 0; + + for (size_t i=0; i<m_levels.size(); i++) { + if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count(); + } + + return cnt; + } + + /* + * Return the number of levels within the structure. Note that not + * all of these levels are necessarily populated. + */ + size_t get_height() { + return m_levels.size(); + } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing the primary data structure and raw data. + */ + size_t get_memory_usage() { + size_t cnt = 0; + for (size_t i=0; i<m_levels.size(); i++) { + if (m_levels[i]) cnt += m_levels[i]->get_memory_usage(); + } + + return cnt; + } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing auxiliary data structures. This total does not + * include memory used for the main data structure, or raw data. + */ + size_t get_aux_memory_usage() { + size_t cnt = 0; + for (size_t i=0; i<m_levels.size(); i++) { + if (m_levels[i]) { + cnt += m_levels[i]->get_aux_memory_usage(); + } + } + + return cnt; + } + + /* + * Validate that no level in the structure exceeds its maximum tombstone + * capacity. This is used to trigger preemptive compactions at the end of + * the reconstruction process. + */ + bool validate_tombstone_proportion() { + long double ts_prop; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + ts_prop = (long double)m_levels[i]->get_tombstone_count() / + (long double)calc_level_record_capacity(i); + if (ts_prop > (long double)m_max_delete_prop) { + return false; + } + } + } + + return true; + } + + bool validate_tombstone_proportion(level_index level) { + long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level); + return ts_prop <= (long double) m_max_delete_prop; + } + + /* + * Return a reference to the underlying vector of levels within the + * structure. + */ + std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> &get_levels() { + return m_levels; + } + + std::vector<ReconstructionTask> get_compaction_tasks() { + std::vector<ReconstructionTask> tasks; + + /* if the tombstone/delete invariant is satisfied, no need for compactions */ + if (validate_tombstone_proportion()) { + return tasks; + } + + /* locate the first level to violate the invariant */ + level_index violation_idx = -1; + for (level_index i=0; i<m_levels.size(); i++) { + if (!validate_tombstone_proportion(i)) { + violation_idx = i; + break; + } + } + + assert(violation_idx != -1); + + level_index base_level = find_reconstruction_target(violation_idx); + if (base_level == -1) { + base_level = grow(); + } + + for (level_index i=base_level; i>0; i--) { + ReconstructionTask task = {i-1, i}; + + /* + * The amount of storage required for the reconstruction accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i - 1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_reconstruct_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } + //task.m_size = 2* reccnt * sizeof(R); + + tasks.push_back(task); + } + + return tasks; + } + + /* + * + */ + std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt) { + std::vector<ReconstructionTask> reconstructions; + + /* + * The buffer flush is not included so if that can be done without any + * other change, just return an empty list. + */ + if (can_reconstruct_with(0, buffer_reccnt)) { + return std::move(reconstructions); + } + + level_index base_level = find_reconstruction_target(0); + if (base_level == -1) { + base_level = grow(); + } + + for (level_index i=base_level; i>0; i--) { + ReconstructionTask task = {i-1, i}; + + /* + * The amount of storage required for the reconstruction accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i-1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_reconstruct_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } + //task.m_size = 2* reccnt * sizeof(R); + + reconstructions.push_back(task); + } + + return std::move(reconstructions); + } + + + /* + * + */ + std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level) { + std::vector<ReconstructionTask> reconstructions; + + level_index base_level = find_reconstruction_target(source_level); + if (base_level == -1) { + base_level = grow(); + } + + for (level_index i=base_level; i>source_level; i--) { + ReconstructionTask task = {i - 1, i}; + /* + * The amount of storage required for the reconstruction accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i-1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_reconstruct_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } +// task.m_size = 2* reccnt * sizeof(R); + + reconstructions.push_back(task); + } + + return reconstructions; + } + + /* + * Combine incoming_level with base_level and reconstruct the shard, + * placing it in base_level. The two levels should be sequential--i.e. no + * levels are skipped in the reconstruction process--otherwise the + * tombstone ordering invariant may be violated. + */ + inline void reconstruction(level_index base_level, level_index incoming_level) { + if constexpr (L == LayoutPolicy::LEVELING) { + /* if the base level has a shard, merge the base and incoming together to make a new one */ + if (m_levels[base_level]->get_shard_count() > 0) { + m_levels[base_level] = InternalLevel<R, Shard, Q>::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get()); + /* otherwise, we can just move the incoming to the base */ + } else { + m_levels[base_level] = m_levels[incoming_level]; + } + } else { + m_levels[base_level]->append_level(m_levels[incoming_level].get()); + m_levels[base_level]->finalize(); + } + + /* place a new, empty level where the incoming level used to be */ + m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); + } + + bool take_reference() { + m_refcnt.fetch_add(1); + return true; + } + + bool release_reference() { + assert(m_refcnt.load() > 0); + m_refcnt.fetch_add(-1); + return true; + } + + size_t get_reference_count() { + return m_refcnt.load(); + } + + std::vector<void *> get_query_states(std::vector<std::pair<ShardID, Shard*>> &shards, void *parms) { + std::vector<void*> states; + + for (auto &level : m_levels) { + level->get_query_states(shards, states, parms); + } + + return states; + } + +private: + size_t m_scale_factor; + double m_max_delete_prop; + size_t m_buffer_size; + + std::atomic<size_t> m_refcnt; + + std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels; + + /* + * Add a new level to the structure and return its index. + */ + inline level_index grow() { + level_index new_idx = m_levels.size(); + size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt))); + return new_idx; + } + + /* + * Find the first level below the level indicated by idx that + * is capable of sustaining a reconstruction and return its + * level index. If no such level exists, returns -1. Also + * returns -1 if idx==0, and no such level exists, to simplify + * the logic of the first buffer flush. + */ + inline level_index find_reconstruction_target(level_index idx) { + + if (idx == 0 && m_levels.size() == 0) return -1; + + size_t incoming_rec_cnt = get_level_record_count(idx); + for (level_index i=idx+1; i<m_levels.size(); i++) { + if (can_reconstruct_with(i, incoming_rec_cnt)) { + return i; + } + + incoming_rec_cnt = get_level_record_count(i); + } + + return -1; + } + + inline void flush_buffer_into_l0(BuffView buffer) { + assert(m_levels[0]); + if constexpr (L == LayoutPolicy::LEVELING) { + // FIXME: Kludgey implementation due to interface constraints. + auto old_level = m_levels[0].get(); + auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); + temp_level->append_buffer(std::move(buffer)); + + if (old_level->get_shard_count() > 0) { + m_levels[0] = InternalLevel<R, Shard, Q>::reconstruction(old_level, temp_level); + delete temp_level; + } else { + m_levels[0] = std::shared_ptr<InternalLevel<R, Shard, Q>>(temp_level); + } + } else { + m_levels[0]->append_buffer(std::move(buffer)); + } + } + + /* + * Mark a given memory level as no-longer in use by the tree. For now this + * will just free the level. In future, this will be more complex as the + * level may not be able to immediately be deleted, depending upon who + * else is using it. + */ + inline void mark_as_unused(std::shared_ptr<InternalLevel<R, Shard, Q>> level) { + level.reset(); + } + + /* + * Assume that level "0" should be larger than the buffer. The buffer + * itself is index -1, which should return simply the buffer capacity. + */ + inline size_t calc_level_record_capacity(level_index idx) { + return m_buffer_size * pow(m_scale_factor, idx+1); + } + + /* + * Returns the number of records present on a specified level. + */ + inline size_t get_level_record_count(level_index idx) { + return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; + } + + /* + * Determines if a level can sustain a reconstruction with incoming_rec_cnt + * additional records without exceeding its capacity. + */ + inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) { + if (idx >= m_levels.size() || !m_levels[idx]) { + return false; + } + + if (L == LayoutPolicy::LEVELING) { + return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx); + } else { + return m_levels[idx]->get_shard_count() < m_scale_factor; + } + + /* unreachable */ + assert(true); + } +}; + +} + diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h new file mode 100644 index 0000000..db38946 --- /dev/null +++ b/include/framework/structure/InternalLevel.h @@ -0,0 +1,271 @@ +/* + * include/framework/structure/InternalLevel.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * Distributed under the Modified BSD License. + * + * The word `Internal` in this class's name refers to memory. The current + * model, inherited from the framework in Practical Dynamic Extension for + * Sampling Indexes, would use a different ExternalLevel for shards stored + * on external storage. This is a distinction that can probably be avoided + * with some more thought being put into interface design. + * + */ +#pragma once + +#include <vector> +#include <memory> + +#include "util/types.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" +#include "framework/interface/Record.h" +#include "framework/structure/BufferView.h" + +namespace de { +template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q> +class InternalLevel; + + + +template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q> +class InternalLevel { + typedef S Shard; + typedef BufferView<R> BuffView; +public: + InternalLevel(ssize_t level_no, size_t shard_cap) + : m_level_no(level_no) + , m_shard_cnt(0) + , m_shards(shard_cap, nullptr) + , m_pending_shard(nullptr) + {} + + ~InternalLevel() { + delete m_pending_shard; + } + + /* + * Create a new shard combining the records from base_level and new_level, + * and return a shared_ptr to a new level containing this shard. This is used + * for reconstructions under the leveling layout policy. + * + * No changes are made to the levels provided as arguments. + */ + static std::shared_ptr<InternalLevel> reconstruction(InternalLevel* base_level, InternalLevel* new_level) { + assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); + auto res = new InternalLevel(base_level->m_level_no, 1); + res->m_shard_cnt = 1; + std::vector<Shard *> shards = {base_level->m_shards[0].get(), + new_level->m_shards[0].get()}; + + res->m_shards[0] = std::make_shared<S>(shards); + return std::shared_ptr<InternalLevel>(res); + } + + /* + * Create a new shard combining the records from all of + * the shards in level, and append this new shard into + * this level. This is used for reconstructions under + * the tiering layout policy. + * + * No changes are made to the level provided as an argument. + */ + void append_level(InternalLevel* level) { + // FIXME: that this is happening probably means that + // something is going terribly wrong earlier in the + // reconstruction logic. + if (level->get_shard_count() == 0) { + return; + } + + std::vector<S*> shards; + for (auto shard : level->m_shards) { + if (shard) shards.emplace_back(shard.get()); + } + + if (m_shard_cnt == m_shards.size()) { + m_pending_shard = new S(shards); + return; + } + + auto tmp = new S(shards); + m_shards[m_shard_cnt] = std::shared_ptr<S>(tmp); + + ++m_shard_cnt; + } + + /* + * Create a new shard using the records in the + * provided buffer, and append this new shard + * into this level. This is used for buffer + * flushes under the tiering layout policy. + */ + void append_buffer(BuffView buffer) { + if (m_shard_cnt == m_shards.size()) { + assert(m_pending_shard == nullptr); + m_pending_shard = new S(std::move(buffer)); + return; + } + + m_shards[m_shard_cnt] = std::make_shared<S>(std::move(buffer)); + ++m_shard_cnt; + } + + void finalize() { + if (m_pending_shard) { + for (size_t i=0; i<m_shards.size(); i++) { + m_shards[i] = nullptr; + } + + m_shards[0] = std::shared_ptr<S>(m_pending_shard); + m_pending_shard = nullptr; + m_shard_cnt = 1; + } + } + + /* + * Create a new shard containing the combined records + * from all shards on this level and return it. + * + * No changes are made to this level. + */ + Shard *get_combined_shard() { + if (m_shard_cnt == 0) { + return nullptr; + } + + std::vector<Shard *> shards; + for (auto shard : m_shards) { + if (shard) shards.emplace_back(shard.get()); + } + + return new S(shards); + } + + void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) { + for (size_t i=0; i<m_shard_cnt; i++) { + if (m_shards[i]) { + auto shard_state = Q::get_query_state(m_shards[i].get(), query_parms); + shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i].get()}); + shard_states.emplace_back(shard_state); + } + } + } + + bool check_tombstone(size_t shard_stop, const R& rec) { + if (m_shard_cnt == 0) return false; + + for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) { + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec, true); + if (res && res->is_tombstone()) { + return true; + } + } + } + return false; + } + + bool delete_record(const R &rec) { + if (m_shard_cnt == 0) return false; + + for (size_t i = 0; i < m_shards.size(); ++i) { + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec); + if (res) { + res->set_delete(); + return true; + } + } + } + + return false; + } + + Shard* get_shard(size_t idx) { + return m_shards[idx].get(); + } + + size_t get_shard_count() { + return m_shard_cnt; + } + + size_t get_record_count() { + size_t cnt = 0; + for (size_t i=0; i<m_shard_cnt; i++) { + if (m_shards[i]) { + cnt += m_shards[i]->get_record_count(); + } + } + + return cnt; + } + + size_t get_tombstone_count() { + size_t res = 0; + for (size_t i = 0; i < m_shard_cnt; ++i) { + if (m_shards[i]) { + res += m_shards[i]->get_tombstone_count(); + } + } + return res; + } + + size_t get_aux_memory_usage() { + size_t cnt = 0; + for (size_t i=0; i<m_shard_cnt; i++) { + if (m_shards[i]){ + cnt += m_shards[i]->get_aux_memory_usage(); + } + } + + return cnt; + } + + size_t get_memory_usage() { + size_t cnt = 0; + for (size_t i=0; i<m_shard_cnt; i++) { + if (m_shards[i]) { + cnt += m_shards[i]->get_memory_usage(); + } + } + + return cnt; + } + + double get_tombstone_prop() { + size_t tscnt = 0; + size_t reccnt = 0; + for (size_t i=0; i<m_shard_cnt; i++) { + if (m_shards[i]) { + tscnt += m_shards[i]->get_tombstone_count(); + reccnt += m_shards[i]->get_record_count(); + } + } + + return (double) tscnt / (double) (tscnt + reccnt); + } + + std::shared_ptr<InternalLevel> clone() { + auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size()); + for (size_t i=0; i<m_shard_cnt; i++) { + new_level->m_shards[i] = m_shards[i]; + } + new_level->m_shard_cnt = m_shard_cnt; + + return new_level; + } + +private: + ssize_t m_level_no; + + size_t m_shard_cnt; + size_t m_shard_size_cap; + + std::vector<std::shared_ptr<Shard>> m_shards; + Shard *m_pending_shard; +}; + +} diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h new file mode 100644 index 0000000..415c95a --- /dev/null +++ b/include/framework/structure/MutableBuffer.h @@ -0,0 +1,313 @@ +/* + * include/framework/structure/MutableBuffer.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * Distributed under the Modified BSD License. + * + * NOTE: Concerning the tombstone count. One possible approach + * would be to track the number of tombstones below and above the + * low water mark--this would be straightforward to do. Then, if we + * *require* that the head only advance up to the LWM, we can get a + * correct view on the number of tombstones in the active buffer at + * any point in time, and the BufferView will have a pretty good + * approximation as well (potentially with a few extra if new inserts + * happen between when the tail pointer and tombstone count are fetched) + * + */ +#pragma once + +#include <cstdlib> +#include <atomic> +#include <cassert> +#include <immintrin.h> + +#include "psu-util/alignment.h" +#include "util/bf_config.h" +#include "psu-ds/BloomFilter.h" +#include "framework/interface/Record.h" +#include "framework/structure/BufferView.h" + +using psudb::CACHELINE_SIZE; + +namespace de { + +template <RecordInterface R> +class MutableBuffer { + friend class BufferView<R>; + + struct buffer_head { + size_t head_idx; + size_t refcnt; + }; + +public: + MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0) + : m_lwm(low_watermark) + , m_hwm(high_watermark) + , m_cap((capacity == 0) ? 2 * high_watermark : capacity) + , m_tail(0) + , m_head({0, 0}) + , m_old_head({high_watermark, 0}) + , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>))) + , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS)) + , m_tscnt(0) + , m_old_tscnt(0) + , m_active_head_advance(false) + { + assert(m_cap > m_hwm); + assert(m_hwm > m_lwm); + } + + ~MutableBuffer() { + free(m_data); + delete m_tombstone_filter; + } + + int append(const R &rec, bool tombstone=false) { + int32_t tail = 0; + if ((tail = try_advance_tail()) == -1) { + return 0; + } + + Wrapped<R> wrec; + wrec.rec = rec; + wrec.header = 0; + if (tombstone) wrec.set_tombstone(); + + size_t pos = tail % m_cap; + + m_data[pos] = wrec; + m_data[pos].header |= (pos << 2); + + if (tombstone) { + m_tscnt.fetch_add(1); + if (m_tombstone_filter) m_tombstone_filter->insert(rec); + } + + return 1; + } + + bool truncate() { + m_tscnt.store(0); + m_tail.store(0); + if (m_tombstone_filter) m_tombstone_filter->clear(); + + return true; + } + + size_t get_record_count() { + return m_tail.load() - m_head.load().head_idx; + } + + size_t get_capacity() { + return m_cap; + } + + bool is_full() { + return get_record_count() >= m_hwm; + } + + bool is_at_low_watermark() { + return get_record_count() >= m_lwm; + } + + size_t get_tombstone_count() { + return m_tscnt.load(); + } + + bool delete_record(const R& rec) { + return get_buffer_view().delete_record(rec); + } + + bool check_tombstone(const R& rec) { + return get_buffer_view().check_tombstone(rec); + } + + size_t get_memory_usage() { + return m_cap * sizeof(Wrapped<R>); + } + + size_t get_aux_memory_usage() { + return m_tombstone_filter->get_memory_usage(); + } + + BufferView<R> get_buffer_view(size_t target_head) { + size_t head = get_head(target_head); + auto f = std::bind(release_head_reference, (void *) this, head); + + return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); + } + + BufferView<R> get_buffer_view() { + size_t head = get_head(m_head.load().head_idx); + auto f = std::bind(release_head_reference, (void *) this, head); + + return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); + } + + /* + * Advance the buffer following a reconstruction. Move current + * head and head_refcnt into old_head and old_head_refcnt, then + * assign new_head to old_head. + */ + bool advance_head(size_t new_head) { + assert(new_head > m_head.load().head_idx); + assert(new_head <= m_tail.load()); + + /* refuse to advance head while there is an old with one references */ + if (m_old_head.load().refcnt > 0) { + //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n"); + return false; + } + + m_active_head_advance.store(true); + + buffer_head new_hd = {new_head, 0}; + buffer_head cur_hd; + + /* replace current head with new head */ + do { + cur_hd = m_head.load(); + } while(!m_head.compare_exchange_strong(cur_hd, new_hd)); + + /* move the current head into the old head */ + m_old_head.store(cur_hd); + + m_active_head_advance.store(false); + return true; + } + + /* + * FIXME: If target_head does not match *either* the old_head or the + * current_head, this routine will loop infinitely. + */ + size_t get_head(size_t target_head) { + buffer_head cur_hd, new_hd; + bool head_acquired = false; + + do { + if (m_old_head.load().head_idx == target_head) { + cur_hd = m_old_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else if (m_head.load().head_idx == target_head){ + cur_hd = m_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } while(!head_acquired); + + return new_hd.head_idx; + } + + void set_low_watermark(size_t lwm) { + assert(lwm < m_hwm); + m_lwm = lwm; + } + + size_t get_low_watermark() { + return m_lwm; + } + + void set_high_watermark(size_t hwm) { + assert(hwm > m_lwm); + assert(hwm < m_cap); + m_hwm = hwm; + } + + size_t get_high_watermark() { + return m_hwm; + } + + size_t get_tail() { + return m_tail.load(); + } + + /* + * Note: this returns the available physical storage capacity, + * *not* now many more records can be inserted before the + * HWM is reached. It considers the old_head to be "free" + * when it has no remaining references. This should be true, + * but a buggy framework implementation may violate the + * assumption. + */ + size_t get_available_capacity() { + if (m_old_head.load().refcnt == 0) { + return m_cap - (m_tail.load() - m_head.load().head_idx); + } + + return m_cap - (m_tail.load() - m_old_head.load().head_idx); + } + +private: + int64_t try_advance_tail() { + size_t old_value = m_tail.load(); + + /* if full, fail to advance the tail */ + if (old_value - m_head.load().head_idx >= m_hwm) { + return -1; + } + + while (!m_tail.compare_exchange_strong(old_value, old_value+1)) { + /* if full, stop trying and fail to advance the tail */ + if (m_tail.load() >= m_hwm) { + return -1; + } + + _mm_pause(); + } + + return old_value; + } + + size_t to_idx(size_t i, size_t head) { + return (head + i) % m_cap; + } + + static void release_head_reference(void *buff, size_t head) { + MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff; + + buffer_head cur_hd, new_hd; + do { + if (buffer->m_old_head.load().head_idx == head) { + cur_hd = buffer->m_old_head; + if (cur_hd.refcnt == 0) continue; + new_hd = {cur_hd.head_idx, cur_hd.refcnt-1}; + if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { + break; + } + } else { + cur_hd = buffer->m_head; + if (cur_hd.refcnt == 0) continue; + new_hd = {cur_hd.head_idx, cur_hd.refcnt-1}; + + if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { + break; + } + } + _mm_pause(); + } while(true); + } + + size_t m_lwm; + size_t m_hwm; + size_t m_cap; + + alignas(64) std::atomic<size_t> m_tail; + + alignas(64) std::atomic<buffer_head> m_head; + alignas(64) std::atomic<buffer_head> m_old_head; + + Wrapped<R>* m_data; + psudb::BloomFilter<R>* m_tombstone_filter; + alignas(64) std::atomic<size_t> m_tscnt; + size_t m_old_tscnt; + + alignas(64) std::atomic<bool> m_active_head_advance; +}; + +} |