summaryrefslogtreecommitdiffstats
path: root/include/framework/ExtensionStructure.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/ExtensionStructure.h')
-rw-r--r--include/framework/ExtensionStructure.h441
1 files changed, 0 insertions, 441 deletions
diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h
deleted file mode 100644
index 892e63b..0000000
--- a/include/framework/ExtensionStructure.h
+++ /dev/null
@@ -1,441 +0,0 @@
-/*
- * include/framework/ExtensionStructure.h
- *
- * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
- *
- * All rights reserved. Published under the Modified BSD License.
- *
- */
-#pragma once
-
-#include <atomic>
-#include <numeric>
-#include <cstdio>
-#include <vector>
-
-#include "framework/MutableBuffer.h"
-#include "framework/InternalLevel.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
-
-#include "framework/Configuration.h"
-
-#include "psu-util/timer.h"
-#include "psu-ds/Alias.h"
-
-namespace de {
-
-struct MergeTask {
- level_index m_source_level;
- level_index m_target_level;
- size_t m_size;
- size_t m_timestamp;
-
- friend bool operator<(const MergeTask &self, const MergeTask &other) {
- return self.m_timestamp < other.m_timestamp;
- }
-
- friend bool operator>(const MergeTask &self, const MergeTask &other) {
- return self.m_timestamp > other.m_timestamp;
- }
-};
-
-template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING>
-class ExtensionStructure {
- typedef S Shard;
- typedef MutableBuffer<R> Buffer;
-
-public:
- ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
- : m_scale_factor(scale_factor)
- , m_max_delete_prop(max_delete_prop)
- , m_buffer_size(buffer_size)
- {}
-
- ~ExtensionStructure() = default;
-
- /*
- * Create a shallow copy of this extension structure. The copy will share references to the
- * same levels/shards as the original, but will have its own lists. As all of the shards are
- * immutable (with the exception of deletes), the copy can be restructured with merges, etc.,
- * without affecting the original.
- *
- * NOTE: When using tagged deletes, a delete of a record in the original structure will affect
- * the copy, so long as the copy retains a reference to the same shard as the original. This could
- * cause synchronization problems under tagging with concurrency. Any deletes in this context will
- * need to be forwarded to the appropriate structures manually.
- */
- ExtensionStructure<R, S, Q, L> *copy() {
- auto new_struct = new ExtensionStructure<R, S, Q, L>(m_scale_factor, m_max_delete_prop, m_buffer_size);
- for (size_t i=0; i<m_levels.size(); i++) {
- new_struct->m_levels.push_back(m_levels[i]->clone());
- }
-
- return new_struct;
- }
-
- /*
- * Search for a record matching the argument and mark it deleted by
- * setting the delete bit in its wrapped header. Returns 1 if a matching
- * record was found and deleted, and 0 if a matching record was not found.
- *
- * This function will stop after finding the first matching record. It is assumed
- * that no duplicate records exist. In the case of duplicates, this function will
- * still "work", but in the sense of "delete first match".
- */
- int tagged_delete(const R &rec) {
- for (auto level : m_levels) {
- if (level && level->delete_record(rec)) {
- return 1;
- }
- }
-
- /*
- * If the record to be erased wasn't found, return 0. The
- * DynamicExtension itself will then search the active
- * Buffers.
- */
- return 0;
- }
-
- /*
- * Merge the memory table down into the tree, completing any required other
- * merges to make room for it.
- */
- inline bool merge_buffer(Buffer *buffer) {
- assert(can_merge_with(0, buffer->get_record_count()));
-
- merge_buffer_into_l0(buffer);
- buffer->truncate();
-
- return true;
- }
-
- /*
- * Return the total number of records (including tombstones) within all
- * of the levels of the structure.
- */
- size_t get_record_count() {
- size_t cnt = 0;
-
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_record_count();
- }
-
- return cnt;
- }
-
- /*
- * Return the total number of tombstones contained within all of the
- * levels of the structure.
- */
- size_t get_tombstone_cnt() {
- size_t cnt = 0;
-
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count();
- }
-
- return cnt;
- }
-
- /*
- * Return the number of levels within the structure. Note that not
- * all of these levels are necessarily populated.
- */
- size_t get_height() {
- return m_levels.size();
- }
-
- /*
- * Return the amount of memory (in bytes) used by the shards within the
- * structure for storing the primary data structure and raw data.
- */
- size_t get_memory_usage() {
- size_t cnt = 0;
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_memory_usage();
- }
-
- return cnt;
- }
-
- /*
- * Return the amount of memory (in bytes) used by the shards within the
- * structure for storing auxiliary data structures. This total does not
- * include memory used for the main data structure, or raw data.
- */
- size_t get_aux_memory_usage() {
- size_t cnt = 0;
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) {
- cnt += m_levels[i]->get_aux_memory_usage();
- }
- }
-
- return cnt;
- }
-
- /*
- * Validate that no level in the structure exceeds its maximum tombstone capacity. This is
- * used to trigger preemptive compactions at the end of the merge process.
- */
- bool validate_tombstone_proportion() {
- long double ts_prop;
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) {
- ts_prop = (long double) m_levels[i]->get_tombstone_count() / (long double) calc_level_record_capacity(i);
- if (ts_prop > (long double) m_max_delete_prop) {
- return false;
- }
- }
- }
-
- return true;
- }
-
- bool validate_tombstone_proportion(level_index level) {
- long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level);
- return ts_prop <= (long double) m_max_delete_prop;
- }
-
- /*
- * Return a reference to the underlying vector of levels within the
- * structure.
- */
- std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> &get_levels() {
- return m_levels;
- }
-
- /*
- *
- */
- std::vector<MergeTask> get_merge_tasks(size_t buffer_reccnt) {
- std::vector<MergeTask> merges;
-
- /*
- * The buffer -> L0 merge task is not included so if that
- * can be done without any other change, just return an
- * empty list.
- */
- if (can_merge_with(0, buffer_reccnt)) {
- return std::move(merges);
- }
-
- level_index merge_base_level = find_mergable_level(0);
- if (merge_base_level == -1) {
- merge_base_level = grow();
- }
-
- for (level_index i=merge_base_level; i>0; i--) {
- MergeTask task;
- task.m_source_level = i - 1;
- task.m_target_level = i;
-
- /*
- * The amount of storage required for the merge accounts
- * for the cost of storing the new records, along with the
- * cost of retaining the old records during the process
- * (hence the 2x multiplier).
- *
- * FIXME: currently does not account for the *actual* size
- * of the shards, only the storage for the records
- * themselves.
- */
- size_t reccnt = m_levels[i-1]->get_record_count();
- if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_merge_with(i, reccnt)) {
- reccnt += m_levels[i]->get_record_count();
- }
- }
- task.m_size = 2* reccnt * sizeof(R);
-
- merges.push_back(task);
- }
-
- return std::move(merges);
- }
-
-
- /*
- *
- */
- std::vector<MergeTask> get_merge_tasks_from_level(size_t source_level) {
- std::vector<MergeTask> merges;
-
- level_index merge_base_level = find_mergable_level(source_level);
- if (merge_base_level == -1) {
- merge_base_level = grow();
- }
-
- for (level_index i=merge_base_level; i>source_level; i--) {
- MergeTask task;
- task.m_source_level = i - 1;
- task.m_target_level = i;
-
- /*
- * The amount of storage required for the merge accounts
- * for the cost of storing the new records, along with the
- * cost of retaining the old records during the process
- * (hence the 2x multiplier).
- *
- * FIXME: currently does not account for the *actual* size
- * of the shards, only the storage for the records
- * themselves.
- */
- size_t reccnt = m_levels[i-1]->get_record_count();
- if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_merge_with(i, reccnt)) {
- reccnt += m_levels[i]->get_record_count();
- }
- }
- task.m_size = 2* reccnt * sizeof(R);
-
- merges.push_back(task);
- }
-
- return std::move(merges);
- }
-
- /*
- * Merge the level specified by incoming level into the level specified
- * by base level. The two levels should be sequential--i.e. no levels
- * are skipped in the merge process--otherwise the tombstone ordering
- * invariant may be violated by the merge operation.
- */
- inline void merge_levels(level_index base_level, level_index incoming_level) {
- // merging two memory levels
- if constexpr (L == LayoutPolicy::LEVELING) {
- auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get());
- } else {
- m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get());
- m_levels[base_level]->finalize();
- }
-
- m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
- }
-
-
-private:
- size_t m_scale_factor;
- double m_max_delete_prop;
- size_t m_buffer_size;
-
- std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;
-
- /*
- * Add a new level to the LSM Tree and return that level's index. Will
- * automatically determine whether the level should be on memory or on disk,
- * and act appropriately.
- */
- inline level_index grow() {
- level_index new_idx = m_levels.size();
- size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-
- m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
- return new_idx;
- }
-
- /*
- * Find the first level below the level indicated by idx that
- * is capable of sustaining a merge operation and return its
- * level index. If no such level exists, returns -1. Also
- * returns -1 if idx==0, and no such level exists, to skimplify
- * the logic of the first merge.
- */
- inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) {
-
- if (idx == 0 && m_levels.size() == 0) return -1;
-
- bool level_found = false;
- bool disk_level;
- level_index merge_level_idx;
-
- size_t incoming_rec_cnt = get_level_record_count(idx, buffer);
- for (level_index i=idx+1; i<m_levels.size(); i++) {
- if (can_merge_with(i, incoming_rec_cnt)) {
- return i;
- }
-
- incoming_rec_cnt = get_level_record_count(i);
- }
-
- return -1;
- }
-
- inline void merge_buffer_into_l0(Buffer *buffer) {
- assert(m_levels[0]);
- if constexpr (L == LayoutPolicy::LEVELING) {
- // FIXME: Kludgey implementation due to interface constraints.
- auto old_level = m_levels[0].get();
- auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
- temp_level->append_buffer(buffer);
- auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level);
-
- m_levels[0] = new_level;
- delete temp_level;
- } else {
- m_levels[0]->append_buffer(buffer);
- }
- }
-
- /*
- * Mark a given memory level as no-longer in use by the tree. For now this
- * will just free the level. In future, this will be more complex as the
- * level may not be able to immediately be deleted, depending upon who
- * else is using it.
- */
- inline void mark_as_unused(std::shared_ptr<InternalLevel<R, Shard, Q>> level) {
- level.reset();
- }
-
- /*
- * Assume that level "0" should be larger than the buffer. The buffer
- * itself is index -1, which should return simply the buffer capacity.
- */
- inline size_t calc_level_record_capacity(level_index idx) {
- return m_buffer_size * pow(m_scale_factor, idx+1);
- }
-
- /*
- * Returns the actual number of records present on a specified level. An
- * index value of -1 indicates the memory table. Can optionally pass in
- * a pointer to the memory table to use, if desired. Otherwise, there are
- * no guarantees about which buffer will be accessed if level_index is -1.
- */
- inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) {
- if (buffer) {
- return buffer->get_record_count();
- }
-
- return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
- }
-
- /*
- * Determines if the specific level can merge with another record containing
- * incoming_rec_cnt number of records. The provided level index should be
- * non-negative (i.e., not refer to the buffer) and will be automatically
- * translated into the appropriate index into either the disk or memory level
- * vector.
- */
- inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) {
- if (idx>= m_levels.size() || !m_levels[idx]) {
- return false;
- }
-
- if (L == LayoutPolicy::LEVELING) {
- return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
- } else {
- return m_levels[idx]->get_shard_count() < m_scale_factor;
- }
-
- /* unreachable */
- assert(true);
- }
-};
-
-}
-