From 90194cb5be5c7b80bfa21a353a75920e932d54d8 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 12 Feb 2024 10:39:52 -0500 Subject: Added structure state vector w/ scratch version for reconstruction This approach should allow us to "simulate" a reconstruction to monitor the future state of the structure. The idea being that we can then add pre-emptive reconstructions to load balance and further smooth the tail latency curve. If a given reconstruction is significantly smaller than the next one will be, we can move some of the next one's work preemptively into the current one. The next phase is to do the simulation within the scratch_vector and then do a second pass examining the state of that reconstruction. In principle, we could look arbitrarily far ahead using this technique. --- include/framework/structure/ExtensionStructure.h | 144 ++++++++++++++--------- 1 file changed, 89 insertions(+), 55 deletions(-) diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 4802bc1..823a66b 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -27,6 +27,16 @@ class ExtensionStructure { typedef S Shard; typedef BufferView BuffView; + typedef struct { + size_t reccnt; + size_t reccap; + + size_t shardcnt; + size_t shardcap; + } level_state; + + typedef std::vector state_vector; + public: ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop) : m_scale_factor(scale_factor) @@ -59,6 +69,7 @@ public: } new_struct->m_refcnt = 0; + new_struct->m_current_state = m_current_state; return new_struct; } @@ -95,8 +106,8 @@ public: * takes a structure as input. */ inline bool flush_buffer(BuffView buffer) { - assert(can_reconstruct_with(0, buffer.get_record_count())); - + state_vector tmp = m_current_state; + assert(can_reconstruct_with(0, buffer.get_record_count(), tmp)); flush_buffer_into_l0(std::move(buffer)); return true; @@ -200,8 +211,16 @@ public: return m_levels; } + /* + * NOTE: This cannot be simulated, because tombstone cancellation is not + * cheaply predictable. It is possible that the worst case number could + * be used instead, to allow for prediction, but compaction isn't a + * major concern outside of sampling; at least for now. So I'm not + * going to focus too much time on it at the moment. + */ std::vector get_compaction_tasks() { std::vector tasks; + state_vector scratch_state = m_current_state; /* if the tombstone/delete invariant is satisfied, no need for compactions */ if (validate_tombstone_proportion()) { @@ -219,9 +238,9 @@ public: assert(violation_idx != -1); - level_index base_level = find_reconstruction_target(violation_idx); + level_index base_level = find_reconstruction_target(violation_idx, scratch_state); if (base_level == -1) { - base_level = grow(); + base_level = grow(scratch_state); } for (level_index i=base_level; i>0; i--) { @@ -239,7 +258,7 @@ public: */ size_t reccnt = m_levels[i - 1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt)) { + if (can_reconstruct_with(i, reccnt, scratch_state)) { reccnt += m_levels[i]->get_record_count(); } } @@ -254,46 +273,28 @@ public: /* * */ - std::vector get_reconstruction_tasks(size_t buffer_reccnt) { - std::vector reconstructions; + std::vector get_reconstruction_tasks(size_t buffer_reccnt, + state_vector scratch_state={}) { + /* + * If no scratch state vector is provided, use a copy of the + * current one. The only time an empty vector could be used as + * *real* input to this function is when the current state is also + * empty, so this should would even in that case. + */ + if (scratch_state.size() == 0) { + scratch_state = m_current_state; + } /* * The buffer flush is not included so if that can be done without any * other change, just return an empty list. */ - if (can_reconstruct_with(0, buffer_reccnt)) { + std::vector reconstructions; + if (can_reconstruct_with(0, buffer_reccnt, scratch_state)) { return std::move(reconstructions); } - level_index base_level = find_reconstruction_target(0); - if (base_level == -1) { - base_level = grow(); - } - - for (level_index i=base_level; i>0; i--) { - ReconstructionTask task = {i-1, i}; - - /* - * The amount of storage required for the reconstruction accounts - * for the cost of storing the new records, along with the - * cost of retaining the old records during the process - * (hence the 2x multiplier). - * - * FIXME: currently does not account for the *actual* size - * of the shards, only the storage for the records - * themselves. - */ - size_t reccnt = m_levels[i-1]->get_record_count(); - if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt)) { - reccnt += m_levels[i]->get_record_count(); - } - } - //task.m_size = 2* reccnt * sizeof(R); - - reconstructions.push_back(task); - } - + reconstructions = get_reconstruction_tasks_from_level(0, scratch_state); return std::move(reconstructions); } @@ -301,12 +302,12 @@ public: /* * */ - std::vector get_reconstruction_tasks_from_level(level_index source_level) { + std::vector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) { std::vector reconstructions; - level_index base_level = find_reconstruction_target(source_level); + level_index base_level = find_reconstruction_target(source_level, scratch_state); if (base_level == -1) { - base_level = grow(); + base_level = grow(scratch_state); } for (level_index i=base_level; i>source_level; i--) { @@ -323,7 +324,7 @@ public: */ size_t reccnt = m_levels[i-1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt)) { + if (can_reconstruct_with(i, reccnt, scratch_state)) { reccnt += m_levels[i]->get_record_count(); } } @@ -332,7 +333,7 @@ public: reconstructions.push_back(task); } - return reconstructions; + return std::move(reconstructions); } /* @@ -342,7 +343,10 @@ public: * tombstone ordering invariant may be violated. */ inline void reconstruction(level_index base_level, level_index incoming_level) { + + size_t shard_capacity; if constexpr (L == LayoutPolicy::LEVELING) { + shard_capacity = 1; /* if the base level has a shard, merge the base and incoming together to make a new one */ if (m_levels[base_level]->get_shard_count() > 0) { m_levels[base_level] = InternalLevel::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get()); @@ -350,13 +354,23 @@ public: } else { m_levels[base_level] = m_levels[incoming_level]; } + } else { + shard_capacity = m_scale_factor; m_levels[base_level]->append_level(m_levels[incoming_level].get()); m_levels[base_level]->finalize(); } /* place a new, empty level where the incoming level used to be */ m_levels[incoming_level] = std::shared_ptr>(new InternalLevel(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); + + /* + * Update the state vector to match the *real* stage following + * the reconstruction + */ + m_current_state[base_level] = {m_levels[base_level]->get_record_count(), + calc_level_record_capacity(base_level), m_levels[base_level]->get_shard_count(), shard_capacity}; + m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity}; } bool take_reference() { @@ -393,14 +407,26 @@ private: std::vector>> m_levels; + /* + * A pair of for each level in the + * structure. Record counts may be slightly inaccurate due to + * deletes. + */ + state_vector m_current_state; + /* * Add a new level to the structure and return its index. */ - inline level_index grow() { + inline level_index grow(state_vector &scratch_state) { level_index new_idx = m_levels.size(); - size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + m_levels.emplace_back(std::shared_ptr>(new InternalLevel(new_idx, new_shard_cap))); - m_levels.emplace_back(std::shared_ptr>(new InternalLevel(new_idx, new_shard_cnt))); + m_current_state.push_back({0, calc_level_record_capacity(new_idx), + 0, new_shard_cap}); + scratch_state.push_back({0, calc_level_record_capacity(new_idx), + 0, new_shard_cap}); return new_idx; } @@ -411,17 +437,21 @@ private: * returns -1 if idx==0, and no such level exists, to simplify * the logic of the first buffer flush. */ - inline level_index find_reconstruction_target(level_index idx) { + inline level_index find_reconstruction_target(level_index idx, state_vector &state) { - if (idx == 0 && m_levels.size() == 0) return -1; + /* + * this handles the very first buffer flush, when the state vector + * is empty. + */ + if (idx == 0 && state.size() == 0) return -1; - size_t incoming_rec_cnt = get_level_record_count(idx); - for (level_index i=idx+1; iappend_buffer(std::move(buffer)); } + + /* update the state vector */ + m_current_state[0].reccnt = m_levels[0]->get_record_count(); + m_current_state[0].shardcnt = m_levels[0]->get_shard_count(); } /* @@ -475,15 +509,15 @@ private: * Determines if a level can sustain a reconstruction with incoming_rec_cnt * additional records without exceeding its capacity. */ - inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) { - if (idx >= m_levels.size() || !m_levels[idx]) { + inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) { + if (idx >= state.size()) { return false; } if (L == LayoutPolicy::LEVELING) { - return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx); + return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap; } else { - return m_levels[idx]->get_shard_count() < m_scale_factor; + return state[idx].shardcnt < state[idx].shardcap; } /* unreachable */ -- cgit v1.2.3