| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 144 |
1 file changed, 89 insertions, 55 deletions
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 4802bc1..823a66b 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,6 +27,16 @@ class ExtensionStructure {
     typedef S Shard;
     typedef BufferView<R> BuffView;
 
+    typedef struct {
+        size_t reccnt;
+        size_t reccap;
+
+        size_t shardcnt;
+        size_t shardcap;
+    } level_state;
+
+    typedef std::vector<level_state> state_vector;
+
 public:
     ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
         : m_scale_factor(scale_factor)
@@ -59,6 +69,7 @@ public:
         }
 
         new_struct->m_refcnt = 0;
+        new_struct->m_current_state = m_current_state;
 
         return new_struct;
     }
@@ -95,8 +106,8 @@ public:
      * takes a structure as input.
      */
     inline bool flush_buffer(BuffView buffer) {
-        assert(can_reconstruct_with(0, buffer.get_record_count()));
-
+        state_vector tmp = m_current_state;
+        assert(can_reconstruct_with(0, buffer.get_record_count(), tmp));
         flush_buffer_into_l0(std::move(buffer));
 
         return true;
@@ -200,8 +211,16 @@ public:
         return m_levels;
     }
 
+    /*
+     * NOTE: This cannot be simulated, because tombstone cancellation is not
+     * cheaply predictable. It is possible that the worst case number could
+     * be used instead, to allow for prediction, but compaction isn't a
+     * major concern outside of sampling; at least for now. So I'm not
+     * going to focus too much time on it at the moment.
+     */
     std::vector<ReconstructionTask> get_compaction_tasks() {
         std::vector<ReconstructionTask> tasks;
+        state_vector scratch_state = m_current_state;
 
         /* if the tombstone/delete invariant is satisfied, no need for compactions */
         if (validate_tombstone_proportion()) {
@@ -219,9 +238,9 @@ public:
 
         assert(violation_idx != -1);
 
-        level_index base_level = find_reconstruction_target(violation_idx);
+        level_index base_level = find_reconstruction_target(violation_idx, scratch_state);
         if (base_level == -1) {
-            base_level = grow();
+            base_level = grow(scratch_state);
         }
 
         for (level_index i=base_level; i>0; i--) {
@@ -239,7 +258,7 @@ public:
              */
             size_t reccnt = m_levels[i - 1]->get_record_count();
             if constexpr (L == LayoutPolicy::LEVELING) {
-                if (can_reconstruct_with(i, reccnt)) {
+                if (can_reconstruct_with(i, reccnt, scratch_state)) {
                     reccnt += m_levels[i]->get_record_count();
                 }
             }
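The planning above works because a capacity decision needs only the per-level counts captured in `level_state`, never the shards themselves. Below is a minimal standalone sketch of that check; only the `level_state` fields mirror the diff, while the free function and the runtime `leveling` flag are illustrative stand-ins for the header's `LayoutPolicy`-templated member.

```cpp
#include <cstddef>
#include <vector>

/* mirrors the level_state struct added in this diff */
struct level_state {
    size_t reccnt;   /* records currently on the level   */
    size_t reccap;   /* record capacity (leveling bound) */
    size_t shardcnt; /* shards currently on the level    */
    size_t shardcap; /* shard capacity (tiering bound)   */
};
using state_vector = std::vector<level_state>;

/*
 * Capacity check against the simulated state only: under leveling a level
 * is measured in records, under tiering in shards. The live structure is
 * never consulted, so the same check works on a scratch copy.
 */
bool can_reconstruct_with(const state_vector &state, size_t idx,
                          size_t incoming_reccnt, bool leveling) {
    if (idx >= state.size()) {
        return false;
    }
    return leveling ? state[idx].reccnt + incoming_reccnt <= state[idx].reccap
                    : state[idx].shardcnt < state[idx].shardcap;
}
```

Because the check reads `shardcap` from the state entry rather than from `m_scale_factor` directly, a scratch copy can be grown and re-queried mid-plan without ever touching the live levels.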
@@ -254,46 +273,28 @@ public:
     /*
      *
      */
-    std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt) {
-        std::vector<ReconstructionTask> reconstructions;
+    std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt,
+                                                             state_vector scratch_state={}) {
+        /*
+         * If no scratch state vector is provided, use a copy of the
+         * current one. The only time an empty vector could be used as
+         * *real* input to this function is when the current state is also
+         * empty, so this should work even in that case.
+         */
+        if (scratch_state.size() == 0) {
+            scratch_state = m_current_state;
+        }
 
         /*
          * The buffer flush is not included so if that can be done without any
          * other change, just return an empty list.
          */
-        if (can_reconstruct_with(0, buffer_reccnt)) {
+        std::vector<ReconstructionTask> reconstructions;
+        if (can_reconstruct_with(0, buffer_reccnt, scratch_state)) {
             return std::move(reconstructions);
         }
 
-        level_index base_level = find_reconstruction_target(0);
-        if (base_level == -1) {
-            base_level = grow();
-        }
-
-        for (level_index i=base_level; i>0; i--) {
-            ReconstructionTask task = {i-1, i};
-
-            /*
-             * The amount of storage required for the reconstruction accounts
-             * for the cost of storing the new records, along with the
-             * cost of retaining the old records during the process
-             * (hence the 2x multiplier).
-             *
-             * FIXME: currently does not account for the *actual* size
-             * of the shards, only the storage for the records
-             * themselves.
-             */
-            size_t reccnt = m_levels[i-1]->get_record_count();
-            if constexpr (L == LayoutPolicy::LEVELING) {
-                if (can_reconstruct_with(i, reccnt)) {
-                    reccnt += m_levels[i]->get_record_count();
-                }
-            }
-            //task.m_size = 2* reccnt * sizeof(R);
-
-            reconstructions.push_back(task);
-        }
-
+        reconstructions = get_reconstruction_tasks_from_level(0, scratch_state);
         return std::move(reconstructions);
     }
 
@@ -301,12 +302,12 @@ public:
     /*
      *
     */
-    std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level) {
+    std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) {
         std::vector<ReconstructionTask> reconstructions;
 
-        level_index base_level = find_reconstruction_target(source_level);
+        level_index base_level = find_reconstruction_target(source_level, scratch_state);
         if (base_level == -1) {
-            base_level = grow();
+            base_level = grow(scratch_state);
         }
 
         for (level_index i=base_level; i>source_level; i--) {
@@ -323,7 +324,7 @@ public:
              */
             size_t reccnt = m_levels[i-1]->get_record_count();
             if constexpr (L == LayoutPolicy::LEVELING) {
-                if (can_reconstruct_with(i, reccnt)) {
+                if (can_reconstruct_with(i, reccnt, scratch_state)) {
                     reccnt += m_levels[i]->get_record_count();
                 }
             }
@@ -332,7 +333,7 @@ public:
             reconstructions.push_back(task);
         }
 
-        return reconstructions;
+        return std::move(reconstructions);
     }
 
     /*
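The cascade planner above reads only the scratch state: find the shallowest deeper level that can absorb the incoming records, grow the scratch vector if none can, then emit source-to-target tasks deepest-first. A self-contained sketch of that control flow follows; it re-declares the tiny state types so it compiles on its own, and `plan_cascade`, `fits`, and their parameters are hypothetical stand-ins, not the header's API.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

struct level_state { size_t reccnt, reccap, shardcnt, shardcap; };
using state_vector = std::vector<level_state>;
using level_index = int64_t;

struct ReconstructionTask { level_index source, target; };

/* same capacity rule as the earlier sketch */
static bool fits(const state_vector &s, level_index i, size_t incoming, bool leveling) {
    if (i < 0 || (size_t)i >= s.size()) return false;
    return leveling ? s[i].reccnt + incoming <= s[i].reccap
                    : s[i].shardcnt < s[i].shardcap;
}

/*
 * Plan a cascade out of 'src': locate the shallowest level below it that
 * can absorb the records pushed down into it, growing the *scratch* state
 * by one empty level if nothing fits, then emit one task per level,
 * deepest first, mirroring the i > source_level loop in the diff.
 */
std::vector<ReconstructionTask> plan_cascade(state_vector &scratch, level_index src,
                                             bool leveling, size_t scale_factor,
                                             size_t new_level_reccap) {
    level_index base = -1;
    size_t incoming = scratch[src].reccnt;
    for (level_index i = src + 1; i < (level_index)scratch.size(); i++) {
        if (fits(scratch, i, incoming, leveling)) { base = i; break; }
        incoming = scratch[i].reccnt; /* this level's records cascade down next */
    }
    if (base == -1) { /* no level can absorb the cascade: grow the scratch state */
        scratch.push_back({0, new_level_reccap, 0, leveling ? 1 : scale_factor});
        base = (level_index)scratch.size() - 1;
    }
    std::vector<ReconstructionTask> tasks;
    for (level_index i = base; i > src; i--) {
        tasks.push_back({i - 1, i}); /* reconstruct level i-1 into level i */
    }
    return tasks;
}
```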
@@ -342,7 +343,10 @@ public:
      * tombstone ordering invariant may be violated.
      */
     inline void reconstruction(level_index base_level, level_index incoming_level) {
+
+        size_t shard_capacity;
         if constexpr (L == LayoutPolicy::LEVELING) {
+            shard_capacity = 1;
             /* if the base level has a shard, merge the base and incoming together to make a new one */
             if (m_levels[base_level]->get_shard_count() > 0) {
                 m_levels[base_level] = InternalLevel<R, Shard, Q>::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get());
@@ -350,13 +354,23 @@ public:
             } else {
                 m_levels[base_level] = m_levels[incoming_level];
             }
+        } else {
+            shard_capacity = m_scale_factor;
             m_levels[base_level]->append_level(m_levels[incoming_level].get());
             m_levels[base_level]->finalize();
         }
 
         /* place a new, empty level where the incoming level used to be */
         m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
+
+        /*
+         * Update the state vector to match the *real* state following
+         * the reconstruction.
+         */
+        m_current_state[base_level] = {m_levels[base_level]->get_record_count(),
+                                       calc_level_record_capacity(base_level),
+                                       m_levels[base_level]->get_shard_count(), shard_capacity};
+        m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity};
     }
 
     bool take_reference() {
@@ -393,14 +407,26 @@ private:
 
     std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;
 
+    /*
+     * The current record count/capacity and shard count/capacity for
+     * each level in the structure. Record counts may be slightly
+     * inaccurate due to deletes.
+     */
+    state_vector m_current_state;
+
     /*
      * Add a new level to the structure and return its index.
     */
-    inline level_index grow() {
+    inline level_index grow(state_vector &scratch_state) {
         level_index new_idx = m_levels.size();
-        size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+        size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+        m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cap)));
 
-        m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
+        m_current_state.push_back({0, calc_level_record_capacity(new_idx),
+                                   0, new_shard_cap});
+        scratch_state.push_back({0, calc_level_record_capacity(new_idx),
+                                 0, new_shard_cap});
 
         return new_idx;
     }
@@ -411,17 +437,21 @@ private:
      * returns -1 if idx==0, and no such level exists, to simplify
      * the logic of the first buffer flush.
      */
-    inline level_index find_reconstruction_target(level_index idx) {
+    inline level_index find_reconstruction_target(level_index idx, state_vector &state) {
 
-        if (idx == 0 && m_levels.size() == 0) return -1;
+        /*
+         * this handles the very first buffer flush, when the state vector
+         * is empty.
+         */
+        if (idx == 0 && state.size() == 0) return -1;
 
-        size_t incoming_rec_cnt = get_level_record_count(idx);
-        for (level_index i=idx+1; i<m_levels.size(); i++) {
-            if (can_reconstruct_with(i, incoming_rec_cnt)) {
+        size_t incoming_rec_cnt = state[idx].reccnt;
+        for (level_index i=idx+1; i<state.size(); i++) {
+            if (can_reconstruct_with(i, incoming_rec_cnt, state)) {
                 return i;
             }
 
-            incoming_rec_cnt = get_level_record_count(i);
+            incoming_rec_cnt = state[i].reccnt;
        }
 
        return -1;
    }
@@ -444,6 +474,10 @@ private:
         } else {
             m_levels[0]->append_buffer(std::move(buffer));
         }
+
+        /* update the state vector */
+        m_current_state[0].reccnt = m_levels[0]->get_record_count();
+        m_current_state[0].shardcnt = m_levels[0]->get_shard_count();
     }
 
     /*
@@ -475,15 +509,15 @@ private:
      * Determines if a level can sustain a reconstruction with incoming_rec_cnt
      * additional records without exceeding its capacity.
      */
-    inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) {
-        if (idx >= m_levels.size() || !m_levels[idx]) {
+    inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) {
+        if (idx >= state.size()) {
             return false;
         }
 
         if (L == LayoutPolicy::LEVELING) {
-            return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
+            return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap;
         } else {
-            return m_levels[idx]->get_shard_count() < m_scale_factor;
+            return state[idx].shardcnt < state[idx].shardcap;
         }
 
         /* unreachable */
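Taken together, the pattern the diff establishes is: copy `m_current_state` into a scratch vector, plan against the copy (only the copy grows), and let `reconstruction()` and `flush_buffer_into_l0()` rewrite the real vector from the live levels once work actually happens, so drift from deletes cannot accumulate in the simulated state. A hypothetical driver built on the `plan_cascade` sketch above:

```cpp
/*
 * Hypothetical usage; 'current' stands in for m_current_state, and the
 * live m_levels array never appears, which is the point of the simulation.
 */
int main() {
    state_vector current = {
        {900, 1000, 1, 1},   /* L0: 900 of 1000 record slots used */
        {7500, 8000, 1, 1},  /* L1: nearly full as well           */
    };

    /* plan a flush-driven cascade from level 0 against a scratch copy */
    state_vector scratch = current;
    auto tasks = plan_cascade(scratch, 0, /*leveling=*/true,
                              /*scale_factor=*/8, /*new_level_reccap=*/64000);

    /* tasks == {1 -> 2, 0 -> 1}; 'scratch' grew to three levels, while
     * 'current' stays untouched until a real reconstruction commits its
     * counts, as reconstruction() does in the diff. */
    (void)tasks;
    return 0;
}
```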