diff options
Diffstat (limited to 'include/framework')
| -rw-r--r-- | include/framework/DynamicExtension.h | 21 | ||||
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 60 | ||||
| -rw-r--r-- | include/framework/structure/InternalLevel.h | 19 | ||||
| -rw-r--r-- | include/framework/util/Configuration.h | 3 |
4 files changed, 95 insertions, 8 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 6fd95c6..538ff25 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -54,6 +54,10 @@ public: , m_next_core(0) , m_epoch_cnt(0) { + if constexpr (L == LayoutPolicy::BSM) { + assert(scale_factor == 2); + } + auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0}); m_previous_epoch.store({nullptr, 0}); @@ -487,10 +491,17 @@ private: ((DynamicExtension *) args->extension)->SetThreadAffinity(); Structure *vers = args->epoch->get_structure(); - for (ssize_t i=0; i<args->merges.size(); i++) { - vers->reconstruction(args->merges[i].target, args->merges[i].source); + if constexpr (L == LayoutPolicy::BSM) { + if (args->merges.size() > 0) { + vers->reconstruction(args->merges[0]); + } + } else { + for (ssize_t i=0; i<args->merges.size(); i++) { + vers->reconstruction(args->merges[i].target, args->merges[i].sources[0]); + } } + /* * we'll grab the buffer AFTER doing the internal reconstruction, so we * can flush as many records as possible in one go. The reconstruction @@ -628,7 +639,7 @@ private: static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) { if constexpr (Q::SKIP_DELETE_FILTER) { - return records; + return std::move(records); } std::vector<Wrapped<R>> processed_records; @@ -691,6 +702,10 @@ private: #ifdef _GNU_SOURCE void SetThreadAffinity() { + if constexpr (std::same_as<SCHED, SerialScheduler>) { + return; + } + int core = m_next_core.fetch_add(1) % m_core_cnt; cpu_set_t mask; CPU_ZERO(&mask); diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index ffba1c1..b83674b 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -343,6 +343,30 @@ public: base_level = grow(scratch_state); } + if constexpr (L == LayoutPolicy::BSM) { + if (base_level == 0) { + return std::move(reconstructions); + } + + ReconstructionTask task; + task.target = base_level; + + size_t base_reccnt = 0; + for (level_index i=base_level; i>source_level; i--) { + auto recon_reccnt = scratch_state[i-1].reccnt; + base_reccnt += recon_reccnt; + scratch_state[i-1].reccnt = 0; + scratch_state[i-1].shardcnt = 0; + task.add_source(i-1, recon_reccnt); + } + + reconstructions.add_reconstruction(task); + scratch_state[base_level].reccnt = base_reccnt; + scratch_state[base_level].shardcnt = 1; + + return std::move(reconstructions); + } + /* * Determine the full set of reconstructions necessary to open up * space in the source level. @@ -384,6 +408,33 @@ public: return std::move(reconstructions); } + inline void reconstruction(ReconstructionTask task) { + static_assert(L == LayoutPolicy::BSM); + std::vector<InternalLevel<R, Shard, Q>*> levels(task.sources.size()); + for (size_t i=0; i<task.sources.size(); i++) { + levels[i] = m_levels[task.sources[i]].get(); + } + + auto new_level = InternalLevel<R, Shard, Q>::reconstruction(levels, task.target); + if (task.target >= m_levels.size()) { + m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target), + 1, 1}); + m_levels.emplace_back(new_level); + } else { + m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target), + 1, 1}; + m_levels[task.target] = new_level; + } + + /* remove all of the levels that have been flattened */ + for (size_t i=0; i<task.sources.size(); i++) { + m_levels[task.sources[i]] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(task.sources[i], 1)); + m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.target), 0, 1}; + } + + return; + } + /* * Combine incoming_level with base_level and reconstruct the shard, * placing it in base_level. The two levels should be sequential--i.e. no @@ -395,7 +446,6 @@ public: if (base_level >= m_levels.size()) { m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(base_level, shard_capacity))); - m_current_state.push_back({0, calc_level_record_capacity(base_level), 0, shard_capacity}); } @@ -418,7 +468,7 @@ public: m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); /* - * Update the state vector to match the *real* stage following + * Update the state vector to match the *real* state following * the reconstruction */ m_current_state[base_level] = {m_levels[base_level]->get_record_count(), @@ -576,9 +626,11 @@ private: return false; } - if (L == LayoutPolicy::LEVELING) { + if constexpr (L == LayoutPolicy::LEVELING) { return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap; - } else { + } else if constexpr (L == LayoutPolicy::BSM) { + return state[idx].reccnt == 0; + } else { return state[idx].shardcnt < state[idx].shardcap; } diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index db38946..b962dcc 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -64,6 +64,21 @@ public: return std::shared_ptr<InternalLevel>(res); } + static std::shared_ptr<InternalLevel> reconstruction(std::vector<InternalLevel*> levels, size_t level_idx) { + std::vector<Shard *> shards; + for (auto level : levels) { + for (auto shard : level->m_shards) { + if (shard) shards.emplace_back(shard.get()); + } + } + + auto res = new InternalLevel(level_idx, 1); + res->m_shard_cnt = 1; + res->m_shards[0] = std::make_shared<S>(shards); + + return std::shared_ptr<InternalLevel>(res); + } + /* * Create a new shard combining the records from all of * the shards in level, and append this new shard into @@ -185,6 +200,10 @@ public: } Shard* get_shard(size_t idx) { + if (idx >= m_shard_cnt) { + return nullptr; + } + return m_shards[idx].get(); } diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index 55cc682..4a4524a 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -35,7 +35,8 @@ static thread_local size_t sampling_bailouts = 0; enum class LayoutPolicy { LEVELING, - TEIRING + TEIRING, + BSM }; enum class DeletePolicy { |