diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-02-12 10:39:52 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-02-12 11:12:32 -0500 |
| commit | 90194cb5be5c7b80bfa21a353a75920e932d54d8 (patch) | |
| tree | 90f7e884f672378baeaf06b30693992ca8994c06 /include/framework/structure/ExtensionStructure.h | |
| parent | 3dd16320d38f6312e7f5d7164b6bb7a4e14790fa (diff) | |
| download | dynamic-extension-90194cb5be5c7b80bfa21a353a75920e932d54d8.tar.gz | |
Added structure state vector w/ scratch version for reconstruction
This approach should allow us to "simulate" a reconstruction to monitor
the future state of the structure. The idea being that we can then add
pre-emptive reconstructions to load balance and further smooth the tail
latency curve. If a given reconstruction is significantly smaller than
the next one will be, we can move some of the next one's work preemptively
into the current one.
The next phase is to do the simulation within the scratch_vector and
then do a second pass examining the state of that reconstruction. In
principle, we could look arbitrarily far ahead using this technique.
Diffstat (limited to 'include/framework/structure/ExtensionStructure.h')
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 144 |
1 files changed, 89 insertions, 55 deletions
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 4802bc1..823a66b 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -27,6 +27,16 @@ class ExtensionStructure { typedef S Shard; typedef BufferView<R> BuffView; + typedef struct { + size_t reccnt; + size_t reccap; + + size_t shardcnt; + size_t shardcap; + } level_state; + + typedef std::vector<level_state> state_vector; + public: ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop) : m_scale_factor(scale_factor) @@ -59,6 +69,7 @@ public: } new_struct->m_refcnt = 0; + new_struct->m_current_state = m_current_state; return new_struct; } @@ -95,8 +106,8 @@ public: * takes a structure as input. */ inline bool flush_buffer(BuffView buffer) { - assert(can_reconstruct_with(0, buffer.get_record_count())); - + state_vector tmp = m_current_state; + assert(can_reconstruct_with(0, buffer.get_record_count(), tmp)); flush_buffer_into_l0(std::move(buffer)); return true; @@ -200,8 +211,16 @@ public: return m_levels; } + /* + * NOTE: This cannot be simulated, because tombstone cancellation is not + * cheaply predictable. It is possible that the worst case number could + * be used instead, to allow for prediction, but compaction isn't a + * major concern outside of sampling; at least for now. So I'm not + * going to focus too much time on it at the moment. + */ std::vector<ReconstructionTask> get_compaction_tasks() { std::vector<ReconstructionTask> tasks; + state_vector scratch_state = m_current_state; /* if the tombstone/delete invariant is satisfied, no need for compactions */ if (validate_tombstone_proportion()) { @@ -219,9 +238,9 @@ public: assert(violation_idx != -1); - level_index base_level = find_reconstruction_target(violation_idx); + level_index base_level = find_reconstruction_target(violation_idx, scratch_state); if (base_level == -1) { - base_level = grow(); + base_level = grow(scratch_state); } for (level_index i=base_level; i>0; i--) { @@ -239,7 +258,7 @@ public: */ size_t reccnt = m_levels[i - 1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt)) { + if (can_reconstruct_with(i, reccnt, scratch_state)) { reccnt += m_levels[i]->get_record_count(); } } @@ -254,46 +273,28 @@ public: /* * */ - std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt) { - std::vector<ReconstructionTask> reconstructions; + std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt, + state_vector scratch_state={}) { + /* + * If no scratch state vector is provided, use a copy of the + * current one. The only time an empty vector could be used as + * *real* input to this function is when the current state is also + * empty, so this should would even in that case. + */ + if (scratch_state.size() == 0) { + scratch_state = m_current_state; + } /* * The buffer flush is not included so if that can be done without any * other change, just return an empty list. */ - if (can_reconstruct_with(0, buffer_reccnt)) { + std::vector<ReconstructionTask> reconstructions; + if (can_reconstruct_with(0, buffer_reccnt, scratch_state)) { return std::move(reconstructions); } - level_index base_level = find_reconstruction_target(0); - if (base_level == -1) { - base_level = grow(); - } - - for (level_index i=base_level; i>0; i--) { - ReconstructionTask task = {i-1, i}; - - /* - * The amount of storage required for the reconstruction accounts - * for the cost of storing the new records, along with the - * cost of retaining the old records during the process - * (hence the 2x multiplier). - * - * FIXME: currently does not account for the *actual* size - * of the shards, only the storage for the records - * themselves. - */ - size_t reccnt = m_levels[i-1]->get_record_count(); - if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt)) { - reccnt += m_levels[i]->get_record_count(); - } - } - //task.m_size = 2* reccnt * sizeof(R); - - reconstructions.push_back(task); - } - + reconstructions = get_reconstruction_tasks_from_level(0, scratch_state); return std::move(reconstructions); } @@ -301,12 +302,12 @@ public: /* * */ - std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level) { + std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) { std::vector<ReconstructionTask> reconstructions; - level_index base_level = find_reconstruction_target(source_level); + level_index base_level = find_reconstruction_target(source_level, scratch_state); if (base_level == -1) { - base_level = grow(); + base_level = grow(scratch_state); } for (level_index i=base_level; i>source_level; i--) { @@ -323,7 +324,7 @@ public: */ size_t reccnt = m_levels[i-1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt)) { + if (can_reconstruct_with(i, reccnt, scratch_state)) { reccnt += m_levels[i]->get_record_count(); } } @@ -332,7 +333,7 @@ public: reconstructions.push_back(task); } - return reconstructions; + return std::move(reconstructions); } /* @@ -342,7 +343,10 @@ public: * tombstone ordering invariant may be violated. */ inline void reconstruction(level_index base_level, level_index incoming_level) { + + size_t shard_capacity; if constexpr (L == LayoutPolicy::LEVELING) { + shard_capacity = 1; /* if the base level has a shard, merge the base and incoming together to make a new one */ if (m_levels[base_level]->get_shard_count() > 0) { m_levels[base_level] = InternalLevel<R, Shard, Q>::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get()); @@ -350,13 +354,23 @@ public: } else { m_levels[base_level] = m_levels[incoming_level]; } + } else { + shard_capacity = m_scale_factor; m_levels[base_level]->append_level(m_levels[incoming_level].get()); m_levels[base_level]->finalize(); } /* place a new, empty level where the incoming level used to be */ m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); + + /* + * Update the state vector to match the *real* stage following + * the reconstruction + */ + m_current_state[base_level] = {m_levels[base_level]->get_record_count(), + calc_level_record_capacity(base_level), m_levels[base_level]->get_shard_count(), shard_capacity}; + m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity}; } bool take_reference() { @@ -393,14 +407,26 @@ private: std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels; + /* + * A pair of <record_count, shard_count> for each level in the + * structure. Record counts may be slightly inaccurate due to + * deletes. + */ + state_vector m_current_state; + /* * Add a new level to the structure and return its index. */ - inline level_index grow() { + inline level_index grow(state_vector &scratch_state) { level_index new_idx = m_levels.size(); - size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cap))); - m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt))); + m_current_state.push_back({0, calc_level_record_capacity(new_idx), + 0, new_shard_cap}); + scratch_state.push_back({0, calc_level_record_capacity(new_idx), + 0, new_shard_cap}); return new_idx; } @@ -411,17 +437,21 @@ private: * returns -1 if idx==0, and no such level exists, to simplify * the logic of the first buffer flush. */ - inline level_index find_reconstruction_target(level_index idx) { + inline level_index find_reconstruction_target(level_index idx, state_vector &state) { - if (idx == 0 && m_levels.size() == 0) return -1; + /* + * this handles the very first buffer flush, when the state vector + * is empty. + */ + if (idx == 0 && state.size() == 0) return -1; - size_t incoming_rec_cnt = get_level_record_count(idx); - for (level_index i=idx+1; i<m_levels.size(); i++) { - if (can_reconstruct_with(i, incoming_rec_cnt)) { + size_t incoming_rec_cnt = state[idx].reccnt; + for (level_index i=idx+1; i<state.size(); i++) { + if (can_reconstruct_with(i, incoming_rec_cnt, state)) { return i; } - incoming_rec_cnt = get_level_record_count(i); + incoming_rec_cnt = state[idx].reccnt; } return -1; @@ -444,6 +474,10 @@ private: } else { m_levels[0]->append_buffer(std::move(buffer)); } + + /* update the state vector */ + m_current_state[0].reccnt = m_levels[0]->get_record_count(); + m_current_state[0].shardcnt = m_levels[0]->get_shard_count(); } /* @@ -475,15 +509,15 @@ private: * Determines if a level can sustain a reconstruction with incoming_rec_cnt * additional records without exceeding its capacity. */ - inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) { - if (idx >= m_levels.size() || !m_levels[idx]) { + inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) { + if (idx >= state.size()) { return false; } if (L == LayoutPolicy::LEVELING) { - return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx); + return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap; } else { - return m_levels[idx]->get_shard_count() < m_scale_factor; + return state[idx].shardcnt < state[idx].shardcap; } /* unreachable */ |