Diffstat (limited to 'include/framework/structure')
-rw-r--r--   include/framework/structure/BufferView.h          |  10
-rw-r--r--   include/framework/structure/ExtensionStructure.h   | 302
-rw-r--r--   include/framework/structure/InternalLevel.h        |  19
-rw-r--r--   include/framework/structure/MutableBuffer.h        |  13
4 files changed, 260 insertions, 84 deletions
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 9e0872b..e95a799 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -20,7 +20,7 @@
namespace de {
-typedef std::_Bind<void (*(void*, long unsigned int))(void*, long unsigned int)> ReleaseFunction;
+typedef std::function<void(void)> ReleaseFunction;
template <RecordInterface R>
class BufferView {
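
The removed typedef named std::_Bind, an internal libstdc++ type that is unspecified by the standard and awkward to reuse; std::function<void(void)> stores the same bound callable behind a stable, portable interface. A minimal sketch of the idea, using a hypothetical release routine with the (void*, unsigned long) signature the old binder implied:

    #include <cstdio>
    #include <functional>

    /* hypothetical release routine; the real callback is supplied by the framework */
    static void release_head(void *buffer, unsigned long head) {
        std::printf("releasing head %lu of buffer %p\n", head, buffer);
    }

    int main() {
        int dummy_buffer;
        /* the bound call is stored behind the portable std::function interface */
        std::function<void(void)> release =
            std::bind(release_head, static_cast<void *>(&dummy_buffer), 128ul);
        release();   /* invokes release_head(&dummy_buffer, 128) */
        return 0;
    }
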
@@ -112,6 +112,10 @@ public:
size_t get_record_count() {
return m_tail - m_head;
}
+
+ size_t get_capacity() {
+ return m_cap;
+ }
/*
* NOTE: This function returns an upper bound on the number
@@ -123,7 +127,7 @@ public:
}
Wrapped<R> *get(size_t i) {
- assert(i < get_record_count());
+ //assert(i < get_record_count());
return m_data + to_idx(i);
}
@@ -160,7 +164,7 @@ private:
bool m_active;
size_t to_idx(size_t i) {
- size_t idx = (m_start + i >= m_cap) ? i = (m_cap - m_start)
+ size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start)
: m_start + i;
assert(idx < m_cap);
return idx;
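
The to_idx() change replaces an accidental assignment (i = ...) with the intended subtraction, so logical offsets wrap correctly past the end of the circular buffer. A standalone sketch of the corrected mapping, with made-up stand-ins for the member variables:

    #include <cassert>
    #include <cstddef>

    /* start and cap stand in for m_start and m_cap */
    static size_t to_physical_idx(size_t start, size_t cap, size_t i) {
        /* if start + i runs past the end of the backing array, wrap around */
        size_t idx = (start + i >= cap) ? i - (cap - start) : start + i;
        assert(idx < cap);
        return idx;
    }

    int main() {
        assert(to_physical_idx(6, 8, 1) == 7);   /* no wrap needed     */
        assert(to_physical_idx(6, 8, 3) == 1);   /* wraps past the end */
        return 0;
    }
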
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 4802bc1..b83674b 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,6 +27,16 @@ class ExtensionStructure {
typedef S Shard;
typedef BufferView<R> BuffView;
+ typedef struct {
+ size_t reccnt;
+ size_t reccap;
+
+ size_t shardcnt;
+ size_t shardcap;
+ } level_state;
+
+ typedef std::vector<level_state> state_vector;
+
public:
ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
: m_scale_factor(scale_factor)
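
The new level_state/state_vector types let reconstruction planning run against a cheap copy of the structure's bookkeeping instead of the levels themselves. A standalone mirror of the types for illustration; the capacity rule below (each level holds scale_factor times the previous one) is an assumption standing in for calc_level_record_capacity(), which is not shown in this diff:

    #include <cstddef>
    #include <vector>

    /* mirror of the per-level bookkeeping introduced in this diff */
    struct level_state {
        size_t reccnt;    /* records currently on the level          */
        size_t reccap;    /* record capacity of the level            */
        size_t shardcnt;  /* shards currently on the level           */
        size_t shardcap;  /* maximum number of shards for the level  */
    };
    typedef std::vector<level_state> state_vector;

    /* assumed capacity rule, for illustration only */
    state_vector make_initial_state(size_t buffer_size, size_t scale_factor, size_t levels) {
        state_vector state;
        size_t cap = buffer_size;
        for (size_t i = 0; i < levels; i++) {
            cap *= scale_factor;
            state.push_back({0, cap, 0, scale_factor});
        }
        return state;
    }

    int main() {
        auto s = make_initial_state(1000, 8, 3);
        return s.size() == 3 ? 0 : 1;
    }
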
@@ -59,6 +69,7 @@ public:
}
new_struct->m_refcnt = 0;
+ new_struct->m_current_state = m_current_state;
return new_struct;
}
@@ -95,8 +106,13 @@ public:
* takes a structure as input.
*/
inline bool flush_buffer(BuffView buffer) {
- assert(can_reconstruct_with(0, buffer.get_record_count()));
+ state_vector tmp = m_current_state;
+
+ if (tmp.size() == 0) {
+ grow(tmp);
+ }
+ assert(can_reconstruct_with(0, buffer.get_record_count(), tmp));
flush_buffer_into_l0(std::move(buffer));
return true;
@@ -200,8 +216,16 @@ public:
return m_levels;
}
- std::vector<ReconstructionTask> get_compaction_tasks() {
- std::vector<ReconstructionTask> tasks;
+ /*
+ * NOTE: This cannot be simulated, because tombstone cancellation is not
+ * cheaply predictable. It is possible that the worst case number could
+ * be used instead, to allow for prediction, but compaction isn't a
+ * major concern outside of sampling, at least for now, so I'm not
+ * going to spend much time on it at the moment.
+ */
+ ReconstructionVector get_compaction_tasks() {
+ ReconstructionVector tasks;
+ state_vector scratch_state = m_current_state;
/* if the tombstone/delete invariant is satisfied, no need for compactions */
if (validate_tombstone_proportion()) {
@@ -219,14 +243,12 @@ public:
assert(violation_idx != -1);
- level_index base_level = find_reconstruction_target(violation_idx);
+ level_index base_level = find_reconstruction_target(violation_idx, scratch_state);
if (base_level == -1) {
- base_level = grow();
+ base_level = grow(scratch_state);
}
for (level_index i=base_level; i>0; i--) {
- ReconstructionTask task = {i-1, i};
-
/*
* The amount of storage required for the reconstruction accounts
* for the cost of storing the new records, along with the
@@ -239,13 +261,11 @@ public:
*/
size_t reccnt = m_levels[i - 1]->get_record_count();
if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, reccnt)) {
+ if (can_reconstruct_with(i, reccnt, scratch_state)) {
reccnt += m_levels[i]->get_record_count();
}
}
- //task.m_size = 2* reccnt * sizeof(R);
-
- tasks.push_back(task);
+ tasks.add_reconstruction(i-1, i, reccnt);
}
return tasks;
@@ -254,44 +274,53 @@ public:
/*
*
*/
- std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt) {
- std::vector<ReconstructionTask> reconstructions;
-
- /*
- * The buffer flush is not included so if that can be done without any
- * other change, just return an empty list.
+ ReconstructionVector get_reconstruction_tasks(size_t buffer_reccnt,
+ state_vector scratch_state={}) {
+ /*
+ * If no scratch state vector is provided, use a copy of the
+ * current one. The only time an empty vector could be used as
+ * *real* input to this function is when the current state is also
+ * empty, so this should work even in that case.
*/
- if (can_reconstruct_with(0, buffer_reccnt)) {
- return std::move(reconstructions);
+ if (scratch_state.size() == 0) {
+ scratch_state = m_current_state;
}
- level_index base_level = find_reconstruction_target(0);
- if (base_level == -1) {
- base_level = grow();
- }
-
- for (level_index i=base_level; i>0; i--) {
- ReconstructionTask task = {i-1, i};
-
+ ReconstructionVector reconstructions;
+ size_t LOOKAHEAD = 1;
+ for (size_t i=0; i<LOOKAHEAD; i++) {
/*
- * The amount of storage required for the reconstruction accounts
- * for the cost of storing the new records, along with the
- * cost of retaining the old records during the process
- * (hence the 2x multiplier).
- *
- * FIXME: currently does not account for the *actual* size
- * of the shards, only the storage for the records
- * themselves.
+ * If L0 cannot support a direct buffer flush, figure out what
+ * work must be done to free up space first. Otherwise, the
+ * reconstruction vector will be initially empty.
*/
- size_t reccnt = m_levels[i-1]->get_record_count();
- if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, reccnt)) {
- reccnt += m_levels[i]->get_record_count();
+ if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) {
+ auto local_recon = get_reconstruction_tasks_from_level(0, scratch_state);
+
+ /*
+ * For the first iteration, we need to do all of the
+ * reconstructions, so use these to initialize the returned
+ * reconstruction list.
+ */
+ if (i == 0) {
+ reconstructions = local_recon;
+ /*
+ * Quick sanity test of idea: if the next reconstruction
+ * would be larger than this one, steal the largest
+ * task from it and run it now instead.
+ */
+ } else if (local_recon.get_total_reccnt() > reconstructions.get_total_reccnt()) {
+ auto t = local_recon.remove_reconstruction(0);
+ reconstructions.add_reconstruction(t);
}
}
- //task.m_size = 2* reccnt * sizeof(R);
- reconstructions.push_back(task);
+ /* simulate the buffer flush in the scratch state */
+ scratch_state[0].reccnt += buffer_reccnt;
+ if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) {
+ scratch_state[0].shardcnt += 1;
+ }
+
}
return std::move(reconstructions);
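
Each lookahead iteration ends by applying the would-be buffer flush to the scratch copy only; m_levels is never touched during planning. A hedged sketch of that step in isolation, reusing the level_state/state_vector mirror from the earlier sketch (the tiering flag stands in for the L == LayoutPolicy::TEIRING check):

    /* builds on the level_state / state_vector mirror sketched earlier */
    void simulate_buffer_flush(state_vector &scratch, size_t buffer_reccnt, bool tiering) {
        scratch[0].reccnt += buffer_reccnt;
        /* a flush adds a shard under tiering, or when L0 currently has no shards */
        if (tiering || scratch[0].shardcnt == 0) {
            scratch[0].shardcnt += 1;
        }
    }
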
@@ -301,38 +330,109 @@ public:
/*
*
*/
- std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level) {
- std::vector<ReconstructionTask> reconstructions;
+ ReconstructionVector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) {
+ ReconstructionVector reconstructions;
- level_index base_level = find_reconstruction_target(source_level);
+ /*
+ * Find the first level capable of sustaining a reconstruction from
+ * the level above it. If no such level exists, add a new one at
+ * the bottom of the structure.
+ */
+ level_index base_level = find_reconstruction_target(source_level, scratch_state);
if (base_level == -1) {
- base_level = grow();
+ base_level = grow(scratch_state);
+ }
+
+ if constexpr (L == LayoutPolicy::BSM) {
+ if (base_level == 0) {
+ return std::move(reconstructions);
+ }
+
+ ReconstructionTask task;
+ task.target = base_level;
+
+ size_t base_reccnt = 0;
+ for (level_index i=base_level; i>source_level; i--) {
+ auto recon_reccnt = scratch_state[i-1].reccnt;
+ base_reccnt += recon_reccnt;
+ scratch_state[i-1].reccnt = 0;
+ scratch_state[i-1].shardcnt = 0;
+ task.add_source(i-1, recon_reccnt);
+ }
+
+ reconstructions.add_reconstruction(task);
+ scratch_state[base_level].reccnt = base_reccnt;
+ scratch_state[base_level].shardcnt = 1;
+
+ return std::move(reconstructions);
}
+ /*
+ * Determine the full set of reconstructions necessary to open up
+ * space in the source level.
+ */
for (level_index i=base_level; i>source_level; i--) {
- ReconstructionTask task = {i - 1, i};
+ size_t recon_reccnt = scratch_state[i-1].reccnt;
+ size_t base_reccnt = recon_reccnt;
+
/*
- * The amount of storage required for the reconstruction accounts
- * for the cost of storing the new records, along with the
- * cost of retaining the old records during the process
- * (hence the 2x multiplier).
- *
- * FIXME: currently does not account for the *actual* size
- * of the shards, only the storage for the records
- * themselves.
+ * If using Leveling, the total reconstruction size will be the
+ * records in *both* base and target, because they will need to
+ * be merged (assuming that target isn't empty).
*/
- size_t reccnt = m_levels[i-1]->get_record_count();
if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, reccnt)) {
- reccnt += m_levels[i]->get_record_count();
+ if (can_reconstruct_with(i, base_reccnt, scratch_state)) {
+ recon_reccnt += scratch_state[i].reccnt;
}
}
-// task.m_size = 2* reccnt * sizeof(R);
+ reconstructions.add_reconstruction(i-1, i, recon_reccnt);
+
+ /*
+ * The base level will be emptied and its records moved to
+ * the target.
+ */
+ scratch_state[i-1].reccnt = 0;
+ scratch_state[i-1].shardcnt = 0;
- reconstructions.push_back(task);
+ /*
+ * The target level will have the records from the base level
+ * added to it, and potentially gain a shard if the LayoutPolicy
+ * is tiering or the level currently lacks any shards at all.
+ */
+ scratch_state[i].reccnt += base_reccnt;
+ if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) {
+ scratch_state[i].shardcnt += 1;
+ }
}
- return reconstructions;
+ return std::move(reconstructions);
+ }
+
+ inline void reconstruction(ReconstructionTask task) {
+ static_assert(L == LayoutPolicy::BSM);
+ std::vector<InternalLevel<R, Shard, Q>*> levels(task.sources.size());
+ for (size_t i=0; i<task.sources.size(); i++) {
+ levels[i] = m_levels[task.sources[i]].get();
+ }
+
+ auto new_level = InternalLevel<R, Shard, Q>::reconstruction(levels, task.target);
+ if (task.target >= m_levels.size()) {
+ m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target),
+ 1, 1});
+ m_levels.emplace_back(new_level);
+ } else {
+ m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target),
+ 1, 1};
+ m_levels[task.target] = new_level;
+ }
+
+ /* remove all of the levels that have been flattened */
+ for (size_t i=0; i<task.sources.size(); i++) {
+ m_levels[task.sources[i]] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(task.sources[i], 1));
+ m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.sources[i]), 0, 1};
+ }
+
+ return;
}
/*
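
Under BSM, every level between the source and the chosen base is drained into the base as a single shard. A hedged sketch of the corresponding scratch-state update, again reusing the level_state mirror from the earlier sketch:

    /* builds on the level_state / state_vector mirror sketched earlier */
    void simulate_bsm_flatten(state_vector &scratch, long source_level, long base_level) {
        size_t moved = 0;
        for (long i = base_level; i > source_level; i--) {
            moved += scratch[i-1].reccnt;
            scratch[i-1].reccnt = 0;        /* source levels are emptied...           */
            scratch[i-1].shardcnt = 0;
        }
        scratch[base_level].reccnt = moved; /* ...and their records land in the base  */
        scratch[base_level].shardcnt = 1;   /* BSM keeps a single shard per level     */
    }
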
@@ -342,6 +442,14 @@ public:
* tombstone ordering invariant may be violated.
*/
inline void reconstruction(level_index base_level, level_index incoming_level) {
+ size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+ if (base_level >= m_levels.size()) {
+ m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(base_level, shard_capacity)));
+ m_current_state.push_back({0, calc_level_record_capacity(base_level),
+ 0, shard_capacity});
+ }
+
if constexpr (L == LayoutPolicy::LEVELING) {
/* if the base level has a shard, merge the base and incoming together to make a new one */
if (m_levels[base_level]->get_shard_count() > 0) {
@@ -350,6 +458,7 @@ public:
} else {
m_levels[base_level] = m_levels[incoming_level];
}
+
} else {
m_levels[base_level]->append_level(m_levels[incoming_level].get());
m_levels[base_level]->finalize();
@@ -357,6 +466,14 @@ public:
/* place a new, empty level where the incoming level used to be */
m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
+
+ /*
+ * Update the state vector to match the *real* state following
+ * the reconstruction
+ */
+ m_current_state[base_level] = {m_levels[base_level]->get_record_count(),
+ calc_level_record_capacity(base_level), m_levels[base_level]->get_shard_count(), shard_capacity};
+ m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity};
}
bool take_reference() {
@@ -393,14 +510,27 @@ private:
std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;
+ /*
+ * The record count/capacity and shard count/capacity for each level
+ * in the structure. Record counts may be slightly inaccurate due to
+ * deletes.
+ */
+ state_vector m_current_state;
+
/*
- * Add a new level to the structure and return its index.
+ * Add a new level to the scratch state and return its index.
+ *
+ * IMPORTANT: This does _not_ add a level to the extension structure
+ * anymore. This is handled by the appropriate reconstruction and flush
+ * methods as needed. This function is for use in "simulated"
+ * reconstructions.
*/
- inline level_index grow() {
+ inline level_index grow(state_vector &scratch_state) {
level_index new_idx = m_levels.size();
- size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+ size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
- m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
+ scratch_state.push_back({0, calc_level_record_capacity(new_idx),
+ 0, new_shard_cap});
return new_idx;
}
@@ -411,24 +541,36 @@ private:
* returns -1 if idx==0, and no such level exists, to simplify
* the logic of the first buffer flush.
*/
- inline level_index find_reconstruction_target(level_index idx) {
+ inline level_index find_reconstruction_target(level_index idx, state_vector &state) {
- if (idx == 0 && m_levels.size() == 0) return -1;
+ /*
+ * this handles the very first buffer flush, when the state vector
+ * is empty.
+ */
+ if (idx == 0 && state.size() == 0) return -1;
- size_t incoming_rec_cnt = get_level_record_count(idx);
- for (level_index i=idx+1; i<m_levels.size(); i++) {
- if (can_reconstruct_with(i, incoming_rec_cnt)) {
+ size_t incoming_rec_cnt = state[idx].reccnt;
+ for (level_index i=idx+1; i<state.size(); i++) {
+ if (can_reconstruct_with(i, incoming_rec_cnt, state)) {
return i;
}
- incoming_rec_cnt = get_level_record_count(i);
+ incoming_rec_cnt = state[i].reccnt;
}
return -1;
}
inline void flush_buffer_into_l0(BuffView buffer) {
- assert(m_levels[0]);
+ size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+ if (m_levels.size() == 0) {
+ m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(0, shard_capacity)));
+
+ m_current_state.push_back({0, calc_level_record_capacity(0),
+ 0, shard_capacity});
+ }
+
if constexpr (L == LayoutPolicy::LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0].get();
@@ -444,6 +586,10 @@ private:
} else {
m_levels[0]->append_buffer(std::move(buffer));
}
+
+ /* update the state vector */
+ m_current_state[0].reccnt = m_levels[0]->get_record_count();
+ m_current_state[0].shardcnt = m_levels[0]->get_shard_count();
}
/*
@@ -475,15 +621,17 @@ private:
* Determines if a level can sustain a reconstruction with incoming_rec_cnt
* additional records without exceeding its capacity.
*/
- inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) {
- if (idx >= m_levels.size() || !m_levels[idx]) {
+ inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) {
+ if (idx >= state.size()) {
return false;
}
- if (L == LayoutPolicy::LEVELING) {
- return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
- } else {
- return m_levels[idx]->get_shard_count() < m_scale_factor;
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap;
+ } else if constexpr (L == LayoutPolicy::BSM) {
+ return state[idx].reccnt == 0;
+ } else {
+ return state[idx].shardcnt < state[idx].shardcap;
}
/* unreachable */
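
The rewritten capacity test now answers from the state vector alone, with one rule per layout policy: leveling checks record capacity, BSM requires an empty target level, and tiering checks shard capacity. An isolated sketch using a local enum so the snippet compiles on its own; the real code dispatches on the framework's LayoutPolicy template parameter, and TEIRING is spelled as it appears there:

    #include <cstddef>

    enum class Policy { LEVELING, TEIRING, BSM };

    struct level_state { size_t reccnt, reccap, shardcnt, shardcap; };

    bool can_reconstruct_with(Policy p, const level_state &lvl, size_t incoming_rec_cnt) {
        switch (p) {
        case Policy::LEVELING: return lvl.reccnt + incoming_rec_cnt <= lvl.reccap;
        case Policy::BSM:      return lvl.reccnt == 0;   /* target level must be empty */
        default:               return lvl.shardcnt < lvl.shardcap;
        }
    }

    int main() {
        level_state l0 = {900, 1000, 1, 1};
        return can_reconstruct_with(Policy::LEVELING, l0, 50) ? 0 : 1;  /* 950 <= 1000 */
    }
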
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index db38946..b962dcc 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -64,6 +64,21 @@ public:
return std::shared_ptr<InternalLevel>(res);
}
+ static std::shared_ptr<InternalLevel> reconstruction(std::vector<InternalLevel*> levels, size_t level_idx) {
+ std::vector<Shard *> shards;
+ for (auto level : levels) {
+ for (auto shard : level->m_shards) {
+ if (shard) shards.emplace_back(shard.get());
+ }
+ }
+
+ auto res = new InternalLevel(level_idx, 1);
+ res->m_shard_cnt = 1;
+ res->m_shards[0] = std::make_shared<S>(shards);
+
+ return std::shared_ptr<InternalLevel>(res);
+ }
+
/*
* Create a new shard combining the records from all of
* the shards in level, and append this new shard into
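
The new static reconstruction() collects raw shard pointers from several levels and builds one combined shard via std::make_shared<S>(shards), which assumes the Shard type has a constructor taking std::vector<Shard*>. A hypothetical stand-in that makes the assumption explicit:

    #include <cstddef>
    #include <vector>

    /* hypothetical shard type; only the combine-constructor matters here */
    struct ToyShard {
        size_t reccnt = 0;
        ToyShard() = default;
        explicit ToyShard(const std::vector<ToyShard *> &sources) {
            for (auto *s : sources) {
                reccnt += s->reccnt;   /* a real shard would also merge the records */
            }
        }
    };

    int main() {
        ToyShard a, b;
        a.reccnt = 10;
        b.reccnt = 20;
        std::vector<ToyShard *> sources = {&a, &b};
        ToyShard combined(sources);
        return combined.reccnt == 30 ? 0 : 1;
    }
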
@@ -185,6 +200,10 @@ public:
}
Shard* get_shard(size_t idx) {
+ if (idx >= m_shard_cnt) {
+ return nullptr;
+ }
+
return m_shards[idx].get();
}
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 415c95a..7db3980 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -50,18 +50,19 @@ public:
, m_tail(0)
, m_head({0, 0})
, m_old_head({high_watermark, 0})
- , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+ //, m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+ , m_data(new Wrapped<R>[m_cap]())
, m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
, m_tscnt(0)
, m_old_tscnt(0)
, m_active_head_advance(false)
{
assert(m_cap > m_hwm);
- assert(m_hwm > m_lwm);
+ assert(m_hwm >= m_lwm);
}
~MutableBuffer() {
- free(m_data);
+ delete[] m_data;
delete m_tombstone_filter;
}
@@ -76,16 +77,20 @@ public:
wrec.header = 0;
if (tombstone) wrec.set_tombstone();
+ // FIXME: because of the mod, it isn't correct to use `pos`
+ // as the ordering timestamp in the header anymore.
size_t pos = tail % m_cap;
m_data[pos] = wrec;
- m_data[pos].header |= (pos << 2);
+ m_data[pos].set_timestamp(pos);
if (tombstone) {
m_tscnt.fetch_add(1);
if (m_tombstone_filter) m_tombstone_filter->insert(rec);
}
+ m_data[pos].set_visible();
+
return 1;
}
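
A small illustration of why the FIXME above matters: once the tail wraps, two logically distinct records map to the same physical slot, so the slot index cannot serve as an ordering timestamp. The values are made up:

    #include <cassert>
    #include <cstddef>

    int main() {
        size_t cap = 8;                       /* hypothetical buffer capacity   */
        size_t first_lap = 3;                 /* logical tail on the first pass */
        size_t second_lap = first_lap + cap;  /* same slot, one lap later       */
        /* both records receive the same "timestamp" under pos = tail % cap */
        assert(first_lap % cap == second_lap % cap);
        return 0;
    }
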