| author | Douglas B. Rumbaugh <dbr4@psu.edu> | 2024-05-14 16:31:05 -0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-05-14 16:31:05 -0400 |
| commit | 47916da2ba5ed5bee2dda3cbcc58d39e1e931bfc (patch) | |
| tree | ee5613ce182b2c9caa228d3abeb65dc27fef2db3 /include/framework | |
| parent | 4a834497d5f82c817d634925250158d85ca825c2 (diff) | |
| parent | 8643fe194dec05b4e3f3ea31e162ac0b2b00e162 (diff) | |
| download | dynamic-extension-47916da2ba5ed5bee2dda3cbcc58d39e1e931bfc.tar.gz | |
Merge pull request #4 from dbrumbaugh/master
Updates for VLDB revision
Diffstat (limited to 'include/framework')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | include/framework/DynamicExtension.h | 71 |
| -rw-r--r-- | include/framework/interface/Query.h | 6 |
| -rw-r--r-- | include/framework/interface/Record.h | 49 |
| -rw-r--r-- | include/framework/scheduling/Task.h | 2 |
| -rw-r--r-- | include/framework/structure/BufferView.h | 10 |
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 302 |
| -rw-r--r-- | include/framework/structure/InternalLevel.h | 19 |
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 13 |
| -rw-r--r-- | include/framework/util/Configuration.h | 6 |
9 files changed, 360 insertions, 118 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 7ea5370..e2e2784 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -54,6 +54,10 @@ public:
         , m_next_core(0)
         , m_epoch_cnt(0)
     {
+        if constexpr (L == LayoutPolicy::BSM) {
+            assert(scale_factor == 2);
+        }
+
         auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
         m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
         m_previous_epoch.store({nullptr, 0});
@@ -201,7 +205,7 @@ public:
      */
     size_t get_memory_usage() {
         auto epoch = get_active_epoch();
-        auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage();
+        auto t = m_buffer->get_memory_usage() + epoch->get_structure()->get_memory_usage();

         end_job(epoch);
         return t;
@@ -214,7 +218,7 @@ public:
      */
     size_t get_aux_memory_usage() {
         auto epoch = get_active_epoch();
-        auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage();
+        auto t = epoch->get_structure()->get_aux_memory_usage();

         end_job(epoch);
         return t;
@@ -487,10 +491,17 @@ private:
         ((DynamicExtension *) args->extension)->SetThreadAffinity();

         Structure *vers = args->epoch->get_structure();

-        for (ssize_t i=0; i<args->merges.size(); i++) {
-            vers->reconstruction(args->merges[i].second, args->merges[i].first);
+        if constexpr (L == LayoutPolicy::BSM) {
+            if (args->merges.size() > 0) {
+                vers->reconstruction(args->merges[0]);
+            }
+        } else {
+            for (ssize_t i=0; i<args->merges.size(); i++) {
+                vers->reconstruction(args->merges[i].target, args->merges[i].sources[0]);
+            }
         }
+
         /*
          * we'll grab the buffer AFTER doing the internal reconstruction, so we
          * can flush as many records as possible in one go. The reconstruction
@@ -546,30 +557,34 @@ private:
         std::vector<std::pair<ShardID, Shard*>> shards;
         std::vector<void *> states = vers->get_query_states(shards, parms);

+        std::vector<R> results;
         Q::process_query_states(parms, states, buffer_state);

-        std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
-        for (size_t i=0; i<query_results.size(); i++) {
-            std::vector<Wrapped<R>> local_results;
-            ShardID shid;
-
-            if (i == 0) { /* process the buffer first */
-                local_results = Q::buffer_query(buffer_state, parms);
-                shid = INVALID_SHID;
-            } else {
-                local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
-                shid = shards[i - 1].first;
-            }
+        do {
+            std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
+            for (size_t i=0; i<query_results.size(); i++) {
+                std::vector<Wrapped<R>> local_results;
+                ShardID shid;
+
+                if (i == 0) { /* process the buffer first */
+                    local_results = Q::buffer_query(buffer_state, parms);
+                    shid = INVALID_SHID;
+                } else {
+                    local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
+                    shid = shards[i - 1].first;
+                }

-            query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer));
+                query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer));

-            if constexpr (Q::EARLY_ABORT) {
-                if (query_results[i].size() > 0) break;
+                if constexpr (Q::EARLY_ABORT) {
+                    if (query_results[i].size() > 0) break;
+                }
             }
-        }
+            Q::merge(query_results, parms, results);
+
+        } while (Q::repeat(parms, results, states, buffer_state));

