diff options
Diffstat (limited to 'include/framework/structure/ExtensionStructure.h')
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 1080 |
1 files changed, 559 insertions, 521 deletions
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index b83674b..2728246 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -1,8 +1,8 @@ /* * include/framework/structure/ExtensionStructure.h * - * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> - * Dong Xie <dongx@psu.edu> + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> * * Distributed under the Modified BSD License. * @@ -22,622 +22,660 @@ namespace de { -template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L=LayoutPolicy::TEIRING> +template <ShardInterface ShardType, QueryInterface<ShardType> QueryType, + LayoutPolicy L = LayoutPolicy::TEIRING> class ExtensionStructure { - typedef S Shard; - typedef BufferView<R> BuffView; + typedef typename ShardType::RECORD RecordType; + typedef BufferView<RecordType> BuffView; - typedef struct { - size_t reccnt; - size_t reccap; + typedef struct { + size_t reccnt; + size_t reccap; - size_t shardcnt; - size_t shardcap; - } level_state; + size_t shardcnt; + size_t shardcap; + } level_state; - typedef std::vector<level_state> state_vector; + typedef std::vector<level_state> state_vector; public: - ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop) - : m_scale_factor(scale_factor) - , m_max_delete_prop(max_delete_prop) - , m_buffer_size(buffer_size) - {} - - ~ExtensionStructure() = default; - - /* - * Create a shallow copy of this extension structure. The copy will share - * references to the same levels/shards as the original, but will have its - * own lists. As all of the shards are immutable (with the exception of - * deletes), the copy can be restructured with reconstructions and flushes - * without affecting the original. The copied structure will be returned - * with a reference count of 0; generally you will want to immediately call - * take_reference() on it. - * - * NOTE: When using tagged deletes, a delete of a record in the original - * structure will affect the copy, so long as the copy retains a reference - * to the same shard as the original. This could cause synchronization - * problems under tagging with concurrency. Any deletes in this context will - * need to be forwarded to the appropriate structures manually. - */ - ExtensionStructure<R, S, Q, L> *copy() { - auto new_struct = new ExtensionStructure<R, S, Q, L>(m_buffer_size, m_scale_factor, - m_max_delete_prop); - for (size_t i=0; i<m_levels.size(); i++) { - new_struct->m_levels.push_back(m_levels[i]->clone()); - } - - new_struct->m_refcnt = 0; - new_struct->m_current_state = m_current_state; + ExtensionStructure(size_t buffer_size, size_t scale_factor, + double max_delete_prop) + : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop), + m_buffer_size(buffer_size) {} + + ~ExtensionStructure() = default; + + /* + * Create a shallow copy of this extension structure. The copy will share + * references to the same levels/shards as the original, but will have its + * own lists. As all of the shards are immutable (with the exception of + * deletes), the copy can be restructured with reconstructions and flushes + * without affecting the original. The copied structure will be returned + * with a reference count of 0; generally you will want to immediately call + * take_reference() on it. + * + * NOTE: When using tagged deletes, a delete of a record in the original + * structure will affect the copy, so long as the copy retains a reference + * to the same shard as the original. This could cause synchronization + * problems under tagging with concurrency. Any deletes in this context will + * need to be forwarded to the appropriate structures manually. + */ + ExtensionStructure<ShardType, QueryType, L> *copy() { + auto new_struct = new ExtensionStructure<ShardType, QueryType, L>( + m_buffer_size, m_scale_factor, m_max_delete_prop); + for (size_t i = 0; i < m_levels.size(); i++) { + new_struct->m_levels.push_back(m_levels[i]->clone()); + } - return new_struct; + new_struct->m_refcnt = 0; + new_struct->m_current_state = m_current_state; + + return new_struct; + } + + /* + * Search for a record matching the argument and mark it deleted by + * setting the delete bit in its wrapped header. Returns 1 if a matching + * record was found and deleted, and 0 if a matching record was not found. + * + * This function will stop after finding the first matching record. It is + * assumed that no duplicate records exist. In the case of duplicates, this + * function will still "work", but in the sense of "delete first match". + */ + int tagged_delete(const RecordType &rec) { + for (auto level : m_levels) { + if (level && level->delete_record(rec)) { + return 1; + } } /* - * Search for a record matching the argument and mark it deleted by - * setting the delete bit in its wrapped header. Returns 1 if a matching - * record was found and deleted, and 0 if a matching record was not found. - * - * This function will stop after finding the first matching record. It is - * assumed that no duplicate records exist. In the case of duplicates, this - * function will still "work", but in the sense of "delete first match". + * If the record to be erased wasn't found, return 0. The + * DynamicExtension itself will then search the active + * Buffers. */ - int tagged_delete(const R &rec) { - for (auto level : m_levels) { - if (level && level->delete_record(rec)) { - return 1; - } - } - - /* - * If the record to be erased wasn't found, return 0. The - * DynamicExtension itself will then search the active - * Buffers. - */ - return 0; + return 0; + } + + /* + * Flush a buffer into the extension structure, performing any necessary + * reconstructions to free up room in L0. + * + * FIXME: arguably, this should be a method attached to the buffer that + * takes a structure as input. + */ + inline bool flush_buffer(BuffView buffer) { + state_vector tmp = m_current_state; + + if (tmp.size() == 0) { + grow(tmp); } - /* - * Flush a buffer into the extension structure, performing any necessary - * reconstructions to free up room in L0. - * - * FIXME: arguably, this should be a method attached to the buffer that - * takes a structure as input. - */ - inline bool flush_buffer(BuffView buffer) { - state_vector tmp = m_current_state; + assert(can_reconstruct_with(0, buffer.get_record_count(), tmp)); + flush_buffer_into_l0(std::move(buffer)); - if (tmp.size() == 0) { - grow(tmp); - } + return true; + } - assert(can_reconstruct_with(0, buffer.get_record_count(), tmp)); - flush_buffer_into_l0(std::move(buffer)); + /* + * Return the total number of records (including tombstones) within all + * of the levels of the structure. + */ + size_t get_record_count() { + size_t cnt = 0; - return true; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) + cnt += m_levels[i]->get_record_count(); } - /* - * Return the total number of records (including tombstones) within all - * of the levels of the structure. - */ - size_t get_record_count() { - size_t cnt = 0; + return cnt; + } - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) cnt += m_levels[i]->get_record_count(); - } + /* + * Return the total number of tombstones contained within all of the + * levels of the structure. + */ + size_t get_tombstone_count() { + size_t cnt = 0; - return cnt; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) + cnt += m_levels[i]->get_tombstone_count(); } - /* - * Return the total number of tombstones contained within all of the - * levels of the structure. - */ - size_t get_tombstone_count() { - size_t cnt = 0; - - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count(); - } - - return cnt; + return cnt; + } + + /* + * Return the number of levels within the structure. Note that not + * all of these levels are necessarily populated. + */ + size_t get_height() { return m_levels.size(); } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing the primary data structure and raw data. + */ + size_t get_memory_usage() { + size_t cnt = 0; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) + cnt += m_levels[i]->get_memory_usage(); } - /* - * Return the number of levels within the structure. Note that not - * all of these levels are necessarily populated. - */ - size_t get_height() { - return m_levels.size(); + return cnt; + } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing auxiliary data structures. This total does not + * include memory used for the main data structure, or raw data. + */ + size_t get_aux_memory_usage() { + size_t cnt = 0; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + cnt += m_levels[i]->get_aux_memory_usage(); + } } - /* - * Return the amount of memory (in bytes) used by the shards within the - * structure for storing the primary data structure and raw data. - */ - size_t get_memory_usage() { - size_t cnt = 0; - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) cnt += m_levels[i]->get_memory_usage(); + return cnt; + } + + /* + * Validate that no level in the structure exceeds its maximum tombstone + * capacity. This is used to trigger preemptive compactions at the end of + * the reconstruction process. + */ + bool validate_tombstone_proportion() { + long double ts_prop; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + ts_prop = (long double)m_levels[i]->get_tombstone_count() / + (long double)calc_level_record_capacity(i); + if (ts_prop > (long double)m_max_delete_prop) { + return false; } - - return cnt; + } } - /* - * Return the amount of memory (in bytes) used by the shards within the - * structure for storing auxiliary data structures. This total does not - * include memory used for the main data structure, or raw data. + return true; + } + + bool validate_tombstone_proportion(level_index level) { + long double ts_prop = (long double)m_levels[level]->get_tombstone_count() / + (long double)calc_level_record_capacity(level); + return ts_prop <= (long double)m_max_delete_prop; + } + + /* + * Return a reference to the underlying vector of levels within the + * structure. + */ + std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> & + get_levels() { + return m_levels; + } + + /* + * NOTE: This cannot be simulated, because tombstone cancellation is not + * cheaply predictable. It is possible that the worst case number could + * be used instead, to allow for prediction, but compaction isn't a + * major concern outside of sampling; at least for now. So I'm not + * going to focus too much time on it at the moment. + */ + ReconstructionVector get_compaction_tasks() { + ReconstructionVector tasks; + state_vector scratch_state = m_current_state; + + /* if the tombstone/delete invariant is satisfied, no need for compactions */ - size_t get_aux_memory_usage() { - size_t cnt = 0; - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) { - cnt += m_levels[i]->get_aux_memory_usage(); - } - } - - return cnt; + if (validate_tombstone_proportion()) { + return tasks; } - /* - * Validate that no level in the structure exceeds its maximum tombstone - * capacity. This is used to trigger preemptive compactions at the end of - * the reconstruction process. - */ - bool validate_tombstone_proportion() { - long double ts_prop; - for (size_t i = 0; i < m_levels.size(); i++) { - if (m_levels[i]) { - ts_prop = (long double)m_levels[i]->get_tombstone_count() / - (long double)calc_level_record_capacity(i); - if (ts_prop > (long double)m_max_delete_prop) { - return false; - } - } + /* locate the first level to violate the invariant */ + level_index violation_idx = -1; + for (level_index i = 0; i < m_levels.size(); i++) { + if (!validate_tombstone_proportion(i)) { + violation_idx = i; + break; } - - return true; } - bool validate_tombstone_proportion(level_index level) { - long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level); - return ts_prop <= (long double) m_max_delete_prop; - } + assert(violation_idx != -1); - /* - * Return a reference to the underlying vector of levels within the - * structure. - */ - std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> &get_levels() { - return m_levels; + level_index base_level = + find_reconstruction_target(violation_idx, scratch_state); + if (base_level == -1) { + base_level = grow(scratch_state); } - /* - * NOTE: This cannot be simulated, because tombstone cancellation is not - * cheaply predictable. It is possible that the worst case number could - * be used instead, to allow for prediction, but compaction isn't a - * major concern outside of sampling; at least for now. So I'm not - * going to focus too much time on it at the moment. - */ - ReconstructionVector get_compaction_tasks() { - ReconstructionVector tasks; - state_vector scratch_state = m_current_state; - - /* if the tombstone/delete invariant is satisfied, no need for compactions */ - if (validate_tombstone_proportion()) { - return tasks; - } - - /* locate the first level to violate the invariant */ - level_index violation_idx = -1; - for (level_index i=0; i<m_levels.size(); i++) { - if (!validate_tombstone_proportion(i)) { - violation_idx = i; - break; - } - } - - assert(violation_idx != -1); - - level_index base_level = find_reconstruction_target(violation_idx, scratch_state); - if (base_level == -1) { - base_level = grow(scratch_state); - } - - for (level_index i=base_level; i>0; i--) { - /* - * The amount of storage required for the reconstruction accounts - * for the cost of storing the new records, along with the - * cost of retaining the old records during the process - * (hence the 2x multiplier). - * - * FIXME: currently does not account for the *actual* size - * of the shards, only the storage for the records - * themselves. - */ - size_t reccnt = m_levels[i - 1]->get_record_count(); - if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt, scratch_state)) { - reccnt += m_levels[i]->get_record_count(); - } - } - tasks.add_reconstruction(i-i, i, reccnt); + for (level_index i = base_level; i > 0; i--) { + /* + * The amount of storage required for the reconstruction accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i - 1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_reconstruct_with(i, reccnt, scratch_state)) { + reccnt += m_levels[i]->get_record_count(); } - - return tasks; + } + tasks.add_reconstruction(i - i, i, reccnt); } + return tasks; + } + + /* + * + */ + ReconstructionVector + get_reconstruction_tasks(size_t buffer_reccnt, + state_vector scratch_state = {}) { /* - * + * If no scratch state vector is provided, use a copy of the + * current one. The only time an empty vector could be used as + * *real* input to this function is when the current state is also + * empty, so this should would even in that case. */ - ReconstructionVector get_reconstruction_tasks(size_t buffer_reccnt, - state_vector scratch_state={}) { - /* - * If no scratch state vector is provided, use a copy of the - * current one. The only time an empty vector could be used as - * *real* input to this function is when the current state is also - * empty, so this should would even in that case. - */ - if (scratch_state.size() == 0) { - scratch_state = m_current_state; - } - - ReconstructionVector reconstructions; - size_t LOOKAHEAD = 1; - for (size_t i=0; i<LOOKAHEAD; i++) { - /* - * If L0 cannot support a direct buffer flush, figure out what - * work must be done to free up space first. Otherwise, the - * reconstruction vector will be initially empty. - */ - if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) { - auto local_recon = get_reconstruction_tasks_from_level(0, scratch_state); - - /* - * for the first iteration, we need to do all of the - * reconstructions, so use these to initially the returned - * reconstruction list - */ - if (i == 0) { - reconstructions = local_recon; - /* - * Quick sanity test of idea: if the next reconstruction - * would be larger than this one, steal the largest - * task from it and run it now instead. - */ - } else if (local_recon.get_total_reccnt() > reconstructions.get_total_reccnt()) { - auto t = local_recon.remove_reconstruction(0); - reconstructions.add_reconstruction(t); - } - } - - /* simulate the buffer flush in the scratch state */ - scratch_state[0].reccnt += buffer_reccnt; - if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) { - scratch_state[0].shardcnt += 1; - } - - } - - return std::move(reconstructions); + if (scratch_state.size() == 0) { + scratch_state = m_current_state; } - - /* - * - */ - ReconstructionVector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) { - ReconstructionVector reconstructions; + ReconstructionVector reconstructions; + size_t LOOKAHEAD = 1; + for (size_t i = 0; i < LOOKAHEAD; i++) { + /* + * If L0 cannot support a direct buffer flush, figure out what + * work must be done to free up space first. Otherwise, the + * reconstruction vector will be initially empty. + */ + if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) { + auto local_recon = + get_reconstruction_tasks_from_level(0, scratch_state); /* - * Find the first level capable of sustaining a reconstruction from - * the level above it. If no such level exists, add a new one at - * the bottom of the structure. + * for the first iteration, we need to do all of the + * reconstructions, so use these to initially the returned + * reconstruction list */ - level_index base_level = find_reconstruction_target(source_level, scratch_state); - if (base_level == -1) { - base_level = grow(scratch_state); + if (i == 0) { + reconstructions = local_recon; + /* + * Quick sanity test of idea: if the next reconstruction + * would be larger than this one, steal the largest + * task from it and run it now instead. + */ + } else if (local_recon.get_total_reccnt() > + reconstructions.get_total_reccnt()) { + auto t = local_recon.remove_reconstruction(0); + reconstructions.add_reconstruction(t); } + } - if constexpr (L == LayoutPolicy::BSM) { - if (base_level == 0) { - return std::move(reconstructions); - } - - ReconstructionTask task; - task.target = base_level; - - size_t base_reccnt = 0; - for (level_index i=base_level; i>source_level; i--) { - auto recon_reccnt = scratch_state[i-1].reccnt; - base_reccnt += recon_reccnt; - scratch_state[i-1].reccnt = 0; - scratch_state[i-1].shardcnt = 0; - task.add_source(i-1, recon_reccnt); - } + /* simulate the buffer flush in the scratch state */ + scratch_state[0].reccnt += buffer_reccnt; + if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) { + scratch_state[0].shardcnt += 1; + } + } - reconstructions.add_reconstruction(task); - scratch_state[base_level].reccnt = base_reccnt; - scratch_state[base_level].shardcnt = 1; + return reconstructions; + } - return std::move(reconstructions); - } + /* + * + */ + ReconstructionVector + get_reconstruction_tasks_from_level(level_index source_level, + state_vector &scratch_state) { + ReconstructionVector reconstructions; - /* - * Determine the full set of reconstructions necessary to open up - * space in the source level. - */ - for (level_index i=base_level; i>source_level; i--) { - size_t recon_reccnt = scratch_state[i-1].reccnt; - size_t base_reccnt = recon_reccnt; - - /* - * If using Leveling, the total reconstruction size will be the - * records in *both* base and target, because they will need to - * be merged (assuming that target isn't empty). - */ - if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, base_reccnt, scratch_state)) { - recon_reccnt += scratch_state[i].reccnt; - } - } - reconstructions.add_reconstruction(i-1, i, recon_reccnt); - - /* - * The base level will be emptied and its records moved to - * the target. - */ - scratch_state[i-1].reccnt = 0; - scratch_state[i-1].shardcnt = 0; - - /* - * The target level will have the records from the base level - * added to it, and potentially gain a shard if the LayoutPolicy - * is tiering or the level currently lacks any shards at all. - */ - scratch_state[i].reccnt += base_reccnt; - if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) { - scratch_state[i].shardcnt += 1; - } - } - - return std::move(reconstructions); + /* + * Find the first level capable of sustaining a reconstruction from + * the level above it. If no such level exists, add a new one at + * the bottom of the structure. + */ + level_index base_level = + find_reconstruction_target(source_level, scratch_state); + if (base_level == -1) { + base_level = grow(scratch_state); } - inline void reconstruction(ReconstructionTask task) { - static_assert(L == LayoutPolicy::BSM); - std::vector<InternalLevel<R, Shard, Q>*> levels(task.sources.size()); - for (size_t i=0; i<task.sources.size(); i++) { - levels[i] = m_levels[task.sources[i]].get(); - } + if constexpr (L == LayoutPolicy::BSM) { + if (base_level == 0) { + return reconstructions; + } - auto new_level = InternalLevel<R, Shard, Q>::reconstruction(levels, task.target); - if (task.target >= m_levels.size()) { - m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target), - 1, 1}); - m_levels.emplace_back(new_level); - } else { - m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target), - 1, 1}; - m_levels[task.target] = new_level; - } + ReconstructionTask task; + task.target = base_level; - /* remove all of the levels that have been flattened */ - for (size_t i=0; i<task.sources.size(); i++) { - m_levels[task.sources[i]] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(task.sources[i], 1)); - m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.target), 0, 1}; - } + size_t base_reccnt = 0; + for (level_index i = base_level; i > source_level; i--) { + auto recon_reccnt = scratch_state[i - 1].reccnt; + base_reccnt += recon_reccnt; + scratch_state[i - 1].reccnt = 0; + scratch_state[i - 1].shardcnt = 0; + task.add_source(i - 1, recon_reccnt); + } + + reconstructions.add_reconstruction(task); + scratch_state[base_level].reccnt = base_reccnt; + scratch_state[base_level].shardcnt = 1; - return; + return reconstructions; } /* - * Combine incoming_level with base_level and reconstruct the shard, - * placing it in base_level. The two levels should be sequential--i.e. no - * levels are skipped in the reconstruction process--otherwise the - * tombstone ordering invariant may be violated. + * Determine the full set of reconstructions necessary to open up + * space in the source level. */ - inline void reconstruction(level_index base_level, level_index incoming_level) { - size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - - if (base_level >= m_levels.size()) { - m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(base_level, shard_capacity))); - m_current_state.push_back({0, calc_level_record_capacity(base_level), - 0, shard_capacity}); - } - - if constexpr (L == LayoutPolicy::LEVELING) { - /* if the base level has a shard, merge the base and incoming together to make a new one */ - if (m_levels[base_level]->get_shard_count() > 0) { - m_levels[base_level] = InternalLevel<R, Shard, Q>::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get()); - /* otherwise, we can just move the incoming to the base */ - } else { - m_levels[base_level] = m_levels[incoming_level]; - } - - } else { - m_levels[base_level]->append_level(m_levels[incoming_level].get()); - m_levels[base_level]->finalize(); + for (level_index i = base_level; i > source_level; i--) { + size_t recon_reccnt = scratch_state[i - 1].reccnt; + size_t base_reccnt = recon_reccnt; + + /* + * If using Leveling, the total reconstruction size will be the + * records in *both* base and target, because they will need to + * be merged (assuming that target isn't empty). + */ + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_reconstruct_with(i, base_reccnt, scratch_state)) { + recon_reccnt += scratch_state[i].reccnt; } + } + reconstructions.add_reconstruction(i - 1, i, recon_reccnt); + + /* + * The base level will be emptied and its records moved to + * the target. + */ + scratch_state[i - 1].reccnt = 0; + scratch_state[i - 1].shardcnt = 0; + + /* + * The target level will have the records from the base level + * added to it, and potentially gain a shard if the LayoutPolicy + * is tiering or the level currently lacks any shards at all. + */ + scratch_state[i].reccnt += base_reccnt; + if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) { + scratch_state[i].shardcnt += 1; + } + } - /* place a new, empty level where the incoming level used to be */ - m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); + return reconstructions; + } - /* - * Update the state vector to match the *real* state following - * the reconstruction - */ - m_current_state[base_level] = {m_levels[base_level]->get_record_count(), - calc_level_record_capacity(base_level), m_levels[base_level]->get_shard_count(), shard_capacity}; - m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity}; + inline void reconstruction(ReconstructionTask task) { + static_assert(L == LayoutPolicy::BSM); + std::vector<InternalLevel<ShardType, QueryType> *> levels( + task.sources.size()); + for (size_t i = 0; i < task.sources.size(); i++) { + levels[i] = m_levels[task.sources[i]].get(); } - bool take_reference() { - m_refcnt.fetch_add(1); - return true; + auto new_level = InternalLevel<ShardType, QueryType>::reconstruction( + levels, task.target); + if (task.target >= m_levels.size()) { + m_current_state.push_back({new_level->get_record_count(), + calc_level_record_capacity(task.target), 1, + 1}); + m_levels.emplace_back(new_level); + } else { + m_current_state[task.target] = {new_level->get_record_count(), + calc_level_record_capacity(task.target), + 1, 1}; + m_levels[task.target] = new_level; } - bool release_reference() { - assert(m_refcnt.load() > 0); - m_refcnt.fetch_add(-1); - return true; + /* remove all of the levels that have been flattened */ + for (size_t i = 0; i < task.sources.size(); i++) { + m_levels[task.sources[i]] = + std::shared_ptr<InternalLevel<ShardType, QueryType>>( + new InternalLevel<ShardType, QueryType>(task.sources[i], 1)); + m_current_state[task.sources[i]] = { + 0, calc_level_record_capacity(task.target), 0, 1}; } - size_t get_reference_count() { - return m_refcnt.load(); + return; + } + + /* + * Combine incoming_level with base_level and reconstruct the shard, + * placing it in base_level. The two levels should be sequential--i.e. no + * levels are skipped in the reconstruction process--otherwise the + * tombstone ordering invariant may be violated. + */ + inline void reconstruction(level_index base_level, + level_index incoming_level) { + size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + if (base_level >= m_levels.size()) { + m_levels.emplace_back( + std::shared_ptr<InternalLevel<ShardType, QueryType>>( + new InternalLevel<ShardType, QueryType>(base_level, + shard_capacity))); + m_current_state.push_back( + {0, calc_level_record_capacity(base_level), 0, shard_capacity}); } - std::vector<void *> get_query_states(std::vector<std::pair<ShardID, Shard*>> &shards, void *parms) { - std::vector<void*> states; - - for (auto &level : m_levels) { - level->get_query_states(shards, states, parms); - } + if constexpr (L == LayoutPolicy::LEVELING) { + /* if the base level has a shard, merge the base and incoming together to + * make a new one */ + if (m_levels[base_level]->get_shard_count() > 0) { + m_levels[base_level] = + InternalLevel<ShardType, QueryType>::reconstruction( + m_levels[base_level].get(), m_levels[incoming_level].get()); + /* otherwise, we can just move the incoming to the base */ + } else { + m_levels[base_level] = m_levels[incoming_level]; + } - return states; + } else { + m_levels[base_level]->append_level(m_levels[incoming_level].get()); + m_levels[base_level]->finalize(); } -private: - size_t m_scale_factor; - double m_max_delete_prop; - size_t m_buffer_size; - - std::atomic<size_t> m_refcnt; - - std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels; - - /* - * A pair of <record_count, shard_count> for each level in the - * structure. Record counts may be slightly inaccurate due to - * deletes. - */ - state_vector m_current_state; + /* place a new, empty level where the incoming level used to be */ + m_levels[incoming_level] = + std::shared_ptr<InternalLevel<ShardType, QueryType>>( + new InternalLevel<ShardType, QueryType>( + incoming_level, + (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); /* - * Add a new level to the scratch state and return its index. - * - * IMPORTANT: This does _not_ add a level to the extension structure - * anymore. This is handled by the appropriate reconstruction and flush - * methods as needed. This function is for use in "simulated" - * reconstructions. + * Update the state vector to match the *real* state following + * the reconstruction */ - inline level_index grow(state_vector &scratch_state) { - level_index new_idx = m_levels.size(); - size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - - scratch_state.push_back({0, calc_level_record_capacity(new_idx), - 0, new_shard_cap}); - return new_idx; + m_current_state[base_level] = {m_levels[base_level]->get_record_count(), + calc_level_record_capacity(base_level), + m_levels[base_level]->get_shard_count(), + shard_capacity}; + m_current_state[incoming_level] = { + 0, calc_level_record_capacity(incoming_level), 0, shard_capacity}; + } + + bool take_reference() { + m_refcnt.fetch_add(1); + return true; + } + + bool release_reference() { + assert(m_refcnt.load() > 0); + m_refcnt.fetch_add(-1); + return true; + } + + size_t get_reference_count() { return m_refcnt.load(); } + + std::vector<typename QueryType::LocalQuery *> + get_local_queries(std::vector<std::pair<ShardID, ShardType *>> &shards, + typename QueryType::Parameters *parms) { + + std::vector<typename QueryType::LocalQuery *> queries; + + for (auto &level : m_levels) { + level->get_local_queries(shards, queries, parms); } - /* - * Find the first level below the level indicated by idx that - * is capable of sustaining a reconstruction and return its - * level index. If no such level exists, returns -1. Also - * returns -1 if idx==0, and no such level exists, to simplify - * the logic of the first buffer flush. - */ - inline level_index find_reconstruction_target(level_index idx, state_vector &state) { + return queries; + } - /* - * this handles the very first buffer flush, when the state vector - * is empty. - */ - if (idx == 0 && state.size() == 0) return -1; +private: + size_t m_scale_factor; + double m_max_delete_prop; + size_t m_buffer_size; + + std::atomic<size_t> m_refcnt; + + std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> m_levels; + + /* + * A pair of <record_count, shard_count> for each level in the + * structure. Record counts may be slightly inaccurate due to + * deletes. + */ + state_vector m_current_state; + + /* + * Add a new level to the scratch state and return its index. + * + * IMPORTANT: This does _not_ add a level to the extension structure + * anymore. This is handled by the appropriate reconstruction and flush + * methods as needed. This function is for use in "simulated" + * reconstructions. + */ + inline level_index grow(state_vector &scratch_state) { + level_index new_idx = m_levels.size(); + size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + scratch_state.push_back( + {0, calc_level_record_capacity(new_idx), 0, new_shard_cap}); + return new_idx; + } + + /* + * Find the first level below the level indicated by idx that + * is capable of sustaining a reconstruction and return its + * level index. If no such level exists, returns -1. Also + * returns -1 if idx==0, and no such level exists, to simplify + * the logic of the first buffer flush. + */ + inline level_index find_reconstruction_target(level_index idx, + state_vector &state) { - size_t incoming_rec_cnt = state[idx].reccnt; - for (level_index i=idx+1; i<state.size(); i++) { - if (can_reconstruct_with(i, incoming_rec_cnt, state)) { - return i; - } + /* + * this handles the very first buffer flush, when the state vector + * is empty. + */ + if (idx == 0 && state.size() == 0) + return -1; - incoming_rec_cnt = state[idx].reccnt; - } + size_t incoming_rec_cnt = state[idx].reccnt; + for (level_index i = idx + 1; i < state.size(); i++) { + if (can_reconstruct_with(i, incoming_rec_cnt, state)) { + return i; + } - return -1; + incoming_rec_cnt = state[idx].reccnt; } - inline void flush_buffer_into_l0(BuffView buffer) { - size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - - if (m_levels.size() == 0) { - m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(0, shard_capacity))); + return -1; + } - m_current_state.push_back({0, calc_level_record_capacity(0), - 0, shard_capacity}); - } + inline void flush_buffer_into_l0(BuffView buffer) { + size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - if constexpr (L == LayoutPolicy::LEVELING) { - // FIXME: Kludgey implementation due to interface constraints. - auto old_level = m_levels[0].get(); - auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); - temp_level->append_buffer(std::move(buffer)); - - if (old_level->get_shard_count() > 0) { - m_levels[0] = InternalLevel<R, Shard, Q>::reconstruction(old_level, temp_level); - delete temp_level; - } else { - m_levels[0] = std::shared_ptr<InternalLevel<R, Shard, Q>>(temp_level); - } - } else { - m_levels[0]->append_buffer(std::move(buffer)); - } + if (m_levels.size() == 0) { + m_levels.emplace_back( + std::shared_ptr<InternalLevel<ShardType, QueryType>>( + new InternalLevel<ShardType, QueryType>(0, shard_capacity))); - /* update the state vector */ - m_current_state[0].reccnt = m_levels[0]->get_record_count(); - m_current_state[0].shardcnt = m_levels[0]->get_shard_count(); + m_current_state.push_back( + {0, calc_level_record_capacity(0), 0, shard_capacity}); } - /* - * Mark a given memory level as no-longer in use by the tree. For now this - * will just free the level. In future, this will be more complex as the - * level may not be able to immediately be deleted, depending upon who - * else is using it. - */ - inline void mark_as_unused(std::shared_ptr<InternalLevel<R, Shard, Q>> level) { - level.reset(); + if constexpr (L == LayoutPolicy::LEVELING) { + // FIXME: Kludgey implementation due to interface constraints. + auto old_level = m_levels[0].get(); + auto temp_level = new InternalLevel<ShardType, QueryType>(0, 1); + temp_level->append_buffer(std::move(buffer)); + + if (old_level->get_shard_count() > 0) { + m_levels[0] = InternalLevel<ShardType, QueryType>::reconstruction( + old_level, temp_level); + delete temp_level; + } else { + m_levels[0] = + std::shared_ptr<InternalLevel<ShardType, QueryType>>(temp_level); + } + } else { + m_levels[0]->append_buffer(std::move(buffer)); } - /* - * Assume that level "0" should be larger than the buffer. The buffer - * itself is index -1, which should return simply the buffer capacity. - */ - inline size_t calc_level_record_capacity(level_index idx) { - return m_buffer_size * pow(m_scale_factor, idx+1); + /* update the state vector */ + m_current_state[0].reccnt = m_levels[0]->get_record_count(); + m_current_state[0].shardcnt = m_levels[0]->get_shard_count(); + } + + /* + * Mark a given memory level as no-longer in use by the tree. For now this + * will just free the level. In future, this will be more complex as the + * level may not be able to immediately be deleted, depending upon who + * else is using it. + */ + inline void + mark_as_unused(std::shared_ptr<InternalLevel<ShardType, QueryType>> level) { + level.reset(); + } + + /* + * Assume that level "0" should be larger than the buffer. The buffer + * itself is index -1, which should return simply the buffer capacity. + */ + inline size_t calc_level_record_capacity(level_index idx) { + return m_buffer_size * pow(m_scale_factor, idx + 1); + } + + /* + * Returns the number of records present on a specified level. + */ + inline size_t get_level_record_count(level_index idx) { + return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; + } + + /* + * Determines if a level can sustain a reconstruction with incoming_rec_cnt + * additional records without exceeding its capacity. + */ + inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, + state_vector &state) { + if (idx >= state.size()) { + return false; } - /* - * Returns the number of records present on a specified level. - */ - inline size_t get_level_record_count(level_index idx) { - return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; + if constexpr (L == LayoutPolicy::LEVELING) { + return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap; + } else if constexpr (L == LayoutPolicy::BSM) { + return state[idx].reccnt == 0; + } else { + return state[idx].shardcnt < state[idx].shardcap; } - /* - * Determines if a level can sustain a reconstruction with incoming_rec_cnt - * additional records without exceeding its capacity. - */ - inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) { - if (idx >= state.size()) { - return false; - } - - if constexpr (L == LayoutPolicy::LEVELING) { - return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap; - } else if constexpr (L == LayoutPolicy::BSM) { - return state[idx].reccnt == 0; - } else { - return state[idx].shardcnt < state[idx].shardcap; - } - - /* unreachable */ - assert(true); - } + /* unreachable */ + assert(true); + } }; -} - +} // namespace de |