summary refs log tree commit diff stats
path: root/include/framework
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework')
-rw-r--r--  include/framework/DynamicExtension.h          | 21
-rw-r--r--  include/framework/structure/ExtensionStructure.h | 60
-rw-r--r--  include/framework/structure/InternalLevel.h   | 19
-rw-r--r--  include/framework/util/Configuration.h        |  3
4 files changed, 95 insertions(+), 8 deletions(-)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 6fd95c6..538ff25 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -54,6 +54,10 @@ public:
, m_next_core(0)
, m_epoch_cnt(0)
{
+ if constexpr (L == LayoutPolicy::BSM) {
+ assert(scale_factor == 2);
+ }
+
auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
m_previous_epoch.store({nullptr, 0});
@@ -487,10 +491,17 @@ private:
((DynamicExtension *) args->extension)->SetThreadAffinity();
Structure *vers = args->epoch->get_structure();
- for (ssize_t i=0; i<args->merges.size(); i++) {
- vers->reconstruction(args->merges[i].target, args->merges[i].source);
+ if constexpr (L == LayoutPolicy::BSM) {
+ if (args->merges.size() > 0) {
+ vers->reconstruction(args->merges[0]);
+ }
+ } else {
+ for (ssize_t i=0; i<args->merges.size(); i++) {
+ vers->reconstruction(args->merges[i].target, args->merges[i].sources[0]);
+ }
}
+
/*
* we'll grab the buffer AFTER doing the internal reconstruction, so we
* can flush as many records as possible in one go. The reconstruction
@@ -628,7 +639,7 @@ private:
static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) {
if constexpr (Q::SKIP_DELETE_FILTER) {
- return records;
+ return std::move(records);
}
std::vector<Wrapped<R>> processed_records;
@@ -691,6 +702,10 @@ private:
#ifdef _GNU_SOURCE
void SetThreadAffinity() {
+ if constexpr (std::same_as<SCHED, SerialScheduler>) {
+ return;
+ }
+
int core = m_next_core.fetch_add(1) % m_core_cnt;
cpu_set_t mask;
CPU_ZERO(&mask);
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index ffba1c1..b83674b 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -343,6 +343,30 @@ public:
base_level = grow(scratch_state);
}
+ if constexpr (L == LayoutPolicy::BSM) {
+ if (base_level == 0) {
+ return std::move(reconstructions);
+ }
+
+ ReconstructionTask task;
+ task.target = base_level;
+
+ size_t base_reccnt = 0;
+ for (level_index i=base_level; i>source_level; i--) {
+ auto recon_reccnt = scratch_state[i-1].reccnt;
+ base_reccnt += recon_reccnt;
+ scratch_state[i-1].reccnt = 0;
+ scratch_state[i-1].shardcnt = 0;
+ task.add_source(i-1, recon_reccnt);
+ }
+
+ reconstructions.add_reconstruction(task);
+ scratch_state[base_level].reccnt = base_reccnt;
+ scratch_state[base_level].shardcnt = 1;
+
+ return std::move(reconstructions);
+ }
+
/*
* Determine the full set of reconstructions necessary to open up
* space in the source level.
@@ -384,6 +408,33 @@ public:
return std::move(reconstructions);
}
+ inline void reconstruction(ReconstructionTask task) {
+ static_assert(L == LayoutPolicy::BSM);
+ std::vector<InternalLevel<R, Shard, Q>*> levels(task.sources.size());
+ for (size_t i=0; i<task.sources.size(); i++) {
+ levels[i] = m_levels[task.sources[i]].get();
+ }
+
+ auto new_level = InternalLevel<R, Shard, Q>::reconstruction(levels, task.target);
+ if (task.target >= m_levels.size()) {
+ m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target),
+ 1, 1});
+ m_levels.emplace_back(new_level);
+ } else {
+ m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target),
+ 1, 1};
+ m_levels[task.target] = new_level;
+ }
+
+ /* remove all of the levels that have been flattened */
+ for (size_t i=0; i<task.sources.size(); i++) {
+ m_levels[task.sources[i]] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(task.sources[i], 1));
+ m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.target), 0, 1};
+ }
+
+ return;
+ }
+
/*
* Combine incoming_level with base_level and reconstruct the shard,
* placing it in base_level. The two levels should be sequential--i.e. no
@@ -395,7 +446,6 @@ public:
if (base_level >= m_levels.size()) {
m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(base_level, shard_capacity)));
-
m_current_state.push_back({0, calc_level_record_capacity(base_level),
0, shard_capacity});
}
@@ -418,7 +468,7 @@ public:
m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
/*
- * Update the state vector to match the *real* stage following
+ * Update the state vector to match the *real* state following
* the reconstruction
*/
m_current_state[base_level] = {m_levels[base_level]->get_record_count(),
@@ -576,9 +626,11 @@ private:
return false;
}
- if (L == LayoutPolicy::LEVELING) {
+ if constexpr (L == LayoutPolicy::LEVELING) {
return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap;
- } else {
+ } else if constexpr (L == LayoutPolicy::BSM) {
+ return state[idx].reccnt == 0;
+ } else {
return state[idx].shardcnt < state[idx].shardcap;
}
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index db38946..b962dcc 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -64,6 +64,21 @@ public:
return std::shared_ptr<InternalLevel>(res);
}
+ static std::shared_ptr<InternalLevel> reconstruction(std::vector<InternalLevel*> levels, size_t level_idx) {
+ std::vector<Shard *> shards;
+ for (auto level : levels) {
+ for (auto shard : level->m_shards) {
+ if (shard) shards.emplace_back(shard.get());
+ }
+ }
+
+ auto res = new InternalLevel(level_idx, 1);
+ res->m_shard_cnt = 1;
+ res->m_shards[0] = std::make_shared<S>(shards);
+
+ return std::shared_ptr<InternalLevel>(res);
+ }
+
/*
* Create a new shard combining the records from all of
* the shards in level, and append this new shard into
@@ -185,6 +200,10 @@ public:
}
Shard* get_shard(size_t idx) {
+ if (idx >= m_shard_cnt) {
+ return nullptr;
+ }
+
return m_shards[idx].get();
}
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 55cc682..4a4524a 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -35,7 +35,8 @@ static thread_local size_t sampling_bailouts = 0;
enum class LayoutPolicy {
LEVELING,
- TEIRING
+ TEIRING,
+ BSM
};
enum class DeletePolicy {