-        auto result = Q::merge(query_results, parms);
-        args->result_set.set_value(std::move(result));
+        args->result_set.set_value(std::move(results));

         ((DynamicExtension *) args->extension)->end_job(epoch);
@@ -624,7 +639,7 @@ private:

     static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) {
         if constexpr (Q::SKIP_DELETE_FILTER) {
-            return records;
+            return std::move(records);
         }

         std::vector<Wrapped<R>> processed_records;
@@ -685,7 +700,12 @@ private:
         return processed_records;
     }

+#ifdef _GNU_SOURCE
     void SetThreadAffinity() {
+        if constexpr (std::same_as<SCHED, SerialScheduler>) {
+            return;
+        }
+
         int core = m_next_core.fetch_add(1) % m_core_cnt;
         cpu_set_t mask;
         CPU_ZERO(&mask);
@@ -707,6 +727,11 @@ private:
         CPU_SET(core, &mask);
         ::sched_setaffinity(0, sizeof(mask), &mask);
     }
+#else
+    void SetThreadAffinity() {
+
+    }
+#endif

     void end_job(_Epoch *epoch) {
diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h
index 3d487f0..577d6cd 100644
--- a/include/framework/interface/Query.h
+++ b/include/framework/interface/Query.h
@@ -13,17 +13,19 @@ namespace de{

 template <typename Q, typename R, typename S>
-concept QueryInterface = requires(void *p, S *sh, std::vector<void*> &s, std::vector<std::vector<Wrapped<R>>> &rv, BufferView<R> *bv) {
+concept QueryInterface = requires(void *p, S *sh, std::vector<void*> &s, std::vector<std::vector<Wrapped<R>>> &rv, BufferView<R> *bv, std::vector<R> &resv) {
     {Q::get_query_state(sh, p)} -> std::convertible_to<void*>;
     {Q::get_buffer_query_state(bv, p)} -> std::convertible_to<void *>;
     {Q::process_query_states(p, s, p)};
     {Q::query(sh, p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>;
     {Q::buffer_query(p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>;
-    {Q::merge(rv, p)} -> std::convertible_to<std::vector<R>>;
+    {Q::merge(rv, p, resv)};
     {Q::delete_query_state(p)} -> std::same_as<void>;
     {Q::delete_buffer_query_state(p)} -> std::same_as<void>;

+    {Q::repeat(p, resv, s, p)} -> std::same_as<bool>;
+
     {Q::EARLY_ABORT} -> std::convertible_to<bool>;
     {Q::SKIP_DELETE_FILTER} -> std::convertible_to<bool>;
 };
diff --git a/include/framework/interface/Record.h b/include/framework/interface/Record.h
index 5b9f307..19ccadd 100644
--- a/include/framework/interface/Record.h
+++ b/include/framework/interface/Record.h
@@ -47,13 +47,18 @@ concept AlexInterface = KVPInterface<R> && requires(R r) {
 };

 template<typename R>
-concept WrappedInterface = RecordInterface<R> && requires(R r, R s, bool b) {
+concept WrappedInterface = RecordInterface<R> && requires(R r, R s, bool b, int i) {
     {r.header} -> std::convertible_to<uint32_t>;
     r.rec;
     {r.set_delete()};
     {r.is_deleted()} -> std::convertible_to<bool>;
     {r.set_tombstone(b)};
     {r.is_tombstone()} -> std::convertible_to<bool>;
+    {r.set_timestamp(i)};
+    {r.get_timestamp()} -> std::convertible_to<uint32_t>;
+    {r.clear_timestamp()};
+    {r.is_visible()} -> std::convertible_to<bool>;
+    {r.set_visible()};
     {r < s} -> std::convertible_to<bool>;
     {r == s} ->std::convertible_to<bool>;
 };
@@ -71,9 +76,29 @@ struct Wrapped {
         return header & 2;
     }

+    inline void set_visible() {
+        header |= 4;
+    }
+
+    inline bool is_visible() const {
+        return header & 4;
+    }
+
+    inline void set_timestamp(int ts) {
+        header |= (ts << 3);
+    }
+
+    inline int get_timestamp() const {
+        return header >> 3;
+    }
+
+    inline void clear_timestamp() {
+        header &= 7;
+    }
+
     inline void set_tombstone(bool val=true) {
         if (val) {
-            header |= val;
+            header |= 1;
         } else {
             header &= 0;
         }
@@ -97,9 +122,8 @@ template <typename K, typename V>
 struct Record {
     K key;
     V value;
-    uint32_t header = 0;

-    inline bool operator<(const Record& other) const {
+    inline bool operator<(const Record& other) const {
         return key < other.key || (key == other.key && value < other.value);
     }
@@ -108,6 +132,23 @@ struct Record {
     }
 };

+template<typename V>
+struct Record<const char*, V> {
+    const char* key;
+    V value;
+    size_t len;
+
+    inline bool operator<(const Record& other) const {
+        size_t n = std::min(len, other.len) + 1;
+        return strncmp(key, other.key, n) < 0;
+    }
+
+    inline bool operator==(const Record& other) const {
+        size_t n = std::min(len, other.len) + 1;
+        return strncmp(key, other.key, n) == 0;
+    }
+};
+
 template <typename K, typename V, typename W>
 struct WeightedRecord {
     K key;
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index d5d4266..bd53090 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -26,7 +26,7 @@ namespace de {
 template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L>
 struct ReconstructionArgs {
     Epoch<R, S, Q, L> *epoch;
-    std::vector<ReconstructionTask> merges;
+    ReconstructionVector merges;
     std::promise<bool> result;
     bool compaction;
     void *extension;
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 9e0872b..e95a799 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -20,7 +20,7 @@ namespace de {

-typedef std::_Bind<void (*(void*, long unsigned int))(void*, long unsigned int)> ReleaseFunction;
+typedef std::function<void(void)> ReleaseFunction;

 template <RecordInterface R>
 class BufferView {
@@ -112,6 +112,10 @@ public:
     size_t get_record_count() {
         return m_tail - m_head;
     }
+
+    size_t get_capacity() {
+        return m_cap;
+    }

     /*
      * NOTE: This function returns an upper bound on the number
@@ -123,7 +127,7 @@ public:
     }

     Wrapped<R> *get(size_t i) {
-        assert(i < get_record_count());
+        //assert(i < get_record_count());
         return m_data + to_idx(i);
     }

@@ -160,7 +164,7 @@ private:
     bool m_active;

     size_t to_idx(size_t i) {
-        size_t idx = (m_start + i >= m_cap) ? i = (m_cap - m_start)
+        size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start)
                                             : m_start + i;
         assert(idx < m_cap);
         return idx;
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 4802bc1..b83674b 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,6 +27,16 @@ class ExtensionStructure {
     typedef S Shard;
     typedef BufferView<R> BuffView;

+    typedef struct {
+        size_t reccnt;
+        size_t reccap;
+
+        size_t shardcnt;
+        size_t shardcap;
+    } level_state;
+
+    typedef std::vector<level_state> state_vector;
+
 public:
     ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
         : m_scale_factor(scale_factor)
@@ -59,6 +69,7 @@ public:
         }

         new_struct->m_refcnt = 0;
+        new_struct->m_current_state = m_current_state;

         return new_struct;
     }
@@ -95,8 +106,13 @@ public:
      * takes a structure as input.
      */
     inline bool flush_buffer(BuffView buffer) {
-        assert(can_reconstruct_with(0, buffer.get_record_count()));
+        state_vector tmp = m_current_state;
+
+        if (tmp.size() == 0) {
+            grow(tmp);
+        }
+        assert(can_reconstruct_with(0, buffer.get_record_count(), tmp));
         flush_buffer_into_l0(std::move(buffer));

         return true;
@@ -200,8 +216,16 @@ public:
         return m_levels;
     }

-    std::vector<ReconstructionTask> get_compaction_tasks() {
-        std::vector<ReconstructionTask> tasks;
+    /*
+     * NOTE: This cannot be simulated, because tombstone cancellation is not
+     * cheaply predictable. It is possible that the worst case number could
+     * be used instead, to allow for prediction, but compaction isn't a
+     * major concern outside of sampling; at least for now. So I'm not
+     * going to focus too much time on it at the moment.
+     */
+    ReconstructionVector get_compaction_tasks() {
+        ReconstructionVector tasks;
+        state_vector scratch_state = m_current_state;

         /* if the tombstone/delete invariant is satisfied, no need for compactions */
         if (validate_tombstone_proportion()) {
@@ -219,14 +243,12 @@ public:

         assert(violation_idx != -1);

-        level_index base_level = find_reconstruction_target(violation_idx);
+        level_index base_level = find_reconstruction_target(violation_idx, scratch_state);
         if (base_level == -1) {
-            base_level = grow();
+            base_level = grow(scratch_state);
         }

         for (level_index i=base_level; i>0; i--) {
-            ReconstructionTask task = {i-1, i};
-
             /*
              * The amount of storage required for the reconstruction accounts
              * for the cost of storing the new records, along with the
@@ -239,13 +261,11 @@ public:
              */
             size_t reccnt = m_levels[i - 1]->get_record_count();
             if constexpr (L == LayoutPolicy::LEVELING) {
-                if (can_reconstruct_with(i, reccnt)) {
+                if (can_reconstruct_with(i, reccnt, scratch_state)) {
                     reccnt += m_levels[i]->get_record_count();
                 }
             }
-            //task.m_size = 2* reccnt * sizeof(R);
-
-            tasks.push_back(task);
+            tasks.add_reconstruction(i-i, i, reccnt);
         }

         return tasks;
@@ -254,44 +274,53 @@ public:
     /*
      *
     */
-    std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt) {
-        std::vector<ReconstructionTask> reconstructions;
-
-        /*
-         * The buffer flush is not included so if that can be done without any
-         * other change, just return an empty list.
+    ReconstructionVector get_reconstruction_tasks(size_t buffer_reccnt,
+                                                  state_vector scratch_state={}) {
+        /*
+         * If no scratch state vector is provided, use a copy of the
+         * current one. The only time an empty vector could be used as
+         * *real* input to this function is when the current state is also
+         * empty, so this should would even in that case.
          */
-        if (can_reconstruct_with(0, buffer_reccnt)) {
-            return std::move(reconstructions);
+        if (scratch_state.size() == 0) {
+            scratch_state = m_current_state;
         }

-        level_index base_level = find_reconstruction_target(0);
-        if (base_level == -1) {
-            base_level = grow();
-        }
-
-        for (level_index i=base_level; i>0; i--) {
-            ReconstructionTask task = {i-1, i};
-
+        ReconstructionVector reconstructions;
+        size_t LOOKAHEAD = 1;
+        for (size_t i=0; i<LOOKAHEAD; i++) {
            /*
-            * The amount of storage required for the reconstruction accounts
-            * for the cost of storing the new records, along with the
-            * cost of retaining the old records during the process
-            * (hence the 2x multiplier).
-            *
-            * FIXME: currently does not account for the *actual* size
-            * of the shards, only the storage for the records
-            * themselves.
+            * If L0 cannot support a direct buffer flush, figure out what
+            * work must be done to free up space first. Otherwise, the
+            * reconstruction vector will be initially empty.
            */
-            size_t reccnt = m_levels[i-1]->get_record_count();
-            if constexpr (L == LayoutPolicy::LEVELING) {
-                if (can_reconstruct_with(i, reccnt)) {
-                    reccnt += m_levels[i]->get_record_count();
+            if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) {
+                auto local_recon = get_reconstruction_tasks_from_level(0, scratch_state);
+
+                /*
+                 * for the first iteration, we need to do all of the
+                 * reconstructions, so use these to initially the returned
+                 * reconstruction list
+                 */
+                if (i == 0) {
+                    reconstructions = local_recon;
+                /*
+                 * Quick sanity test of idea: if the next reconstruction
+                 * would be larger than this one, steal the largest
+                 * task from it and run it now instead.
+                 */
+                } else if (local_recon.get_total_reccnt() > reconstructions.get_total_reccnt()) {
+                    auto t = local_recon.remove_reconstruction(0);
+                    reconstructions.add_reconstruction(t);
                 }
             }
-            //task.m_size = 2* reccnt * sizeof(R);
-            reconstructions.push_back(task);
+
+            /* simulate the buffer flush in the scratch state */
+            scratch_state[0].reccnt += buffer_reccnt;
+            if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) {
+                scratch_state[0].shardcnt += 1;
+            }
+
+        }

         return std::move(reconstructions);
@@ -301,38 +330,109 @@ public:
     /*
      *
     */
-    std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level) {
-        std::vector<ReconstructionTask> reconstructions;
+    ReconstructionVector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) {
+        ReconstructionVector reconstructions;

-        level_index base_level = find_reconstruction_target(source_level);
+        /*
+         * Find the first level capable of sustaining a reconstruction from
+         * the level above it. If no such level exists, add a new one at
+         * the bottom of the structure.
+         */
+        level_index base_level = find_reconstruction_target(source_level, scratch_state);
         if (base_level == -1) {
-            base_level = grow();
+            base_level = grow(scratch_state);
+        }
+
+        if constexpr (L == LayoutPolicy::BSM) {
+            if (base_level == 0) {
+                return std::move(reconstructions);
+            }
+
+            ReconstructionTask task;
+            task.target = base_level;
+
+            size_t base_reccnt = 0;
+            for (level_index i=base_level; i>source_level; i--) {
+                auto recon_reccnt = scratch_state[i-1].reccnt;
+                base_reccnt += recon_reccnt;
+                scratch_state[i-1].reccnt = 0;
+                scratch_state[i-1].shardcnt = 0;
+                task.add_source(i-1, recon_reccnt);
+            }
+
+            reconstructions.add_reconstruction(task);
+            scratch_state[base_level].reccnt = base_reccnt;
+            scratch_state[base_level].shardcnt = 1;
+
+            return std::move(reconstructions);
         }

+        /*
+         * Determine the full set of reconstructions necessary to open up
+         * space in the source level.
+         */
         for (level_index i=base_level; i>source_level; i--) {
-            ReconstructionTask task = {i - 1, i};
+            size_t recon_reccnt = scratch_state[i-1].reccnt;
+            size_t base_reccnt = recon_reccnt;
+
             /*
-             * The amount of storage required for the reconstruction accounts
-             * for the cost of storing the new records, along with the
-             * cost of retaining the old records during the process
-             * (hence the 2x multiplier).
-             *
-             * FIXME: currently does not account for the *actual* size
-             * of the shards, only the storage for the records
-             * themselves.
+             * If using Leveling, the total reconstruction size will be the
+             * records in *both* base and target, because they will need to
+             * be merged (assuming that target isn't empty).
              */
-            size_t reccnt = m_levels[i-1]->get_record_count();
             if constexpr (L == LayoutPolicy::LEVELING) {
-                if (can_reconstruct_with(i, reccnt)) {
-                    reccnt += m_levels[i]->get_record_count();
+                if (can_reconstruct_with(i, base_reccnt, scratch_state)) {
+                    recon_reccnt += scratch_state[i].reccnt;
                 }
             }
-//            task.m_size = 2* reccnt * sizeof(R);
+            reconstructions.add_reconstruction(i-1, i, recon_reccnt);
+
+            /*
+             * The base level will be emptied and its records moved to
+             * the target.
+             */
+            scratch_state[i-1].reccnt = 0;
+            scratch_state[i-1].shardcnt = 0;

-            reconstructions.push_back(task);
+            /*
+             * The target level will have the records from the base level
+             * added to it, and potentially gain a shard if the LayoutPolicy
+             * is tiering or the level currently lacks any shards at all.
+             */
+            scratch_state[i].reccnt += base_reccnt;
+            if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) {
+                scratch_state[i].shardcnt += 1;
+            }
         }

-        return reconstructions;
+        return std::move(reconstructions);
+    }
+
+    inline void reconstruction(ReconstructionTask task) {
+        static_assert(L == LayoutPolicy::BSM);
+        std::vector<InternalLevel<R, Shard, Q>*> levels(task.sources.size());
+        for (size_t i=0; i<task.sources.size(); i++) {
+            levels[i] = m_levels[task.sources[i]].get();
+        }
+
+        auto new_level = InternalLevel<R, Shard, Q>::reconstruction(levels, task.target);
+        if (task.target >= m_levels.size()) {
+            m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target),
+                                       1, 1});
+            m_levels.emplace_back(new_level);
+        } else {
+            m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target),
+                                            1, 1};
+            m_levels[task.target] = new_level;
+        }
+
+        /* remove all of the levels that have been flattened */
+        for (size_t i=0; i<task.sources.size(); i++) {
+            m_levels[task.sources[i]] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(task.sources[i], 1));
+            m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.target), 0, 1};
+        }
+
+        return;
     }

     /*
@@ -342,6 +442,14 @@ public:
      * tombstone ordering invariant may be violated.
      */
     inline void reconstruction(level_index base_level, level_index incoming_level) {
+        size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+        if (base_level >= m_levels.size()) {
+            m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(base_level, shard_capacity)));
+            m_current_state.push_back({0, calc_level_record_capacity(base_level),
+                                       0, shard_capacity});
+        }
+
         if constexpr (L == LayoutPolicy::LEVELING) {
             /* if the base level has a shard, merge the base and incoming together to make a new one */
             if (m_levels[base_level]->get_shard_count() > 0) {
@@ -350,6 +458,7 @@ public:
             } else {
                 m_levels[base_level] = m_levels[incoming_level];
             }
+
         } else {
             m_levels[base_level]->append_level(m_levels[incoming_level].get());
             m_levels[base_level]->finalize();
@@ -357,6 +466,14 @@ public:

         /* place a new, empty level where the incoming level used to be */
         m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
+
+        /*
+         * Update the state vector to match the *real* state following
+         * the reconstruction
+         */
+        m_current_state[base_level] = {m_levels[base_level]->get_record_count(),
+                                       calc_level_record_capacity(base_level), m_levels[base_level]->get_shard_count(), shard_capacity};
+        m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity};
     }

     bool take_reference() {
@@ -393,14 +510,27 @@ private:

     std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;

+    /*
+     * A pair of <record_count, shard_count> for each level in the
+     * structure. Record counts may be slightly inaccurate due to
+     * deletes.
+     */
+    state_vector m_current_state;
+
     /*
-     * Add a new level to the structure and return its index.
+     * Add a new level to the scratch state and return its index.
+     *
+     * IMPORTANT: This does _not_ add a level to the extension structure
+     * anymore. This is handled by the appropriate reconstruction and flush
+     * methods as needed. This function is for use in "simulated"
+     * reconstructions.
      */
-    inline level_index grow() {
+    inline level_index grow(state_vector &scratch_state) {
         level_index new_idx = m_levels.size();
-        size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+        size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;

-        m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
+        scratch_state.push_back({0, calc_level_record_capacity(new_idx),
+                                 0, new_shard_cap});
         return new_idx;
     }
@@ -411,24 +541,36 @@ private:
      * returns -1 if idx==0, and no such level exists, to simplify
      * the logic of the first buffer flush.
      */
-    inline level_index find_reconstruction_target(level_index idx) {
+    inline level_index find_reconstruction_target(level_index idx, state_vector &state) {

-        if (idx == 0 && m_levels.size() == 0) return -1;
+        /*
+         * this handles the very first buffer flush, when the state vector
+         * is empty.
+         */
+        if (idx == 0 && state.size() == 0) return -1;

-        size_t incoming_rec_cnt = get_level_record_count(idx);
-        for (level_index i=idx+1; i<m_levels.size(); i++) {
-            if (can_reconstruct_with(i, incoming_rec_cnt)) {
+        size_t incoming_rec_cnt = state[idx].reccnt;
+        for (level_index i=idx+1; i<state.size(); i++) {
+            if (can_reconstruct_with(i, incoming_rec_cnt, state)) {
                 return i;
             }

-            incoming_rec_cnt = get_level_record_count(i);
+            incoming_rec_cnt = state[idx].reccnt;
         }

         return -1;
     }

     inline void flush_buffer_into_l0(BuffView buffer) {
-        assert(m_levels[0]);
+        size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+        if (m_levels.size() == 0) {
+            m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(0, shard_capacity)));
+
+            m_current_state.push_back({0, calc_level_record_capacity(0),
+                                       0, shard_capacity});
+        }
+
         if constexpr (L == LayoutPolicy::LEVELING) {
             // FIXME: Kludgey implementation due to interface constraints.
             auto old_level = m_levels[0].get();
@@ -444,6 +586,10 @@ private:
         } else {
             m_levels[0]->append_buffer(std::move(buffer));
         }
+
+        /* update the state vector */
+        m_current_state[0].reccnt = m_levels[0]->get_record_count();
+        m_current_state[0].shardcnt = m_levels[0]->get_shard_count();
     }

     /*
@@ -475,15 +621,17 @@ private:
      * Determines if a level can sustain a reconstruction with incoming_rec_cnt
      * additional records without exceeding its capacity.
      */
-    inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) {
-        if (idx >= m_levels.size() || !m_levels[idx]) {
+    inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) {
+        if (idx >= state.size()) {
             return false;
         }

-        if (L == LayoutPolicy::LEVELING) {
-            return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
-        } else {
-            return m_levels[idx]->get_shard_count() < m_scale_factor;
+        if constexpr (L == LayoutPolicy::LEVELING) {
+            return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap;
+        } else if constexpr (L == LayoutPolicy::BSM) {
+            return state[idx].reccnt == 0;
+        } else {
+            return state[idx].shardcnt < state[idx].shardcap;
         }

         /* unreachable */
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index db38946..b962dcc 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -64,6 +64,21 @@ public:
         return std::shared_ptr<InternalLevel>(res);
     }

+    static std::shared_ptr<InternalLevel> reconstruction(std::vector<InternalLevel*> levels, size_t level_idx) {
+        std::vector<Shard *> shards;
+        for (auto level : levels) {
+            for (auto shard : level->m_shards) {
+                if (shard) shards.emplace_back(shard.get());
+            }
+        }
+
+        auto res = new InternalLevel(level_idx, 1);
+        res->m_shard_cnt = 1;
+        res->m_shards[0] = std::make_shared<S>(shards);
+
+        return std::shared_ptr<InternalLevel>(res);
+    }
+
     /*
      * Create a new shard combining the records from all of
      * the shards in level, and append this new shard into
@@ -185,6 +200,10 @@ public:
     }

     Shard* get_shard(size_t idx) {
+        if (idx >= m_shard_cnt) {
+            return nullptr;
+        }
+
         return m_shards[idx].get();
     }
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 415c95a..7db3980 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -50,18 +50,19 @@ public:
         , m_tail(0)
         , m_head({0, 0})
         , m_old_head({high_watermark, 0})
-        , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+        //, m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+        , m_data(new Wrapped<R>[m_cap]())
         , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
         , m_tscnt(0)
         , m_old_tscnt(0)
         , m_active_head_advance(false)
     {
         assert(m_cap > m_hwm);
-        assert(m_hwm > m_lwm);
+        assert(m_hwm >= m_lwm);
     }

     ~MutableBuffer() {
-        free(m_data);
+        delete[] m_data;
         delete m_tombstone_filter;
     }
@@ -76,16 +77,20 @@ public:
         wrec.header = 0;
         if (tombstone) wrec.set_tombstone();

+        // FIXME: because of the mod, it isn't correct to use `pos`
+        // as the ordering timestamp in the header anymore.
         size_t pos = tail % m_cap;

         m_data[pos] = wrec;
-        m_data[pos].header |= (pos << 2);
+        m_data[pos].set_timestamp(pos);

         if (tombstone) {
             m_tscnt.fetch_add(1);
             if (m_tombstone_filter) m_tombstone_filter->insert(rec);
         }

+        m_data[pos].set_visible();
+
         return 1;
     }
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 65ca181..4a4524a 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -35,7 +35,8 @@ static thread_local size_t sampling_bailouts = 0;

 enum class LayoutPolicy {
     LEVELING,
-    TEIRING
+    TEIRING,
+    BSM
 };

 enum class DeletePolicy {
@@ -43,7 +44,4 @@ enum class DeletePolicy {
     TAGGING
 };

-typedef ssize_t level_index;
-typedef std::pair<level_index, level_index> ReconstructionTask;
-
 }