From 5617bed5257506d3dfda8537b16f44b3e40f1b42 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Sun, 22 Dec 2024 13:00:19 -0500 Subject: Began overhauling reconstruction mechanism --- include/framework/structure/ExtensionStructure.h | 561 +++-------------------- include/framework/structure/InternalLevel.h | 154 ++----- 2 files changed, 91 insertions(+), 624 deletions(-) (limited to 'include/framework/structure') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index c81ad05..9b7ae87 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -11,40 +11,23 @@ #include #include +#include #include #include "framework/structure/BufferView.h" #include "framework/structure/InternalLevel.h" - -#include "framework/util/Configuration.h" - -#include "psu-util/timer.h" #include "util/types.h" namespace de { -template QueryType, - LayoutPolicy L = LayoutPolicy::TEIRING> +template QueryType> class ExtensionStructure { typedef typename ShardType::RECORD RecordType; typedef BufferView BuffView; - - typedef struct { - size_t reccnt; - size_t reccap; - - size_t shardcnt; - size_t shardcap; - } level_state; - - typedef std::vector state_vector; - + typedef std::vector>> + LevelVector; public: - ExtensionStructure(size_t buffer_size, size_t scale_factor, - double max_delete_prop) - : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop), - m_buffer_size(buffer_size) {} - + ExtensionStructure() = default; ~ExtensionStructure() = default; /* @@ -62,16 +45,13 @@ public: * problems under tagging with concurrency. Any deletes in this context will * need to be forwarded to the appropriate structures manually. */ - ExtensionStructure *copy() { - auto new_struct = new ExtensionStructure( - m_buffer_size, m_scale_factor, m_max_delete_prop); + ExtensionStructure *copy() const { + auto new_struct = new ExtensionStructure(); for (size_t i = 0; i < m_levels.size(); i++) { new_struct->m_levels.push_back(m_levels[i]->clone()); } new_struct->m_refcnt = 0; - new_struct->m_current_state = m_current_state; - return new_struct; } @@ -99,31 +79,11 @@ public: return 0; } - /* - * Flush a buffer into the extension structure, performing any necessary - * reconstructions to free up room in L0. - * - * FIXME: arguably, this should be a method attached to the buffer that - * takes a structure as input. - */ - inline bool flush_buffer(BuffView buffer) { - state_vector tmp = m_current_state; - - if (tmp.size() == 0) { - grow(tmp); - } - - assert(can_reconstruct_with(0, buffer.get_record_count(), tmp)); - flush_buffer_into_l0(std::move(buffer)); - - return true; - } - /* * Return the total number of records (including tombstones) within all * of the levels of the structure. */ - size_t get_record_count() { + size_t get_record_count() const { size_t cnt = 0; for (size_t i = 0; i < m_levels.size(); i++) { @@ -138,7 +98,7 @@ public: * Return the total number of tombstones contained within all of the * levels of the structure. */ - size_t get_tombstone_count() { + size_t get_tombstone_count() const { size_t cnt = 0; for (size_t i = 0; i < m_levels.size(); i++) { @@ -153,13 +113,13 @@ public: * Return the number of levels within the structure. Note that not * all of these levels are necessarily populated. 
*/ - size_t get_height() { return m_levels.size(); } + size_t get_height() const { return m_levels.size(); } /* * Return the amount of memory (in bytes) used by the shards within the * structure for storing the primary data structure and raw data. */ - size_t get_memory_usage() { + size_t get_memory_usage() const { size_t cnt = 0; for (size_t i = 0; i < m_levels.size(); i++) { if (m_levels[i]) @@ -174,7 +134,7 @@ public: * structure for storing auxiliary data structures. This total does not * include memory used for the main data structure, or raw data. */ - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { size_t cnt = 0; for (size_t i = 0; i < m_levels.size(); i++) { if (m_levels[i]) { @@ -185,326 +145,74 @@ public: return cnt; } - /* - * Validate that no level in the structure exceeds its maximum tombstone - * capacity. This is used to trigger preemptive compactions at the end of - * the reconstruction process. - */ - bool validate_tombstone_proportion() { - long double ts_prop; - for (size_t i = 0; i < m_levels.size(); i++) { - if (m_levels[i]) { - ts_prop = (long double)m_levels[i]->get_tombstone_count() / - (long double)calc_level_record_capacity(i); - if (ts_prop > (long double)m_max_delete_prop) { - return false; - } - } - } - - return true; - } - - bool validate_tombstone_proportion(level_index level) { - long double ts_prop = (long double)m_levels[level]->get_tombstone_count() / - (long double)calc_level_record_capacity(level); - return ts_prop <= (long double)m_max_delete_prop; - } - - /* - * Return a reference to the underlying vector of levels within the - * structure. - */ - std::vector>> & - get_levels() { - return m_levels; - } - - /* - * NOTE: This cannot be simulated, because tombstone cancellation is not - * cheaply predictable. It is possible that the worst case number could - * be used instead, to allow for prediction, but compaction isn't a - * major concern outside of sampling; at least for now. So I'm not - * going to focus too much time on it at the moment. - */ - ReconstructionVector get_compaction_tasks() { - ReconstructionVector tasks; - state_vector scratch_state = m_current_state; - - /* if the tombstone/delete invariant is satisfied, no need for compactions - */ - if (validate_tombstone_proportion()) { - return tasks; - } - - /* locate the first level to violate the invariant */ - level_index violation_idx = -1; - for (level_index i = 0; i < m_levels.size(); i++) { - if (!validate_tombstone_proportion(i)) { - violation_idx = i; - break; - } - } - - assert(violation_idx != -1); - - level_index base_level = - find_reconstruction_target(violation_idx, scratch_state); - if (base_level == -1) { - base_level = grow(scratch_state); - } - - for (level_index i = base_level; i > 0; i--) { - /* - * The amount of storage required for the reconstruction accounts - * for the cost of storing the new records, along with the - * cost of retaining the old records during the process - * (hence the 2x multiplier). - * - * FIXME: currently does not account for the *actual* size - * of the shards, only the storage for the records - * themselves. 
- */ - size_t reccnt = m_levels[i - 1]->get_record_count(); - if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt, scratch_state)) { - reccnt += m_levels[i]->get_record_count(); + inline void perform_reconstruction(ReconstructionTask task) { + /* perform the reconstruction itself */ + std::vector shards; + for (ShardID shid : task.sources) { + assert(shid.level_idx < m_levels.size()); + assert(shid.shard_idx >= -1); + + /* if unspecified, push all shards into the vector */ + if (shid.shard_idx == all_shards_idx) { + for (size_t i = 0; i < m_levels[shid.level_idx].get_shard_count(); + i++) { + if (m_levels[shid.level_idx]->get_shard(i)) { + shards.push_back(m_levels[shid.level_idx]->get_shard(i)); + } } + } else { + shards.push_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx)); } - tasks.add_reconstruction(i - i, i, reccnt); } - return tasks; - } + auto new_shard = Shard(shards); - /* - * - */ - ReconstructionVector - get_reconstruction_tasks(size_t buffer_reccnt, - state_vector scratch_state = {}) { /* - * If no scratch state vector is provided, use a copy of the - * current one. The only time an empty vector could be used as - * *real* input to this function is when the current state is also - * empty, so this should would even in that case. + * Remove all of the shards processed by the operation */ - if (scratch_state.size() == 0) { - scratch_state = m_current_state; - } - - ReconstructionVector reconstructions; - size_t LOOKAHEAD = 1; - for (size_t i = 0; i < LOOKAHEAD; i++) { - /* - * If L0 cannot support a direct buffer flush, figure out what - * work must be done to free up space first. Otherwise, the - * reconstruction vector will be initially empty. - */ - if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) { - auto local_recon = - get_reconstruction_tasks_from_level(0, scratch_state); - - /* - * for the first iteration, we need to do all of the - * reconstructions, so use these to initially the returned - * reconstruction list - */ - if (i == 0) { - reconstructions = local_recon; - /* - * Quick sanity test of idea: if the next reconstruction - * would be larger than this one, steal the largest - * task from it and run it now instead. - */ - } else if (local_recon.get_total_reccnt() > - reconstructions.get_total_reccnt()) { - auto t = local_recon.remove_reconstruction(0); - reconstructions.add_reconstruction(t); - } - } - - /* simulate the buffer flush in the scratch state */ - scratch_state[0].reccnt += buffer_reccnt; - if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) { - scratch_state[0].shardcnt += 1; + for (ShardID shid : task.sources) { + if (shid.shard_idx == all_shards_idx) { + m_levels[shid.level_idx]->truncate(); + } else { + m_levels[shid.level_idx]->delete_shard(shid.shard_idx); } } - return reconstructions; - } - - /* - * - */ - ReconstructionVector - get_reconstruction_tasks_from_level(level_index source_level, - state_vector &scratch_state) { - ReconstructionVector reconstructions; - /* - * Find the first level capable of sustaining a reconstruction from - * the level above it. If no such level exists, add a new one at - * the bottom of the structure. 
+ * Append the new shard to the target level */ - level_index base_level = - find_reconstruction_target(source_level, scratch_state); - if (base_level == -1) { - base_level = grow(scratch_state); - } - - if constexpr (L == LayoutPolicy::BSM) { - if (base_level == 0) { - return reconstructions; - } - - ReconstructionTask task; - task.target = base_level; - - size_t base_reccnt = 0; - for (level_index i = base_level; i > source_level; i--) { - auto recon_reccnt = scratch_state[i - 1].reccnt; - base_reccnt += recon_reccnt; - scratch_state[i - 1].reccnt = 0; - scratch_state[i - 1].shardcnt = 0; - task.add_source(i - 1, recon_reccnt); - } - - reconstructions.add_reconstruction(task); - scratch_state[base_level].reccnt = base_reccnt; - scratch_state[base_level].shardcnt = 1; - - return reconstructions; + if (task.target < m_levels.size()) { + m_levels[task.target]->append_shard(new_shard); + } else { + m_levels.push_back(); } + } + inline void perform_flush(ReconstructionTask task, BuffView buffer) { /* - * Determine the full set of reconstructions necessary to open up - * space in the source level. + * FIXME: this might be faster with a custom interface for merging + * the buffer and a vector of shards, but that would also complicate + * the shard interface a lot, so we'll leave it like this for now. It + * does mean that policies that merge the buffer into L0 double-process + * the buffer itself. Given that we're unlikely to actually use policies + * like that, we'll leave this as low priority. */ - for (level_index i = base_level; i > source_level; i--) { - size_t recon_reccnt = scratch_state[i - 1].reccnt; - size_t base_reccnt = recon_reccnt; - - /* - * If using Leveling, the total reconstruction size will be the - * records in *both* base and target, because they will need to - * be merged (assuming that target isn't empty). - */ - if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, base_reccnt, scratch_state)) { - recon_reccnt += scratch_state[i].reccnt; - } - } - reconstructions.add_reconstruction(i - 1, i, recon_reccnt); - - /* - * The base level will be emptied and its records moved to - * the target. - */ - scratch_state[i - 1].reccnt = 0; - scratch_state[i - 1].shardcnt = 0; - - /* - * The target level will have the records from the base level - * added to it, and potentially gain a shard if the LayoutPolicy - * is tiering or the level currently lacks any shards at all. 
- */ - scratch_state[i].reccnt += base_reccnt; - if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) { - scratch_state[i].shardcnt += 1; - } - } - - return reconstructions; - } - - inline void reconstruction(ReconstructionTask task) { - static_assert(L == LayoutPolicy::BSM); - std::vector *> levels( - task.sources.size()); - for (size_t i = 0; i < task.sources.size(); i++) { - levels[i] = m_levels[task.sources[i]].get(); - } - - auto new_level = InternalLevel::reconstruction( - levels, task.target); - if (task.target >= (level_index) m_levels.size()) { - m_current_state.push_back({new_level->get_record_count(), - calc_level_record_capacity(task.target), 1, - 1}); - m_levels.emplace_back(new_level); + ShardType *buffer_shard = new ShardType(buffer); + if (task.type == ReconstructionType::Append) { + m_levels[0]->append(std::shared_ptr(buffer_shard)); } else { - m_current_state[task.target] = {new_level->get_record_count(), - calc_level_record_capacity(task.target), - 1, 1}; - m_levels[task.target] = new_level; - } - - /* remove all of the levels that have been flattened */ - for (size_t i = 0; i < task.sources.size(); i++) { - m_levels[task.sources[i]] = - std::shared_ptr>( - new InternalLevel(task.sources[i], 1)); - m_current_state[task.sources[i]] = { - 0, calc_level_record_capacity(task.target), 0, 1}; - } - - return; - } - - /* - * Combine incoming_level with base_level and reconstruct the shard, - * placing it in base_level. The two levels should be sequential--i.e. no - * levels are skipped in the reconstruction process--otherwise the - * tombstone ordering invariant may be violated. - */ - inline void reconstruction(level_index base_level, - level_index incoming_level) { - size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - - if (base_level >= (level_index) m_levels.size()) { - m_levels.emplace_back( - std::shared_ptr>( - new InternalLevel(base_level, - shard_capacity))); - m_current_state.push_back( - {0, calc_level_record_capacity(base_level), 0, shard_capacity}); - } + std::vector shards; + for (size_t i = 0; i < m_levels[0].size(); i++) { + if (m_levels[0]->get_shard(i)) { + shards.push_back(m_levels[0]->get_shard(i)); + } - if constexpr (L == LayoutPolicy::LEVELING) { - /* if the base level has a shard, merge the base and incoming together to - * make a new one */ - if (m_levels[base_level]->get_shard_count() > 0) { - m_levels[base_level] = - InternalLevel::reconstruction( - m_levels[base_level].get(), m_levels[incoming_level].get()); - /* otherwise, we can just move the incoming to the base */ - } else { - m_levels[base_level] = m_levels[incoming_level]; + shards.push_back(buffer_shard); + ShardType *new_shard = new ShardType(shards); + m_levels[0]->truncate(); + m_levels[0]->append(std::shared_ptr(new_shard)); } - - } else { - m_levels[base_level]->append_level(m_levels[incoming_level].get()); - m_levels[base_level]->finalize(); } - - /* place a new, empty level where the incoming level used to be */ - m_levels[incoming_level] = - std::shared_ptr>( - new InternalLevel( - incoming_level, - (L == LayoutPolicy::LEVELING) ? 
1 : m_scale_factor)); - - /* - * Update the state vector to match the *real* state following - * the reconstruction - */ - m_current_state[base_level] = {m_levels[base_level]->get_record_count(), - calc_level_record_capacity(base_level), - m_levels[base_level]->get_shard_count(), - shard_capacity}; - m_current_state[incoming_level] = { - 0, calc_level_record_capacity(incoming_level), 0, shard_capacity}; } bool take_reference() { @@ -518,11 +226,11 @@ public: return true; } - size_t get_reference_count() { return m_refcnt.load(); } + size_t get_reference_count() const { return m_refcnt.load(); } std::vector get_local_queries(std::vector> &shards, - typename QueryType::Parameters *parms) { + typename QueryType::Parameters *parms) const { std::vector queries; @@ -533,150 +241,11 @@ public: return queries; } -private: - size_t m_scale_factor; - double m_max_delete_prop; - size_t m_buffer_size; + LevelVector const &get_level_vector() const { return m_levels; } +private: std::atomic m_refcnt; - - std::vector>> m_levels; - - /* - * A pair of for each level in the - * structure. Record counts may be slightly inaccurate due to - * deletes. - */ - state_vector m_current_state; - - /* - * Add a new level to the scratch state and return its index. - * - * IMPORTANT: This does _not_ add a level to the extension structure - * anymore. This is handled by the appropriate reconstruction and flush - * methods as needed. This function is for use in "simulated" - * reconstructions. - */ - inline level_index grow(state_vector &scratch_state) { - level_index new_idx = m_levels.size(); - size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - - scratch_state.push_back( - {0, calc_level_record_capacity(new_idx), 0, new_shard_cap}); - return new_idx; - } - - /* - * Find the first level below the level indicated by idx that - * is capable of sustaining a reconstruction and return its - * level index. If no such level exists, returns -1. Also - * returns -1 if idx==0, and no such level exists, to simplify - * the logic of the first buffer flush. - */ - inline level_index find_reconstruction_target(level_index idx, - state_vector &state) { - - /* - * this handles the very first buffer flush, when the state vector - * is empty. - */ - if (idx == 0 && state.size() == 0) - return -1; - - size_t incoming_rec_cnt = state[idx].reccnt; - for (level_index i = idx + 1; i < (level_index) state.size(); i++) { - if (can_reconstruct_with(i, incoming_rec_cnt, state)) { - return i; - } - - incoming_rec_cnt = state[idx].reccnt; - } - - return -1; - } - - inline void flush_buffer_into_l0(BuffView buffer) { - size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - - if (m_levels.size() == 0) { - m_levels.emplace_back( - std::shared_ptr>( - new InternalLevel(0, shard_capacity))); - - m_current_state.push_back( - {0, calc_level_record_capacity(0), 0, shard_capacity}); - } - - if constexpr (L == LayoutPolicy::LEVELING) { - // FIXME: Kludgey implementation due to interface constraints. 
- auto old_level = m_levels[0].get(); - auto temp_level = new InternalLevel(0, 1); - temp_level->append_buffer(std::move(buffer)); - - if (old_level->get_shard_count() > 0) { - m_levels[0] = InternalLevel::reconstruction( - old_level, temp_level); - delete temp_level; - } else { - m_levels[0] = - std::shared_ptr>(temp_level); - } - } else { - m_levels[0]->append_buffer(std::move(buffer)); - } - - /* update the state vector */ - m_current_state[0].reccnt = m_levels[0]->get_record_count(); - m_current_state[0].shardcnt = m_levels[0]->get_shard_count(); - } - - /* - * Mark a given memory level as no-longer in use by the tree. For now this - * will just free the level. In future, this will be more complex as the - * level may not be able to immediately be deleted, depending upon who - * else is using it. - */ - inline void - mark_as_unused(std::shared_ptr> level) { - level.reset(); - } - - /* - * Assume that level "0" should be larger than the buffer. The buffer - * itself is index -1, which should return simply the buffer capacity. - */ - inline size_t calc_level_record_capacity(level_index idx) { - return m_buffer_size * pow(m_scale_factor, idx + 1); - } - - /* - * Returns the number of records present on a specified level. - */ - inline size_t get_level_record_count(level_index idx) { - return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; - } - - /* - * Determines if a level can sustain a reconstruction with incoming_rec_cnt - * additional records without exceeding its capacity. - */ - inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, - state_vector &state) { - if (idx >= (level_index) state.size()) { - return false; - } - - if constexpr (L == LayoutPolicy::LEVELING) { - return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap; - } else if constexpr (L == LayoutPolicy::BSM) { - return state[idx].reccnt == 0; - } else { - return state[idx].shardcnt < state[idx].shardcap; - } - - /* unreachable */ - assert(true); - } + LevelVector m_levels; }; } // namespace de diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index a4cf94d..8cfcd49 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -19,14 +19,10 @@ #include #include "framework/interface/Query.h" -#include "framework/interface/Record.h" #include "framework/interface/Shard.h" -#include "framework/structure/BufferView.h" #include "util/types.h" namespace de { -template QueryType> -class InternalLevel; template QueryType> class InternalLevel { @@ -34,110 +30,9 @@ class InternalLevel { typedef BufferView BuffView; public: - InternalLevel(ssize_t level_no, size_t shard_cap) - : m_level_no(level_no), m_shard_cnt(0), m_shards(shard_cap, nullptr), - m_pending_shard(nullptr) {} - - ~InternalLevel() { delete m_pending_shard; } - - /* - * Create a new shard combining the records from base_level and new_level, - * and return a shared_ptr to a new level containing this shard. This is used - * for reconstructions under the leveling layout policy. - * - * No changes are made to the levels provided as arguments. 
- */ - static std::shared_ptr - reconstruction(InternalLevel *base_level, InternalLevel *new_level) { - assert(base_level->m_level_no > new_level->m_level_no || - (base_level->m_level_no == 0 && new_level->m_level_no == 0)); - auto res = new InternalLevel(base_level->m_level_no, 1); - res->m_shard_cnt = 1; - std::vector shards = {base_level->m_shards[0].get(), - new_level->m_shards[0].get()}; - - res->m_shards[0] = std::make_shared(shards); - return std::shared_ptr(res); - } - - static std::shared_ptr - reconstruction(std::vector levels, size_t level_idx) { - std::vector shards; - for (auto level : levels) { - for (auto shard : level->m_shards) { - if (shard) - shards.emplace_back(shard.get()); - } - } - - auto res = new InternalLevel(level_idx, 1); - res->m_shard_cnt = 1; - res->m_shards[0] = std::make_shared(shards); + InternalLevel(ssize_t level_no) : m_level_no(level_no) {} - return std::shared_ptr(res); - } - - /* - * Create a new shard combining the records from all of - * the shards in level, and append this new shard into - * this level. This is used for reconstructions under - * the tiering layout policy. - * - * No changes are made to the level provided as an argument. - */ - void append_level(InternalLevel *level) { - // FIXME: that this is happening probably means that - // something is going terribly wrong earlier in the - // reconstruction logic. - if (level->get_shard_count() == 0) { - return; - } - - std::vector shards; - for (auto shard : level->m_shards) { - if (shard) - shards.emplace_back(shard.get()); - } - - if (m_shard_cnt == m_shards.size()) { - m_pending_shard = new ShardType(shards); - return; - } - - auto tmp = new ShardType(shards); - m_shards[m_shard_cnt] = std::shared_ptr(tmp); - - ++m_shard_cnt; - } - - /* - * Create a new shard using the records in the - * provided buffer, and append this new shard - * into this level. This is used for buffer - * flushes under the tiering layout policy. - */ - void append_buffer(BuffView buffer) { - if (m_shard_cnt == m_shards.size()) { - assert(m_pending_shard == nullptr); - m_pending_shard = new ShardType(std::move(buffer)); - return; - } - - m_shards[m_shard_cnt] = std::make_shared(std::move(buffer)); - ++m_shard_cnt; - } - - void finalize() { - if (m_pending_shard) { - for (size_t i = 0; i < m_shards.size(); i++) { - m_shards[i] = nullptr; - } - - m_shards[0] = std::shared_ptr(m_pending_shard); - m_pending_shard = nullptr; - m_shard_cnt = 1; - } - } + ~InternalLevel() = default; /* * Create a new shard containing the combined records @@ -146,7 +41,7 @@ public: * No changes are made to this level. 
*/ ShardType *get_combined_shard() { - if (m_shard_cnt == 0) { + if (m_shards.size() == 0) { return nullptr; } @@ -163,7 +58,7 @@ public: std::vector> &shards, std::vector &local_queries, typename QueryType::Parameters *query_parms) { - for (size_t i = 0; i < m_shard_cnt; i++) { + for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { auto local_query = QueryType::local_preproc(m_shards[i].get(), query_parms); @@ -174,10 +69,10 @@ public: } bool check_tombstone(size_t shard_stop, const RecordType &rec) { - if (m_shard_cnt == 0) + if (m_shards.size() == 0) return false; - for (int i = m_shard_cnt - 1; i >= (ssize_t)shard_stop; i--) { + for (int i = m_shards.size() - 1; i >= (ssize_t)shard_stop; i--) { if (m_shards[i]) { auto res = m_shards[i]->point_lookup(rec, true); if (res && res->is_tombstone()) { @@ -189,7 +84,7 @@ public: } bool delete_record(const RecordType &rec) { - if (m_shard_cnt == 0) + if (m_shards.size() == 0) return false; for (size_t i = 0; i < m_shards.size(); ++i) { @@ -206,18 +101,18 @@ public: } ShardType *get_shard(size_t idx) { - if (idx >= m_shard_cnt) { + if (idx >= m_shards.size()) { return nullptr; } return m_shards[idx].get(); } - size_t get_shard_count() { return m_shard_cnt; } + size_t get_shard_count() { return m_shards.size(); } size_t get_record_count() { size_t cnt = 0; - for (size_t i = 0; i < m_shard_cnt; i++) { + for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { cnt += m_shards[i]->get_record_count(); } @@ -228,7 +123,7 @@ public: size_t get_tombstone_count() { size_t res = 0; - for (size_t i = 0; i < m_shard_cnt; ++i) { + for (size_t i = 0; i < m_shards.size(); ++i) { if (m_shards[i]) { res += m_shards[i]->get_tombstone_count(); } @@ -238,7 +133,7 @@ public: size_t get_aux_memory_usage() { size_t cnt = 0; - for (size_t i = 0; i < m_shard_cnt; i++) { + for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { cnt += m_shards[i]->get_aux_memory_usage(); } @@ -249,7 +144,7 @@ public: size_t get_memory_usage() { size_t cnt = 0; - for (size_t i = 0; i < m_shard_cnt; i++) { + for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { cnt += m_shards[i]->get_memory_usage(); } @@ -261,7 +156,7 @@ public: double get_tombstone_prop() { size_t tscnt = 0; size_t reccnt = 0; - for (size_t i = 0; i < m_shard_cnt; i++) { + for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { tscnt += m_shards[i]->get_tombstone_count(); reccnt += m_shards[i]->get_record_count(); @@ -272,24 +167,27 @@ public: } std::shared_ptr clone() { - auto new_level = - std::make_shared(m_level_no, m_shards.size()); - for (size_t i = 0; i < m_shard_cnt; i++) { + auto new_level = std::make_shared(m_level_no); + for (size_t i = 0; i < m_shards.size(); i++) { new_level->m_shards[i] = m_shards[i]; } - new_level->m_shard_cnt = m_shard_cnt; return new_level; } -private: - ssize_t m_level_no; + void truncate() { m_shards.erase(m_shards.begin(), m_shards.end()); } - size_t m_shard_cnt; - size_t m_shard_size_cap; + void delete_shard(shard_index shard) { + m_shards.erase(m_shards.begin() + shard); + } + + bool append(std::shared_ptr shard) { + m_shards.emplace_back(shard); + } +private: + ssize_t m_level_no; std::vector> m_shards; - ShardType *m_pending_shard; }; } // namespace de -- cgit v1.2.3
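
A note on driving the new interface: perform_reconstruction consumes a ReconstructionTask whose sources name shards by {level_idx, shard_idx}, with all_shards_idx standing in for "every shard on the level". The sketch below shows one way a caller might request that all of a level's shards be merged down into the next level. The field names on ReconstructionTask/ShardID and the two template arguments are inferred from how this patch uses them (the template parameter list is not legible in this rendering), so treat this as an illustrative sketch rather than the framework's actual scheduling code.

#include "framework/structure/ExtensionStructure.h"

/*
 * Hypothetical helper: merge every shard on `source_level` into a single
 * new shard appended to `source_level + 1`. The field names on
 * ReconstructionTask and ShardID are assumptions drawn from this patch.
 */
template <typename ShardType, typename QueryType>
void merge_level_down(de::ExtensionStructure<ShardType, QueryType> &structure,
                      de::level_index source_level) {
  de::ReconstructionTask task;

  /* all_shards_idx selects every shard on the source level */
  task.sources.push_back({source_level, de::all_shards_idx});
  task.target = source_level + 1;

  structure.perform_reconstruction(task);
}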
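
The merging branch of perform_flush collects the shards already resident on L0, adds a shard built from the incoming buffer, and replaces the level's contents with the single merged result. Below is a self-contained sketch of that sequence against the InternalLevel operations introduced here (get_shard_count, get_shard, truncate, append). The element type of the shard vector and the two ShardType constructors (one taking a buffer view, one taking a vector of shard pointers) are assumptions based on how they are used elsewhere in the patch, not a statement of the committed implementation.

#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

/*
 * Illustrative only: merge an incoming buffer with every shard already on
 * level 0, then replace the level's contents with the merged shard.
 * LevelPtr is expected to behave like the shared_ptr<InternalLevel<...>>
 * entries held in ExtensionStructure's level vector.
 */
template <typename ShardType, typename LevelPtr, typename BuffView>
void flush_by_merging(LevelPtr &level0, BuffView buffer) {
  /* build a temporary shard holding the buffer's records */
  auto buffer_shard = std::make_unique<ShardType>(std::move(buffer));

  /* gather the shards already resident on level 0 */
  std::vector<ShardType *> shards;
  for (std::size_t i = 0; i < level0->get_shard_count(); i++) {
    if (level0->get_shard(i)) {
      shards.push_back(level0->get_shard(i));
    }
  }
  shards.push_back(buffer_shard.get());

  /* merge everything into one shard, then swap it in for the old ones */
  auto merged = std::make_shared<ShardType>(shards);
  level0->truncate();
  level0->append(merged);
}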
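
With the fixed-capacity shard array and explicit shard count replaced by a growable vector, cloning a level amounts to a shallow copy of the shared_ptr handles; because a freshly constructed level starts with an empty vector, the handles need to be appended rather than written through an index. A minimal stand-alone sketch of that pattern follows (LevelSketch is a hypothetical stand-in, not the framework's InternalLevel).

#include <memory>
#include <sys/types.h>
#include <vector>

/*
 * Sketch of a shallow clone over a vector-based level: the shard objects
 * themselves are shared, only the vector of handles is copied.
 */
template <typename ShardType>
struct LevelSketch {
  ssize_t level_no = 0;
  std::vector<std::shared_ptr<ShardType>> shards;

  std::shared_ptr<LevelSketch> clone() const {
    auto copy = std::make_shared<LevelSketch>();
    copy->level_no = level_no;
    for (const auto &s : shards) {
      /* the new vector starts empty, so append rather than index */
      copy->shards.push_back(s);
    }
    return copy;
  }
};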