diff options
Diffstat (limited to 'include/framework/structure')
| -rw-r--r-- | include/framework/structure/BufferView.h | 10 | ||||
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 302 | ||||
| -rw-r--r-- | include/framework/structure/InternalLevel.h | 19 | ||||
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 13 |
4 files changed, 260 insertions, 84 deletions
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index 9e0872b..e95a799 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -20,7 +20,7 @@ namespace de { -typedef std::_Bind<void (*(void*, long unsigned int))(void*, long unsigned int)> ReleaseFunction; +typedef std::function<void(void)> ReleaseFunction; template <RecordInterface R> class BufferView { @@ -112,6 +112,10 @@ public: size_t get_record_count() { return m_tail - m_head; } + + size_t get_capacity() { + return m_cap; + } /* * NOTE: This function returns an upper bound on the number @@ -123,7 +127,7 @@ public: } Wrapped<R> *get(size_t i) { - assert(i < get_record_count()); + //assert(i < get_record_count()); return m_data + to_idx(i); } @@ -160,7 +164,7 @@ private: bool m_active; size_t to_idx(size_t i) { - size_t idx = (m_start + i >= m_cap) ? i = (m_cap - m_start) + size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start) : m_start + i; assert(idx < m_cap); return idx; diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 4802bc1..b83674b 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -27,6 +27,16 @@ class ExtensionStructure { typedef S Shard; typedef BufferView<R> BuffView; + typedef struct { + size_t reccnt; + size_t reccap; + + size_t shardcnt; + size_t shardcap; + } level_state; + + typedef std::vector<level_state> state_vector; + public: ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop) : m_scale_factor(scale_factor) @@ -59,6 +69,7 @@ public: } new_struct->m_refcnt = 0; + new_struct->m_current_state = m_current_state; return new_struct; } @@ -95,8 +106,13 @@ public: * takes a structure as input. */ inline bool flush_buffer(BuffView buffer) { - assert(can_reconstruct_with(0, buffer.get_record_count())); + state_vector tmp = m_current_state; + + if (tmp.size() == 0) { + grow(tmp); + } + assert(can_reconstruct_with(0, buffer.get_record_count(), tmp)); flush_buffer_into_l0(std::move(buffer)); return true; @@ -200,8 +216,16 @@ public: return m_levels; } - std::vector<ReconstructionTask> get_compaction_tasks() { - std::vector<ReconstructionTask> tasks; + /* + * NOTE: This cannot be simulated, because tombstone cancellation is not + * cheaply predictable. It is possible that the worst case number could + * be used instead, to allow for prediction, but compaction isn't a + * major concern outside of sampling; at least for now. So I'm not + * going to focus too much time on it at the moment. + */ + ReconstructionVector get_compaction_tasks() { + ReconstructionVector tasks; + state_vector scratch_state = m_current_state; /* if the tombstone/delete invariant is satisfied, no need for compactions */ if (validate_tombstone_proportion()) { @@ -219,14 +243,12 @@ public: assert(violation_idx != -1); - level_index base_level = find_reconstruction_target(violation_idx); + level_index base_level = find_reconstruction_target(violation_idx, scratch_state); if (base_level == -1) { - base_level = grow(); + base_level = grow(scratch_state); } for (level_index i=base_level; i>0; i--) { - ReconstructionTask task = {i-1, i}; - /* * The amount of storage required for the reconstruction accounts * for the cost of storing the new records, along with the @@ -239,13 +261,11 @@ public: */ size_t reccnt = m_levels[i - 1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt)) { + if (can_reconstruct_with(i, reccnt, scratch_state)) { reccnt += m_levels[i]->get_record_count(); } } - //task.m_size = 2* reccnt * sizeof(R); - - tasks.push_back(task); + tasks.add_reconstruction(i-i, i, reccnt); } return tasks; @@ -254,44 +274,53 @@ public: /* * */ - std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt) { - std::vector<ReconstructionTask> reconstructions; - - /* - * The buffer flush is not included so if that can be done without any - * other change, just return an empty list. + ReconstructionVector get_reconstruction_tasks(size_t buffer_reccnt, + state_vector scratch_state={}) { + /* + * If no scratch state vector is provided, use a copy of the + * current one. The only time an empty vector could be used as + * *real* input to this function is when the current state is also + * empty, so this should would even in that case. */ - if (can_reconstruct_with(0, buffer_reccnt)) { - return std::move(reconstructions); + if (scratch_state.size() == 0) { + scratch_state = m_current_state; } - level_index base_level = find_reconstruction_target(0); - if (base_level == -1) { - base_level = grow(); - } - - for (level_index i=base_level; i>0; i--) { - ReconstructionTask task = {i-1, i}; - + ReconstructionVector reconstructions; + size_t LOOKAHEAD = 1; + for (size_t i=0; i<LOOKAHEAD; i++) { /* - * The amount of storage required for the reconstruction accounts - * for the cost of storing the new records, along with the - * cost of retaining the old records during the process - * (hence the 2x multiplier). - * - * FIXME: currently does not account for the *actual* size - * of the shards, only the storage for the records - * themselves. + * If L0 cannot support a direct buffer flush, figure out what + * work must be done to free up space first. Otherwise, the + * reconstruction vector will be initially empty. */ - size_t reccnt = m_levels[i-1]->get_record_count(); - if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt)) { - reccnt += m_levels[i]->get_record_count(); + if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) { + auto local_recon = get_reconstruction_tasks_from_level(0, scratch_state); + + /* + * for the first iteration, we need to do all of the + * reconstructions, so use these to initially the returned + * reconstruction list + */ + if (i == 0) { + reconstructions = local_recon; + /* + * Quick sanity test of idea: if the next reconstruction + * would be larger than this one, steal the largest + * task from it and run it now instead. + */ + } else if (local_recon.get_total_reccnt() > reconstructions.get_total_reccnt()) { + auto t = local_recon.remove_reconstruction(0); + reconstructions.add_reconstruction(t); } } - //task.m_size = 2* reccnt * sizeof(R); - reconstructions.push_back(task); + /* simulate the buffer flush in the scratch state */ + scratch_state[0].reccnt += buffer_reccnt; + if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) { + scratch_state[0].shardcnt += 1; + } + } return std::move(reconstructions); @@ -301,38 +330,109 @@ public: /* * */ - std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level) { - std::vector<ReconstructionTask> reconstructions; + ReconstructionVector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) { + ReconstructionVector reconstructions; - level_index base_level = find_reconstruction_target(source_level); + /* + * Find the first level capable of sustaining a reconstruction from + * the level above it. If no such level exists, add a new one at + * the bottom of the structure. + */ + level_index base_level = find_reconstruction_target(source_level, scratch_state); if (base_level == -1) { - base_level = grow(); + base_level = grow(scratch_state); + } + + if constexpr (L == LayoutPolicy::BSM) { + if (base_level == 0) { + return std::move(reconstructions); + } + + ReconstructionTask task; + task.target = base_level; + + size_t base_reccnt = 0; + for (level_index i=base_level; i>source_level; i--) { + auto recon_reccnt = scratch_state[i-1].reccnt; + base_reccnt += recon_reccnt; + scratch_state[i-1].reccnt = 0; + scratch_state[i-1].shardcnt = 0; + task.add_source(i-1, recon_reccnt); + } + + reconstructions.add_reconstruction(task); + scratch_state[base_level].reccnt = base_reccnt; + scratch_state[base_level].shardcnt = 1; + + return std::move(reconstructions); } + /* + * Determine the full set of reconstructions necessary to open up + * space in the source level. + */ for (level_index i=base_level; i>source_level; i--) { - ReconstructionTask task = {i - 1, i}; + size_t recon_reccnt = scratch_state[i-1].reccnt; + size_t base_reccnt = recon_reccnt; + /* - * The amount of storage required for the reconstruction accounts - * for the cost of storing the new records, along with the - * cost of retaining the old records during the process - * (hence the 2x multiplier). - * - * FIXME: currently does not account for the *actual* size - * of the shards, only the storage for the records - * themselves. + * If using Leveling, the total reconstruction size will be the + * records in *both* base and target, because they will need to + * be merged (assuming that target isn't empty). */ - size_t reccnt = m_levels[i-1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt)) { - reccnt += m_levels[i]->get_record_count(); + if (can_reconstruct_with(i, base_reccnt, scratch_state)) { + recon_reccnt += scratch_state[i].reccnt; } } -// task.m_size = 2* reccnt * sizeof(R); + reconstructions.add_reconstruction(i-1, i, recon_reccnt); + + /* + * The base level will be emptied and its records moved to + * the target. + */ + scratch_state[i-1].reccnt = 0; + scratch_state[i-1].shardcnt = 0; - reconstructions.push_back(task); + /* + * The target level will have the records from the base level + * added to it, and potentially gain a shard if the LayoutPolicy + * is tiering or the level currently lacks any shards at all. + */ + scratch_state[i].reccnt += base_reccnt; + if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) { + scratch_state[i].shardcnt += 1; + } } - return reconstructions; + return std::move(reconstructions); + } + + inline void reconstruction(ReconstructionTask task) { + static_assert(L == LayoutPolicy::BSM); + std::vector<InternalLevel<R, Shard, Q>*> levels(task.sources.size()); + for (size_t i=0; i<task.sources.size(); i++) { + levels[i] = m_levels[task.sources[i]].get(); + } + + auto new_level = InternalLevel<R, Shard, Q>::reconstruction(levels, task.target); + if (task.target >= m_levels.size()) { + m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target), + 1, 1}); + m_levels.emplace_back(new_level); + } else { + m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target), + 1, 1}; + m_levels[task.target] = new_level; + } + + /* remove all of the levels that have been flattened */ + for (size_t i=0; i<task.sources.size(); i++) { + m_levels[task.sources[i]] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(task.sources[i], 1)); + m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.target), 0, 1}; + } + + return; } /* @@ -342,6 +442,14 @@ public: * tombstone ordering invariant may be violated. */ inline void reconstruction(level_index base_level, level_index incoming_level) { + size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + if (base_level >= m_levels.size()) { + m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(base_level, shard_capacity))); + m_current_state.push_back({0, calc_level_record_capacity(base_level), + 0, shard_capacity}); + } + if constexpr (L == LayoutPolicy::LEVELING) { /* if the base level has a shard, merge the base and incoming together to make a new one */ if (m_levels[base_level]->get_shard_count() > 0) { @@ -350,6 +458,7 @@ public: } else { m_levels[base_level] = m_levels[incoming_level]; } + } else { m_levels[base_level]->append_level(m_levels[incoming_level].get()); m_levels[base_level]->finalize(); @@ -357,6 +466,14 @@ public: /* place a new, empty level where the incoming level used to be */ m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); + + /* + * Update the state vector to match the *real* state following + * the reconstruction + */ + m_current_state[base_level] = {m_levels[base_level]->get_record_count(), + calc_level_record_capacity(base_level), m_levels[base_level]->get_shard_count(), shard_capacity}; + m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity}; } bool take_reference() { @@ -393,14 +510,27 @@ private: std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels; + /* + * A pair of <record_count, shard_count> for each level in the + * structure. Record counts may be slightly inaccurate due to + * deletes. + */ + state_vector m_current_state; + /* - * Add a new level to the structure and return its index. + * Add a new level to the scratch state and return its index. + * + * IMPORTANT: This does _not_ add a level to the extension structure + * anymore. This is handled by the appropriate reconstruction and flush + * methods as needed. This function is for use in "simulated" + * reconstructions. */ - inline level_index grow() { + inline level_index grow(state_vector &scratch_state) { level_index new_idx = m_levels.size(); - size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt))); + scratch_state.push_back({0, calc_level_record_capacity(new_idx), + 0, new_shard_cap}); return new_idx; } @@ -411,24 +541,36 @@ private: * returns -1 if idx==0, and no such level exists, to simplify * the logic of the first buffer flush. */ - inline level_index find_reconstruction_target(level_index idx) { + inline level_index find_reconstruction_target(level_index idx, state_vector &state) { - if (idx == 0 && m_levels.size() == 0) return -1; + /* + * this handles the very first buffer flush, when the state vector + * is empty. + */ + if (idx == 0 && state.size() == 0) return -1; - size_t incoming_rec_cnt = get_level_record_count(idx); - for (level_index i=idx+1; i<m_levels.size(); i++) { - if (can_reconstruct_with(i, incoming_rec_cnt)) { + size_t incoming_rec_cnt = state[idx].reccnt; + for (level_index i=idx+1; i<state.size(); i++) { + if (can_reconstruct_with(i, incoming_rec_cnt, state)) { return i; } - incoming_rec_cnt = get_level_record_count(i); + incoming_rec_cnt = state[idx].reccnt; } return -1; } inline void flush_buffer_into_l0(BuffView buffer) { - assert(m_levels[0]); + size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + if (m_levels.size() == 0) { + m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(0, shard_capacity))); + + m_current_state.push_back({0, calc_level_record_capacity(0), + 0, shard_capacity}); + } + if constexpr (L == LayoutPolicy::LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0].get(); @@ -444,6 +586,10 @@ private: } else { m_levels[0]->append_buffer(std::move(buffer)); } + + /* update the state vector */ + m_current_state[0].reccnt = m_levels[0]->get_record_count(); + m_current_state[0].shardcnt = m_levels[0]->get_shard_count(); } /* @@ -475,15 +621,17 @@ private: * Determines if a level can sustain a reconstruction with incoming_rec_cnt * additional records without exceeding its capacity. */ - inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) { - if (idx >= m_levels.size() || !m_levels[idx]) { + inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) { + if (idx >= state.size()) { return false; } - if (L == LayoutPolicy::LEVELING) { - return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx); - } else { - return m_levels[idx]->get_shard_count() < m_scale_factor; + if constexpr (L == LayoutPolicy::LEVELING) { + return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap; + } else if constexpr (L == LayoutPolicy::BSM) { + return state[idx].reccnt == 0; + } else { + return state[idx].shardcnt < state[idx].shardcap; } /* unreachable */ diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index db38946..b962dcc 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -64,6 +64,21 @@ public: return std::shared_ptr<InternalLevel>(res); } + static std::shared_ptr<InternalLevel> reconstruction(std::vector<InternalLevel*> levels, size_t level_idx) { + std::vector<Shard *> shards; + for (auto level : levels) { + for (auto shard : level->m_shards) { + if (shard) shards.emplace_back(shard.get()); + } + } + + auto res = new InternalLevel(level_idx, 1); + res->m_shard_cnt = 1; + res->m_shards[0] = std::make_shared<S>(shards); + + return std::shared_ptr<InternalLevel>(res); + } + /* * Create a new shard combining the records from all of * the shards in level, and append this new shard into @@ -185,6 +200,10 @@ public: } Shard* get_shard(size_t idx) { + if (idx >= m_shard_cnt) { + return nullptr; + } + return m_shards[idx].get(); } diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 415c95a..7db3980 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -50,18 +50,19 @@ public: , m_tail(0) , m_head({0, 0}) , m_old_head({high_watermark, 0}) - , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>))) + //, m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>))) + , m_data(new Wrapped<R>[m_cap]()) , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS)) , m_tscnt(0) , m_old_tscnt(0) , m_active_head_advance(false) { assert(m_cap > m_hwm); - assert(m_hwm > m_lwm); + assert(m_hwm >= m_lwm); } ~MutableBuffer() { - free(m_data); + delete[] m_data; delete m_tombstone_filter; } @@ -76,16 +77,20 @@ public: wrec.header = 0; if (tombstone) wrec.set_tombstone(); + // FIXME: because of the mod, it isn't correct to use `pos` + // as the ordering timestamp in the header anymore. size_t pos = tail % m_cap; m_data[pos] = wrec; - m_data[pos].header |= (pos << 2); + m_data[pos].set_timestamp(pos); if (tombstone) { m_tscnt.fetch_add(1); if (m_tombstone_filter) m_tombstone_filter->insert(rec); } + m_data[pos].set_visible(); + return 1; } |