Diffstat (limited to 'include')
-rw-r--r--  include/framework/structure/ExtensionStructure.h  144
1 file changed, 89 insertions(+), 55 deletions(-)
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 4802bc1..823a66b 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,6 +27,16 @@ class ExtensionStructure {
typedef S Shard;
typedef BufferView<R> BuffView;
+ typedef struct {
+ size_t reccnt;
+ size_t reccap;
+
+ size_t shardcnt;
+ size_t shardcap;
+ } level_state;
+
+ typedef std::vector<level_state> state_vector;
+
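  /*
   * Illustrative example, with hypothetical numbers (tiering, scale factor
   * 4): a level currently holding two shards and 2000 records, with room
   * for 4000 records and 4 shards, would be represented as
   *
   *     level_state ls = {2000, 4000, 2, 4};
   *
   * i.e. {reccnt, reccap, shardcnt, shardcap}; under LEVELING the shard
   * capacity is always 1.
   */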
public:
ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
: m_scale_factor(scale_factor)
@@ -59,6 +69,7 @@ public:
}
new_struct->m_refcnt = 0;
+ new_struct->m_current_state = m_current_state;
return new_struct;
}
@@ -95,8 +106,8 @@ public:
* takes a structure as input.
*/
inline bool flush_buffer(BuffView buffer) {
- assert(can_reconstruct_with(0, buffer.get_record_count()));
-
+ state_vector tmp = m_current_state;
+ assert(can_reconstruct_with(0, buffer.get_record_count(), tmp));
flush_buffer_into_l0(std::move(buffer));
return true;
@@ -200,8 +211,16 @@ public:
return m_levels;
}
+ /*
+ * NOTE: This cannot be simulated, because tombstone cancellation is not
+ * cheaply predictable. The worst-case number could be used instead to
+ * allow for prediction, but compaction isn't a major concern outside of
+ * sampling, at least for now, so I'm not going to spend much time on it
+ * at the moment.
+ */
std::vector<ReconstructionTask> get_compaction_tasks() {
std::vector<ReconstructionTask> tasks;
+ state_vector scratch_state = m_current_state;
/* if the tombstone/delete invariant is satisfied, no need for compactions */
if (validate_tombstone_proportion()) {
@@ -219,9 +238,9 @@ public:
assert(violation_idx != -1);
- level_index base_level = find_reconstruction_target(violation_idx);
+ level_index base_level = find_reconstruction_target(violation_idx, scratch_state);
if (base_level == -1) {
- base_level = grow();
+ base_level = grow(scratch_state);
}
for (level_index i=base_level; i>0; i--) {
@@ -239,7 +258,7 @@ public:
*/
size_t reccnt = m_levels[i - 1]->get_record_count();
if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, reccnt)) {
+ if (can_reconstruct_with(i, reccnt, scratch_state)) {
reccnt += m_levels[i]->get_record_count();
}
}
@@ -254,46 +273,28 @@ public:
/*
*
*/
- std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt) {
- std::vector<ReconstructionTask> reconstructions;
+ std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt,
+ state_vector scratch_state={}) {
+ /*
+ * If no scratch state vector is provided, use a copy of the
+ * current one. The only time an empty vector could be used as
+ * *real* input to this function is when the current state is also
+ * empty, so this should work even in that case.
+ */
+ if (scratch_state.size() == 0) {
+ scratch_state = m_current_state;
+ }
/*
* The buffer flush is not included so if that can be done without any
* other change, just return an empty list.
*/
- if (can_reconstruct_with(0, buffer_reccnt)) {
+ std::vector<ReconstructionTask> reconstructions;
+ if (can_reconstruct_with(0, buffer_reccnt, scratch_state)) {
return std::move(reconstructions);
}
- level_index base_level = find_reconstruction_target(0);
- if (base_level == -1) {
- base_level = grow();
- }
-
- for (level_index i=base_level; i>0; i--) {
- ReconstructionTask task = {i-1, i};
-
- /*
- * The amount of storage required for the reconstruction accounts
- * for the cost of storing the new records, along with the
- * cost of retaining the old records during the process
- * (hence the 2x multiplier).
- *
- * FIXME: currently does not account for the *actual* size
- * of the shards, only the storage for the records
- * themselves.
- */
- size_t reccnt = m_levels[i-1]->get_record_count();
- if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, reccnt)) {
- reccnt += m_levels[i]->get_record_count();
- }
- }
- //task.m_size = 2* reccnt * sizeof(R);
-
- reconstructions.push_back(task);
- }
-
+ reconstructions = get_reconstruction_tasks_from_level(0, scratch_state);
return std::move(reconstructions);
}
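/*
 * Rough caller-side sketch (hypothetical scheduler code; variable names
 * assumed): plan the cascade against a scratch copy of the state, execute
 * each task via reconstruction(), and only then flush the buffer.
 *
 *     auto tasks = structure->get_reconstruction_tasks(buffer.get_record_count());
 *     for (auto &task : tasks) {
 *         // perform structure->reconstruction() on the level pair in the task
 *     }
 *     structure->flush_buffer(std::move(buffer));
 */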
@@ -301,12 +302,12 @@ public:
/*
*
*/
- std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level) {
+ std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) {
std::vector<ReconstructionTask> reconstructions;
- level_index base_level = find_reconstruction_target(source_level);
+ level_index base_level = find_reconstruction_target(source_level, scratch_state);
if (base_level == -1) {
- base_level = grow();
+ base_level = grow(scratch_state);
}
for (level_index i=base_level; i>source_level; i--) {
@@ -323,7 +324,7 @@ public:
*/
size_t reccnt = m_levels[i-1]->get_record_count();
if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, reccnt)) {
+ if (can_reconstruct_with(i, reccnt, scratch_state)) {
reccnt += m_levels[i]->get_record_count();
}
}
@@ -332,7 +333,7 @@ public:
reconstructions.push_back(task);
}
- return reconstructions;
+ return std::move(reconstructions);
}
/*
@@ -342,7 +343,10 @@ public:
* tombstone ordering invariant may be violated.
*/
inline void reconstruction(level_index base_level, level_index incoming_level) {
+
+ size_t shard_capacity;
if constexpr (L == LayoutPolicy::LEVELING) {
+ shard_capacity = 1;
/* if the base level has a shard, merge the base and incoming together to make a new one */
if (m_levels[base_level]->get_shard_count() > 0) {
m_levels[base_level] = InternalLevel<R, Shard, Q>::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get());
@@ -350,13 +354,23 @@ public:
} else {
m_levels[base_level] = m_levels[incoming_level];
}
+
} else {
+ shard_capacity = m_scale_factor;
m_levels[base_level]->append_level(m_levels[incoming_level].get());
m_levels[base_level]->finalize();
}
/* place a new, empty level where the incoming level used to be */
m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
+
+ /*
+ * Update the state vector to match the *real* state following
+ * the reconstruction.
+ */
+ m_current_state[base_level] = {m_levels[base_level]->get_record_count(),
+ calc_level_record_capacity(base_level), m_levels[base_level]->get_shard_count(), shard_capacity};
+ m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity};
}
bool take_reference() {
@@ -393,14 +407,26 @@ private:
std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;
+ /*
+ * The record count/capacity and shard count/capacity of each level in
+ * the structure. Record counts may be slightly inaccurate due to
+ * deletes.
+ */
+ state_vector m_current_state;
+
/*
* Add a new level to the structure and return its index.
*/
- inline level_index grow() {
+ inline level_index grow(state_vector &scratch_state) {
level_index new_idx = m_levels.size();
- size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+ size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+ m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cap)));
- m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
+ m_current_state.push_back({0, calc_level_record_capacity(new_idx),
+ 0, new_shard_cap});
+ scratch_state.push_back({0, calc_level_record_capacity(new_idx),
+ 0, new_shard_cap});
return new_idx;
}
@@ -411,17 +437,21 @@ private:
* returns -1 if idx==0, and no such level exists, to simplify
* the logic of the first buffer flush.
*/
- inline level_index find_reconstruction_target(level_index idx) {
+ inline level_index find_reconstruction_target(level_index idx, state_vector &state) {
- if (idx == 0 && m_levels.size() == 0) return -1;
+ /*
+ * this handles the very first buffer flush, when the state vector
+ * is empty.
+ */
+ if (idx == 0 && state.size() == 0) return -1;
- size_t incoming_rec_cnt = get_level_record_count(idx);
- for (level_index i=idx+1; i<m_levels.size(); i++) {
- if (can_reconstruct_with(i, incoming_rec_cnt)) {
+ size_t incoming_rec_cnt = state[idx].reccnt;
+ for (level_index i=idx+1; i<state.size(); i++) {
+ if (can_reconstruct_with(i, incoming_rec_cnt, state)) {
return i;
}
- incoming_rec_cnt = get_level_record_count(i);
+ incoming_rec_cnt = state[i].reccnt;
}
return -1;
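/*
 * Worked example (hypothetical leveling structure; only reccnt/reccap are
 * shown): given state = { {90,100}, {950,1000}, {2000,10000} }, a call to
 * find_reconstruction_target(0, state) first tries level 1, which cannot
 * absorb level 0's 90 records (950 + 90 > 1000), then level 2, which can
 * absorb level 1's 950 records (2000 + 950 <= 10000). It therefore returns
 * 2, and the resulting cascade reconstructs level 1 into 2, then 0 into 1.
 */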
@@ -444,6 +474,10 @@ private:
} else {
m_levels[0]->append_buffer(std::move(buffer));
}
+
+ /* update the state vector */
+ m_current_state[0].reccnt = m_levels[0]->get_record_count();
+ m_current_state[0].shardcnt = m_levels[0]->get_shard_count();
}
/*
@@ -475,15 +509,15 @@ private:
* Determines if a level can sustain a reconstruction with incoming_rec_cnt
* additional records without exceeding its capacity.
*/
- inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) {
- if (idx >= m_levels.size() || !m_levels[idx]) {
+ inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) {
+ if (idx >= state.size()) {
return false;
}
if (L == LayoutPolicy::LEVELING) {
- return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
+ return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap;
} else {
- return m_levels[idx]->get_shard_count() < m_scale_factor;
+ return state[idx].shardcnt < state[idx].shardcap;
}
/* unreachable */
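/*
 * Illustratively (hypothetical numbers): under LEVELING, a level with
 * reccnt=800 and reccap=1000 can accept a reconstruction bringing in at
 * most 200 additional records, while under tiering the record counts are
 * ignored and a level with shardcnt=3 and shardcap=4 can accept exactly
 * one more shard, whatever its size.
 */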