| author | Douglas B. Rumbaugh <dbr4@psu.edu> | 2024-05-14 16:31:05 -0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-05-14 16:31:05 -0400 |
| commit | 47916da2ba5ed5bee2dda3cbcc58d39e1e931bfc (patch) | |
| tree | ee5613ce182b2c9caa228d3abeb65dc27fef2db3 /include | |
| parent | 4a834497d5f82c817d634925250158d85ca825c2 (diff) | |
| parent | 8643fe194dec05b4e3f3ea31e162ac0b2b00e162 (diff) | |
| download | dynamic-extension-47916da2ba5ed5bee2dda3cbcc58d39e1e931bfc.tar.gz | |
Merge pull request #4 from dbrumbaugh/master
Updates for VLDB revision
Diffstat (limited to 'include')
26 files changed, 1407 insertions, 267 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 7ea5370..e2e2784 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -54,6 +54,10 @@ public:
        , m_next_core(0)
        , m_epoch_cnt(0)
    {
+       if constexpr (L == LayoutPolicy::BSM) {
+           assert(scale_factor == 2);
+       }
+
        auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
        m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
        m_previous_epoch.store({nullptr, 0});
@@ -201,7 +205,7 @@ public:
     */
    size_t get_memory_usage() {
        auto epoch = get_active_epoch();
-       auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage();
+       auto t = m_buffer->get_memory_usage() + epoch->get_structure()->get_memory_usage();
        end_job(epoch);

        return t;
@@ -214,7 +218,7 @@ public:
     */
    size_t get_aux_memory_usage() {
        auto epoch = get_active_epoch();
-       auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage();
+       auto t = epoch->get_structure()->get_aux_memory_usage();
        end_job(epoch);

        return t;
@@ -487,10 +491,17 @@ private:
        ((DynamicExtension *) args->extension)->SetThreadAffinity();
        Structure *vers = args->epoch->get_structure();

-       for (ssize_t i=0; i<args->merges.size(); i++) {
-           vers->reconstruction(args->merges[i].second, args->merges[i].first);
+       if constexpr (L == LayoutPolicy::BSM) {
+           if (args->merges.size() > 0) {
+               vers->reconstruction(args->merges[0]);
+           }
+       } else {
+           for (ssize_t i=0; i<args->merges.size(); i++) {
+               vers->reconstruction(args->merges[i].target, args->merges[i].sources[0]);
+           }
        }

+
        /*
         * we'll grab the buffer AFTER doing the internal reconstruction, so we
         * can flush as many records as possible in one go. The reconstruction
@@ -546,30 +557,34 @@ private:
        std::vector<std::pair<ShardID, Shard*>> shards;
        std::vector<void *> states = vers->get_query_states(shards, parms);

+       std::vector<R> results;
        Q::process_query_states(parms, states, buffer_state);

-       std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
-       for (size_t i=0; i<query_results.size(); i++) {
-           std::vector<Wrapped<R>> local_results;
-           ShardID shid;
-
-           if (i == 0) { /* process the buffer first */
-               local_results = Q::buffer_query(buffer_state, parms);
-               shid = INVALID_SHID;
-           } else {
-               local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
-               shid = shards[i - 1].first;
-           }
+       do {
+           std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
+           for (size_t i=0; i<query_results.size(); i++) {
+               std::vector<Wrapped<R>> local_results;
+               ShardID shid;
+
+               if (i == 0) { /* process the buffer first */
+                   local_results = Q::buffer_query(buffer_state, parms);
+                   shid = INVALID_SHID;
+               } else {
+                   local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
+                   shid = shards[i - 1].first;
+               }

-           query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer));
+               query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer));

-           if constexpr (Q::EARLY_ABORT) {
-               if (query_results[i].size() > 0) break;
+               if constexpr (Q::EARLY_ABORT) {
+                   if (query_results[i].size() > 0) break;
+               }
            }
-       }
+           Q::merge(query_results, parms, results);
+
+       } while (Q::repeat(parms, results, states, buffer_state));

-       auto result = Q::merge(query_results, parms);
-       args->result_set.set_value(std::move(result));
+       args->result_set.set_value(std::move(results));

        ((DynamicExtension *) args->extension)->end_job(epoch);
@@ -624,7 +639,7 @@ private:
    static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) {
        if constexpr (Q::SKIP_DELETE_FILTER) {
-           return records;
+           return std::move(records);
        }

        std::vector<Wrapped<R>> processed_records;
@@ -685,7 +700,12 @@ private:
        return processed_records;
    }

+#ifdef _GNU_SOURCE
    void SetThreadAffinity() {
+       if constexpr (std::same_as<SCHED, SerialScheduler>) {
+           return;
+       }
+
        int core = m_next_core.fetch_add(1) % m_core_cnt;
        cpu_set_t mask;
        CPU_ZERO(&mask);
@@ -707,6 +727,11 @@ private:
        CPU_SET(core, &mask);
        ::sched_setaffinity(0, sizeof(mask), &mask);
    }
+#else
+    void SetThreadAffinity() {
+
+    }
+#endif

    void end_job(_Epoch *epoch) {
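Review note: the new constructor guard pins BSM to a scale factor of exactly 2. A minimal instantiation sketch under that constraint (the record/shard/query types and the constructor's parameter order are illustrative assumptions, not taken from this diff):

```cpp
#include "framework/DynamicExtension.h"

// Illustrative type choices; any conforming record/shard/query combination
// from this repository should behave the same way.
typedef de::Record<uint64_t, uint64_t> Rec;
typedef de::ISAMTree<Rec> Shard;
typedef de::rq::Query<Rec, Shard> Q;

// Assumed parameter order: buffer low/high watermarks, scale factor,
// max delete proportion. Under LayoutPolicy::BSM the constructor now
// asserts scale_factor == 2.
de::DynamicExtension<Rec, Shard, Q, de::LayoutPolicy::BSM>
    ext(4000, 8000, /*scale_factor=*/2, /*max_delete_prop=*/0.05);
```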
diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h
index 3d487f0..577d6cd 100644
--- a/include/framework/interface/Query.h
+++ b/include/framework/interface/Query.h
@@ -13,17 +13,19 @@ namespace de{

 template <typename Q, typename R, typename S>
-concept QueryInterface = requires(void *p, S *sh, std::vector<void*> &s, std::vector<std::vector<Wrapped<R>>> &rv, BufferView<R> *bv) {
+concept QueryInterface = requires(void *p, S *sh, std::vector<void*> &s, std::vector<std::vector<Wrapped<R>>> &rv, BufferView<R> *bv, std::vector<R> &resv) {
    {Q::get_query_state(sh, p)} -> std::convertible_to<void*>;
    {Q::get_buffer_query_state(bv, p)} -> std::convertible_to<void *>;
    {Q::process_query_states(p, s, p)};
    {Q::query(sh, p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>;
    {Q::buffer_query(p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>;
-   {Q::merge(rv, p)} -> std::convertible_to<std::vector<R>>;
+   {Q::merge(rv, p, resv)};
    {Q::delete_query_state(p)} -> std::same_as<void>;
    {Q::delete_buffer_query_state(p)} -> std::same_as<void>;

+   {Q::repeat(p, resv, s, p)} -> std::same_as<bool>;
+
    {Q::EARLY_ABORT} -> std::convertible_to<bool>;
    {Q::SKIP_DELETE_FILTER} -> std::convertible_to<bool>;
 };
diff --git a/include/framework/interface/Record.h b/include/framework/interface/Record.h
index 5b9f307..19ccadd 100644
--- a/include/framework/interface/Record.h
+++ b/include/framework/interface/Record.h
@@ -47,13 +47,18 @@ concept AlexInterface = KVPInterface<R> && requires(R r) {
 };

 template<typename R>
-concept WrappedInterface = RecordInterface<R> && requires(R r, R s, bool b) {
+concept WrappedInterface = RecordInterface<R> && requires(R r, R s, bool b, int i) {
    {r.header} -> std::convertible_to<uint32_t>;
    r.rec;
    {r.set_delete()};
    {r.is_deleted()} -> std::convertible_to<bool>;
    {r.set_tombstone(b)};
    {r.is_tombstone()} -> std::convertible_to<bool>;
+   {r.set_timestamp(i)};
+   {r.get_timestamp()} -> std::convertible_to<uint32_t>;
+   {r.clear_timestamp()};
+   {r.is_visible()} -> std::convertible_to<bool>;
+   {r.set_visible()};
    {r < s} -> std::convertible_to<bool>;
    {r == s} -> std::convertible_to<bool>;
 };
@@ -71,9 +76,29 @@ struct Wrapped {
        return header & 2;
    }

+   inline void set_visible() {
+       header |= 4;
+   }
+
+   inline bool is_visible() const {
+       return header & 4;
+   }
+
+   inline void set_timestamp(int ts) {
+       header |= (ts << 3);
+   }
+
+   inline int get_timestamp() const {
+       return header >> 3;
+   }
+
+   inline void clear_timestamp() {
+       header &= 7;
+   }
+
    inline void set_tombstone(bool val=true) {
        if (val) {
-           header |= val;
+           header |= 1;
        } else {
            header &= 0;
        }
@@ -97,9 +122,8 @@ template <typename K, typename V>
 struct Record {
    K key;
    V value;
-   uint32_t header = 0;

-   inline bool operator<(const Record& other) const {
+   inline bool operator<(const Record& other) const {
        return key < other.key || (key == other.key && value < other.value);
    }

@@ -108,6 +132,23 @@ struct Record {
    }
 };

+template<typename V>
+struct Record<const char*, V> {
+   const char* key;
+   V value;
+   size_t len;
+
+   inline bool operator<(const Record& other) const {
+       size_t n = std::min(len, other.len) + 1;
+       return strncmp(key, other.key, n) < 0;
+   }
+
+   inline bool operator==(const Record& other) const {
+       size_t n = std::min(len, other.len) + 1;
+       return strncmp(key, other.key, n) == 0;
+   }
+};
+
 template <typename K, typename V, typename W>
 struct WeightedRecord {
    K key;
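The five new accessors pack several flags and an ordering timestamp into the single 32-bit record header. A standalone sketch of the bit layout they imply (compiles as-is):

```cpp
#include <cassert>
#include <cstdint>

int main() {
    // bit 0: tombstone, bit 1: delete tag, bit 2: visibility, bits 3+: timestamp
    uint32_t header = 0;

    header |= 1;         // set_tombstone()
    header |= 4;         // set_visible()
    header |= (42 << 3); // set_timestamp(42)

    assert(header & 1);          // is_tombstone()
    assert(header & 4);          // is_visible()
    assert((header >> 3) == 42); // get_timestamp()

    header &= 7;                 // clear_timestamp(): keep only the three flag bits
    assert((header >> 3) == 0);
    return 0;
}
```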
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index d5d4266..bd53090 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -26,7 +26,7 @@ namespace de {
 template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L>
 struct ReconstructionArgs {
    Epoch<R, S, Q, L> *epoch;
-   std::vector<ReconstructionTask> merges;
+   ReconstructionVector merges;
    std::promise<bool> result;
    bool compaction;
    void *extension;
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 9e0872b..e95a799 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -20,7 +20,7 @@ namespace de {

-typedef std::_Bind<void (*(void*, long unsigned int))(void*, long unsigned int)> ReleaseFunction;
+typedef std::function<void(void)> ReleaseFunction;

 template <RecordInterface R>
 class BufferView {
@@ -112,6 +112,10 @@ public:
    size_t get_record_count() {
        return m_tail - m_head;
    }
+
+   size_t get_capacity() {
+       return m_cap;
+   }

    /*
     * NOTE: This function returns an upper bound on the number
@@ -123,7 +127,7 @@ public:
    }

    Wrapped<R> *get(size_t i) {
-       assert(i < get_record_count());
+       //assert(i < get_record_count());
        return m_data + to_idx(i);
    }
@@ -160,7 +164,7 @@ private:
    bool m_active;

    size_t to_idx(size_t i) {
-       size_t idx = (m_start + i >= m_cap) ? i = (m_cap - m_start)
+       size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start)
                                            : m_start + i;
        assert(idx < m_cap);
        return idx;
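The `to_idx` change replaces an accidental assignment (`i = (m_cap - m_start)`) with the intended subtraction, restoring correct wraparound. A standalone check of the arithmetic:

```cpp
#include <cassert>
#include <cstddef>

int main() {
    // m_cap = 8, m_start = 6: logical offsets wrap at the end of the array.
    size_t m_cap = 8, m_start = 6;
    auto to_idx = [&](size_t i) {
        return (m_start + i >= m_cap) ? i - (m_cap - m_start) // wrapped region
                                      : m_start + i;          // unwrapped region
    };

    assert(to_idx(0) == 6);
    assert(to_idx(1) == 7);
    assert(to_idx(2) == 0); // the old `i = (m_cap - m_start)` typo broke this case
    assert(to_idx(3) == 1);
    return 0;
}
```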
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 4802bc1..b83674b 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,6 +27,16 @@ class ExtensionStructure {
    typedef S Shard;
    typedef BufferView<R> BuffView;

+   typedef struct {
+       size_t reccnt;
+       size_t reccap;
+
+       size_t shardcnt;
+       size_t shardcap;
+   } level_state;
+
+   typedef std::vector<level_state> state_vector;
+
 public:
    ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
        : m_scale_factor(scale_factor)
@@ -59,6 +69,7 @@ public:
        }

        new_struct->m_refcnt = 0;
+       new_struct->m_current_state = m_current_state;

        return new_struct;
    }
@@ -95,8 +106,13 @@ public:
     * takes a structure as input.
     */
    inline bool flush_buffer(BuffView buffer) {
-       assert(can_reconstruct_with(0, buffer.get_record_count()));
+       state_vector tmp = m_current_state;
+
+       if (tmp.size() == 0) {
+           grow(tmp);
+       }

+       assert(can_reconstruct_with(0, buffer.get_record_count(), tmp));
        flush_buffer_into_l0(std::move(buffer));

        return true;
@@ -200,8 +216,16 @@ public:
        return m_levels;
    }

-   std::vector<ReconstructionTask> get_compaction_tasks() {
-       std::vector<ReconstructionTask> tasks;
+   /*
+    * NOTE: This cannot be simulated, because tombstone cancellation is not
+    * cheaply predictable. It is possible that the worst-case number could
+    * be used instead, to allow for prediction, but compaction isn't a
+    * major concern outside of sampling, at least for now. So I'm not
+    * going to spend too much time on it at the moment.
+    */
+   ReconstructionVector get_compaction_tasks() {
+       ReconstructionVector tasks;
+       state_vector scratch_state = m_current_state;

        /* if the tombstone/delete invariant is satisfied, no need for compactions */
        if (validate_tombstone_proportion()) {
@@ -219,14 +243,12 @@ public:
        assert(violation_idx != -1);

-       level_index base_level = find_reconstruction_target(violation_idx);
+       level_index base_level = find_reconstruction_target(violation_idx, scratch_state);
        if (base_level == -1) {
-           base_level = grow();
+           base_level = grow(scratch_state);
        }

        for (level_index i=base_level; i>0; i--) {
-           ReconstructionTask task = {i-1, i};
-
            /*
             * The amount of storage required for the reconstruction accounts
             * for the cost of storing the new records, along with the
@@ -239,13 +261,11 @@ public:
             */
            size_t reccnt = m_levels[i - 1]->get_record_count();
            if constexpr (L == LayoutPolicy::LEVELING) {
-               if (can_reconstruct_with(i, reccnt)) {
+               if (can_reconstruct_with(i, reccnt, scratch_state)) {
                    reccnt += m_levels[i]->get_record_count();
                }
            }
-           //task.m_size = 2* reccnt * sizeof(R);
-
-           tasks.push_back(task);
+           tasks.add_reconstruction(i-1, i, reccnt);
        }

        return tasks;
@@ -254,44 +274,53 @@ public:
    /*
     *
     */
-   std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt) {
-       std::vector<ReconstructionTask> reconstructions;
-
-       /*
-        * The buffer flush is not included so if that can be done without any
-        * other change, just return an empty list.
+   ReconstructionVector get_reconstruction_tasks(size_t buffer_reccnt,
+                                                 state_vector scratch_state={}) {
+       /*
+        * If no scratch state vector is provided, use a copy of the
+        * current one. The only time an empty vector could be used as
+        * *real* input to this function is when the current state is also
+        * empty, so this should work even in that case.
        */
-       if (can_reconstruct_with(0, buffer_reccnt)) {
-           return std::move(reconstructions);
+       if (scratch_state.size() == 0) {
+           scratch_state = m_current_state;
        }

-       level_index base_level = find_reconstruction_target(0);
-       if (base_level == -1) {
-           base_level = grow();
-       }
-
-       for (level_index i=base_level; i>0; i--) {
-           ReconstructionTask task = {i-1, i};
-
+       ReconstructionVector reconstructions;
+       size_t LOOKAHEAD = 1;
+       for (size_t i=0; i<LOOKAHEAD; i++) {
            /*
-            * The amount of storage required for the reconstruction accounts
-            * for the cost of storing the new records, along with the
-            * cost of retaining the old records during the process
-            * (hence the 2x multiplier).
-            *
-            * FIXME: currently does not account for the *actual* size
-            * of the shards, only the storage for the records
-            * themselves.
+            * If L0 cannot support a direct buffer flush, figure out what
+            * work must be done to free up space first. Otherwise, the
+            * reconstruction vector will be initially empty.
             */
-           size_t reccnt = m_levels[i-1]->get_record_count();
-           if constexpr (L == LayoutPolicy::LEVELING) {
-               if (can_reconstruct_with(i, reccnt)) {
-                   reccnt += m_levels[i]->get_record_count();
+           if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) {
+               auto local_recon = get_reconstruction_tasks_from_level(0, scratch_state);
+
+               /*
+                * for the first iteration, we need to do all of the
+                * reconstructions, so use these to initialize the returned
+                * reconstruction list
+                */
+               if (i == 0) {
+                   reconstructions = local_recon;
+               /*
+                * Quick sanity test of idea: if the next reconstruction
+                * would be larger than this one, steal the largest
+                * task from it and run it now instead.
+                */
+               } else if (local_recon.get_total_reccnt() > reconstructions.get_total_reccnt()) {
+                   auto t = local_recon.remove_reconstruction(0);
+                   reconstructions.add_reconstruction(t);
                }
            }
-           //task.m_size = 2* reccnt * sizeof(R);
-           reconstructions.push_back(task);
+           /* simulate the buffer flush in the scratch state */
+           scratch_state[0].reccnt += buffer_reccnt;
+           if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) {
+               scratch_state[0].shardcnt += 1;
+           }
+
        }

        return std::move(reconstructions);
@@ -301,38 +330,109 @@ public:
    /*
     *
     */
-   std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level) {
-       std::vector<ReconstructionTask> reconstructions;
+   ReconstructionVector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) {
+       ReconstructionVector reconstructions;

-       level_index base_level = find_reconstruction_target(source_level);
+       /*
+        * Find the first level capable of sustaining a reconstruction from
+        * the level above it. If no such level exists, add a new one at
+        * the bottom of the structure.
+        */
+       level_index base_level = find_reconstruction_target(source_level, scratch_state);
        if (base_level == -1) {
-           base_level = grow();
+           base_level = grow(scratch_state);
+       }
+
+       if constexpr (L == LayoutPolicy::BSM) {
+           if (base_level == 0) {
+               return std::move(reconstructions);
+           }
+
+           ReconstructionTask task;
+           task.target = base_level;
+
+           size_t base_reccnt = 0;
+           for (level_index i=base_level; i>source_level; i--) {
+               auto recon_reccnt = scratch_state[i-1].reccnt;
+               base_reccnt += recon_reccnt;
+               scratch_state[i-1].reccnt = 0;
+               scratch_state[i-1].shardcnt = 0;
+               task.add_source(i-1, recon_reccnt);
+           }
+
+           reconstructions.add_reconstruction(task);
+           scratch_state[base_level].reccnt = base_reccnt;
+           scratch_state[base_level].shardcnt = 1;
+
+           return std::move(reconstructions);
        }

+       /*
+        * Determine the full set of reconstructions necessary to open up
+        * space in the source level.
+        */
        for (level_index i=base_level; i>source_level; i--) {
-           ReconstructionTask task = {i - 1, i};
+           size_t recon_reccnt = scratch_state[i-1].reccnt;
+           size_t base_reccnt = recon_reccnt;

            /*
-            * The amount of storage required for the reconstruction accounts
-            * for the cost of storing the new records, along with the
-            * cost of retaining the old records during the process
-            * (hence the 2x multiplier).
-            *
-            * FIXME: currently does not account for the *actual* size
-            * of the shards, only the storage for the records
-            * themselves.
+            * If using Leveling, the total reconstruction size will be the
+            * records in *both* base and target, because they will need to
+            * be merged (assuming that target isn't empty).
             */
-           size_t reccnt = m_levels[i-1]->get_record_count();
            if constexpr (L == LayoutPolicy::LEVELING) {
-               if (can_reconstruct_with(i, reccnt)) {
-                   reccnt += m_levels[i]->get_record_count();
+               if (can_reconstruct_with(i, base_reccnt, scratch_state)) {
+                   recon_reccnt += scratch_state[i].reccnt;
                }
            }
-//         task.m_size = 2* reccnt * sizeof(R);
+           reconstructions.add_reconstruction(i-1, i, recon_reccnt);
+
+           /*
+            * The base level will be emptied and its records moved to
+            * the target.
+            */
+           scratch_state[i-1].reccnt = 0;
+           scratch_state[i-1].shardcnt = 0;

-           reconstructions.push_back(task);
+           /*
+            * The target level will have the records from the base level
+            * added to it, and potentially gain a shard if the LayoutPolicy
+            * is tiering or the level currently lacks any shards at all.
+            */
+           scratch_state[i].reccnt += base_reccnt;
+           if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) {
+               scratch_state[i].shardcnt += 1;
+           }
        }

-       return reconstructions;
+       return std::move(reconstructions);
+   }
+
+   inline void reconstruction(ReconstructionTask task) {
+       static_assert(L == LayoutPolicy::BSM);
+       std::vector<InternalLevel<R, Shard, Q>*> levels(task.sources.size());
+       for (size_t i=0; i<task.sources.size(); i++) {
+           levels[i] = m_levels[task.sources[i]].get();
+       }
+
+       auto new_level = InternalLevel<R, Shard, Q>::reconstruction(levels, task.target);
+       if (task.target >= m_levels.size()) {
+           m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target),
+                                      1, 1});
+           m_levels.emplace_back(new_level);
+       } else {
+           m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target),
+                                           1, 1};
+           m_levels[task.target] = new_level;
+       }
+
+       /* remove all of the levels that have been flattened */
+       for (size_t i=0; i<task.sources.size(); i++) {
+           m_levels[task.sources[i]] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(task.sources[i], 1));
+           m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.target), 0, 1};
+       }
+
+       return;
    }

    /*
@@ -342,6 +442,14 @@ public:
     * tombstone ordering invariant may be violated.
     */
    inline void reconstruction(level_index base_level, level_index incoming_level) {
+       size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+       if (base_level >= m_levels.size()) {
+           m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(base_level, shard_capacity)));
+           m_current_state.push_back({0, calc_level_record_capacity(base_level),
+                                      0, shard_capacity});
+       }
+
        if constexpr (L == LayoutPolicy::LEVELING) {
            /* if the base level has a shard, merge the base and incoming together to make a new one */
            if (m_levels[base_level]->get_shard_count() > 0) {
@@ -350,6 +458,7 @@ public:
            } else {
                m_levels[base_level] = m_levels[incoming_level];
            }
+
        } else {
            m_levels[base_level]->append_level(m_levels[incoming_level].get());
            m_levels[base_level]->finalize();
@@ -357,6 +466,14 @@ public:

        /* place a new, empty level where the incoming level used to be */
        m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
+
+       /*
+        * Update the state vector to match the *real* state following
+        * the reconstruction
+        */
+       m_current_state[base_level] = {m_levels[base_level]->get_record_count(),
+                                      calc_level_record_capacity(base_level), m_levels[base_level]->get_shard_count(), shard_capacity};
+       m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity};
    }

    bool take_reference() {
@@ -393,14 +510,27 @@ private:

    std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;

+   /*
+    * Per-level record and shard counts/capacities for the structure.
+    * Record counts may be slightly inaccurate due to deletes.
+    */
+   state_vector m_current_state;
+
    /*
-    * Add a new level to the structure and return its index.
+    * Add a new level to the scratch state and return its index.
+    *
+    * IMPORTANT: This does _not_ add a level to the extension structure
+    * anymore. This is handled by the appropriate reconstruction and flush
+    * methods as needed. This function is for use in "simulated"
+    * reconstructions.
     */
-   inline level_index grow() {
+   inline level_index grow(state_vector &scratch_state) {
        level_index new_idx = m_levels.size();
-       size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+       size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;

-       m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
+       scratch_state.push_back({0, calc_level_record_capacity(new_idx),
+                                0, new_shard_cap});

        return new_idx;
    }
@@ -411,24 +541,36 @@ private:
     * returns -1 if idx==0, and no such level exists, to simplify
     * the logic of the first buffer flush.
     */
-   inline level_index find_reconstruction_target(level_index idx) {
+   inline level_index find_reconstruction_target(level_index idx, state_vector &state) {

-       if (idx == 0 && m_levels.size() == 0) return -1;
+       /*
+        * this handles the very first buffer flush, when the state vector
+        * is empty.
+        */
+       if (idx == 0 && state.size() == 0) return -1;

-       size_t incoming_rec_cnt = get_level_record_count(idx);
-       for (level_index i=idx+1; i<m_levels.size(); i++) {
-           if (can_reconstruct_with(i, incoming_rec_cnt)) {
+       size_t incoming_rec_cnt = state[idx].reccnt;
+       for (level_index i=idx+1; i<state.size(); i++) {
+           if (can_reconstruct_with(i, incoming_rec_cnt, state)) {
                return i;
            }

-           incoming_rec_cnt = get_level_record_count(i);
+           incoming_rec_cnt = state[i].reccnt;
        }

        return -1;
    }

    inline void flush_buffer_into_l0(BuffView buffer) {
-       assert(m_levels[0]);
+       size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+       if (m_levels.size() == 0) {
+           m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(0, shard_capacity)));
+
+           m_current_state.push_back({0, calc_level_record_capacity(0),
+                                      0, shard_capacity});
+       }
+
        if constexpr (L == LayoutPolicy::LEVELING) {
            // FIXME: Kludgey implementation due to interface constraints.
            auto old_level = m_levels[0].get();
@@ -444,6 +586,10 @@ private:
        } else {
            m_levels[0]->append_buffer(std::move(buffer));
        }
+
+       /* update the state vector */
+       m_current_state[0].reccnt = m_levels[0]->get_record_count();
+       m_current_state[0].shardcnt = m_levels[0]->get_shard_count();
    }

    /*
@@ -475,15 +621,17 @@ private:
     * Determines if a level can sustain a reconstruction with incoming_rec_cnt
     * additional records without exceeding its capacity.
     */
-   inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) {
-       if (idx >= m_levels.size() || !m_levels[idx]) {
+   inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) {
+       if (idx >= state.size()) {
            return false;
        }

-       if (L == LayoutPolicy::LEVELING) {
-           return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
-       } else {
-           return m_levels[idx]->get_shard_count() < m_scale_factor;
+       if constexpr (L == LayoutPolicy::LEVELING) {
+           return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap;
+       } else if constexpr (L == LayoutPolicy::BSM) {
+           return state[idx].reccnt == 0;
+       } else {
+           return state[idx].shardcnt < state[idx].shardcap;
        }

        /* unreachable */
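The net effect of these changes is that reconstruction planning now runs against a cheap copy of the level metadata rather than the levels themselves. A simplified standalone model of the simulation (field values illustrative; this mirrors, but is not, the real `level_state`/`can_reconstruct_with`):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct level_state { size_t reccnt, reccap, shardcnt, shardcap; };

// LEVELING rule: a level can absorb a reconstruction if the records fit.
bool can_reconstruct_with(std::vector<level_state> &state, size_t idx, size_t incoming) {
    return idx < state.size() && state[idx].reccnt + incoming <= state[idx].reccap;
}

int main() {
    std::vector<level_state> scratch = {{1500, 2000, 1, 1}}; // L0 nearly full

    // A 1000-record flush does not fit, so an L0 -> L1 task is scheduled
    // against the scratch copy before the flush itself is simulated.
    assert(!can_reconstruct_with(scratch, 0, 1000));
    scratch.push_back({1500, 4000, 1, 1}); // simulated reconstruction into L1
    scratch[0] = {0, 2000, 0, 1};

    scratch[0].reccnt += 1000;             // simulated buffer flush
    assert(can_reconstruct_with(scratch, 1, scratch[0].reccnt));
    return 0; // the real m_levels is only mutated when the tasks execute
}
```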
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index db38946..b962dcc 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -64,6 +64,21 @@ public:
        return std::shared_ptr<InternalLevel>(res);
    }

+   static std::shared_ptr<InternalLevel> reconstruction(std::vector<InternalLevel*> levels, size_t level_idx) {
+       std::vector<Shard *> shards;
+       for (auto level : levels) {
+           for (auto shard : level->m_shards) {
+               if (shard) shards.emplace_back(shard.get());
+           }
+       }
+
+       auto res = new InternalLevel(level_idx, 1);
+       res->m_shard_cnt = 1;
+       res->m_shards[0] = std::make_shared<S>(shards);
+
+       return std::shared_ptr<InternalLevel>(res);
+   }
+
    /*
     * Create a new shard combining the records from all of
     * the shards in level, and append this new shard into
@@ -185,6 +200,10 @@ public:
    }

    Shard* get_shard(size_t idx) {
+       if (idx >= m_shard_cnt) {
+           return nullptr;
+       }
+
        return m_shards[idx].get();
    }
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 415c95a..7db3980 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -50,18 +50,19 @@ public:
        , m_tail(0)
        , m_head({0, 0})
        , m_old_head({high_watermark, 0})
-       , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+       //, m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+       , m_data(new Wrapped<R>[m_cap]())
        , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
        , m_tscnt(0)
        , m_old_tscnt(0)
        , m_active_head_advance(false)
    {
        assert(m_cap > m_hwm);
-       assert(m_hwm > m_lwm);
+       assert(m_hwm >= m_lwm);
    }

    ~MutableBuffer() {
-       free(m_data);
+       delete[] m_data;
        delete m_tombstone_filter;
    }

@@ -76,16 +77,20 @@ public:
        wrec.header = 0;
        if (tombstone) wrec.set_tombstone();

+       // FIXME: because of the mod, it isn't correct to use `pos`
+       // as the ordering timestamp in the header anymore.
        size_t pos = tail % m_cap;

        m_data[pos] = wrec;
-       m_data[pos].header |= (pos << 2);
+       m_data[pos].set_timestamp(pos);

        if (tombstone) {
            m_tscnt.fetch_add(1);
            if (m_tombstone_filter) m_tombstone_filter->insert(rec);
        }

+       m_data[pos].set_visible();
+
        return 1;
    }
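The insert path now stamps the slot and flips the visibility bit only after the record is fully written. A simplified sketch of that publish ordering (not the real MutableBuffer, and the ordering shown is plain program order; the real buffer relies on the framework's own synchronization):

```cpp
#include <cstddef>
#include <cstdint>

struct Slot { uint32_t header = 0; uint64_t key = 0, value = 0; };

void append(Slot *data, size_t pos, uint64_t k, uint64_t v) {
    data[pos].key = k;              // 1. write the record body first
    data[pos].value = v;
    data[pos].header |= (pos << 3); // 2. ordering timestamp (set_timestamp)
    data[pos].header |= 4;          // 3. publish last (set_visible)
}

// Reader side: records without the visibility bit are still in flight and
// are skipped, which is why the shard builders now test is_visible().
bool visible(const Slot &s) { return s.header & 4; }
```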
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 65ca181..4a4524a 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -35,7 +35,8 @@ static thread_local size_t sampling_bailouts = 0;

 enum class LayoutPolicy {
    LEVELING,
-   TEIRING
+   TEIRING,
+   BSM
 };

@@ -43,7 +44,4 @@ enum class DeletePolicy {
    TAGGING
 };

-typedef ssize_t level_index;
-typedef std::pair<level_index, level_index> ReconstructionTask;
-
 }
diff --git a/include/query/irs.h b/include/query/irs.h
index e2d9325..879d070 100644
--- a/include/query/irs.h
+++ b/include/query/irs.h
@@ -40,7 +40,12 @@ struct BufferState {
    size_t sample_size;
    BufferView<R> *buffer;

+   psudb::Alias *alias;
+
    BufferState(BufferView<R> *buffer) : buffer(buffer) {}
+
+   ~BufferState() {
+       delete alias;
+   }
 };

 template <RecordInterface R, ShardInterface<R> S, bool Rejection=true>
@@ -72,6 +77,7 @@ public:

        res->cutoff = res->buffer->get_record_count();
        res->sample_size = 0;
+       res->alias = nullptr;

        if constexpr (Rejection) {
            return res;
@@ -96,39 +102,51 @@ public:
        std::vector<size_t> shard_sample_sizes(shard_states.size()+1, 0);
        size_t buffer_sz = 0;

-       std::vector<size_t> weights;
-       if constexpr (Rejection) {
-           weights.push_back((bs) ? bs->cutoff : 0);
-       } else {
-           weights.push_back((bs) ? bs->records.size() : 0);
+       /* for simplicity of static structure testing */
+       if (!bs) {
+           assert(shard_states.size() == 1);
+           auto state = (State<R> *) shard_states[0];
+           state->sample_size = p->sample_size;
+           return;
        }

-       size_t total_weight = 0;
-       for (auto &s : shard_states) {
-           auto state = (State<R> *) s;
-           total_weight += state->total_weight;
-           weights.push_back(state->total_weight);
-       }
+       /* we only need to build the shard alias on the first call */
+       if (bs->alias == nullptr) {
+           std::vector<size_t> weights;
+           if constexpr (Rejection) {
+               weights.push_back((bs) ? bs->cutoff : 0);
+           } else {
+               weights.push_back((bs) ? bs->records.size() : 0);
+           }

-       // if no valid records fall within the query range, just
-       // set all of the sample sizes to 0 and bail out.
-       if (total_weight == 0) {
-           for (size_t i=0; i<shard_states.size(); i++) {
-               auto state = (State<R> *) shard_states[i];
-               state->sample_size = 0;
+           size_t total_weight = weights[0];
+           for (auto &s : shard_states) {
+               auto state = (State<R> *) s;
+               total_weight += state->total_weight;
+               weights.push_back(state->total_weight);
            }

-           return;
-       }
+           // if no valid records fall within the query range, just
+           // set all of the sample sizes to 0 and bail out.
+           if (total_weight == 0) {
+               for (size_t i=0; i<shard_states.size(); i++) {
+                   auto state = (State<R> *) shard_states[i];
+                   state->sample_size = 0;
+               }

-       std::vector<double> normalized_weights;
-       for (auto w : weights) {
-           normalized_weights.push_back((double) w / (double) total_weight);
+               return;
+           }
+
+           std::vector<double> normalized_weights;
+           for (auto w : weights) {
+               normalized_weights.push_back((double) w / (double) total_weight);
+           }
+
+           bs->alias = new psudb::Alias(normalized_weights);
        }

-       auto shard_alias = psudb::Alias(normalized_weights);
        for (size_t i=0; i<p->sample_size; i++) {
-           auto idx = shard_alias.get(p->rng);
+           auto idx = bs->alias->get(p->rng);
            if (idx == 0) {
                buffer_sz++;
            } else {
@@ -198,16 +216,12 @@ public:
        return result;
    }

-   static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
-       std::vector<R> output;
-
+   static void merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
        for (size_t i=0; i<results.size(); i++) {
            for (size_t j=0; j<results[i].size(); j++) {
                output.emplace_back(results[i][j].rec);
            }
        }
-
-       return output;
    }

    static void delete_query_state(void *state) {
@@ -219,5 +233,18 @@ public:
        auto s = (BufferState<R> *) state;
        delete s;
    }
+
+   static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
+       auto p = (Parms<R> *) parms;
+
+       if (results.size() < p->sample_size) {
+           auto q = *p;
+           q.sample_size -= results.size();
+           process_query_states(&q, states, buffer_state);
+           return true;
+       }
+
+       return false;
+   }
 };
}}
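Together with the new `do`/`while` loop in DynamicExtension.h, `repeat()` turns sampling into an iterative top-up protocol. An illustrative trace (numbers made up):

```cpp
// Illustrative trace: a request for 1000 samples.
//
//   round 1: merge() appends 940 records (60 were filtered as deleted)
//            repeat(): 940 < 1000 -> shrink sample_size to 60,
//            re-run process_query_states(), return true
//   round 2: merge() appends the remaining 60
//            repeat(): 1000 >= 1000 -> return false, loop exits
//
// Because BufferState::alias is cached after the first call, round 2
// skips rebuilding the shard alias and only redraws the per-shard sizes.
```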
diff --git a/include/query/knn.h b/include/query/knn.h
index 19dcf5c..a227293 100644
--- a/include/query/knn.h
+++ b/include/query/knn.h
@@ -111,10 +111,10 @@ public:
            pq.pop();
        }

-       return results;
+       return std::move(results);
    }

-   static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
+   static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
        Parms<R> *p = (Parms<R> *) parms;
        R rec = p->point;
        size_t k = p->k;
@@ -136,13 +136,12 @@ public:
            }
        }

-       std::vector<R> output;
        while (pq.size() > 0) {
            output.emplace_back(*pq.peek().data);
            pq.pop();
        }

-       return output;
+       return std::move(output);
    }

    static void delete_query_state(void *state) {
@@ -154,6 +153,10 @@ public:
        auto s = (BufferState<R> *) state;
        delete s;
    }
+
+   static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
+       return false;
+   }
 };
}}
diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h
new file mode 100644
index 0000000..94c2bce
--- /dev/null
+++ b/include/query/pointlookup.h
@@ -0,0 +1,123 @@
+/*
+ * include/query/pointlookup.h
+ *
+ * Copyright (C) 2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * A query class for point lookup operations.
+ *
+ * TODO: Currently, this only supports point lookups for unique keys (which
+ * is the case for the trie that we're building this to use). It would be
+ * pretty straightforward to extend it to return *all* records that match
+ * the search_key (including tombstone cancellation--it's invertible) to
+ * support non-unique indexes, or at least those implementing
+ * lower_bound().
+ */
+#pragma once
+
+#include "framework/QueryRequirements.h"
+
+namespace de { namespace pl {
+
+template <RecordInterface R>
+struct Parms {
+    decltype(R::key) search_key;
+};
+
+template <RecordInterface R>
+struct State {
+};
+
+template <RecordInterface R>
+struct BufferState {
+    BufferView<R> *buffer;
+
+    BufferState(BufferView<R> *buffer)
+        : buffer(buffer) {}
+};
+
+template <KVPInterface R, ShardInterface<R> S>
+class Query {
+public:
+    constexpr static bool EARLY_ABORT=true;
+    constexpr static bool SKIP_DELETE_FILTER=true;
+
+    static void *get_query_state(S *shard, void *parms) {
+        return nullptr;
+    }
+
+    static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) {
+        auto res = new BufferState<R>(buffer);
+
+        return res;
+    }
+
+    static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void* buffer_state) {
+        return;
+    }
+
+    static std::vector<Wrapped<R>> query(S *shard, void *q_state, void *parms) {
+        auto p = (Parms<R> *) parms;
+        auto s = (State<R> *) q_state;
+
+        std::vector<Wrapped<R>> result;
+
+        auto r = shard->point_lookup({p->search_key, 0});
+
+        if (r) {
+            result.push_back(*r);
+        }
+
+        return result;
+    }
+
+    static std::vector<Wrapped<R>> buffer_query(void *state, void *parms) {
+        auto p = (Parms<R> *) parms;
+        auto s = (BufferState<R> *) state;
+
+        std::vector<Wrapped<R>> records;
+        for (size_t i=0; i<s->buffer->get_record_count(); i++) {
+            auto rec = s->buffer->get(i);
+
+            if (rec->rec.key == p->search_key) {
+                records.push_back(*rec);
+                return records;
+            }
+        }
+
+        return records;
+    }
+
+    static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
+        for (auto r : results) {
+            if (r.size() > 0) {
+                if (r[0].is_deleted() || r[0].is_tombstone()) {
+                    return output;
+                }
+
+                output.push_back(r[0].rec);
+                return output;
+            }
+        }
+
+        return output;
+    }
+
+    static void delete_query_state(void *state) {
+        auto s = (State<R> *) state;
+        delete s;
+    }
+
+    static void delete_buffer_query_state(void *state) {
+        auto s = (BufferState<R> *) state;
+        delete s;
+    }
+
+
+    static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
+        return false;
+    }
+};
+
+}}
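A hypothetical usage sketch for the new query class (the type names and the framework's query entry point are assumptions for illustration, not taken from this diff):

```cpp
typedef de::Record<const char *, uint64_t> Rec;
typedef de::FSTrie<Rec> Shard;             // unique-key trie shard, added below
typedef de::pl::Query<Rec, Shard> Lookup;

de::pl::Parms<Rec> parms = { "some_key" };
// EARLY_ABORT stops at the first level producing a match; merge() then
// suppresses the result if that match is a tombstone or a tagged delete.
auto result = extension.query(&parms).get(); // assumed entry point/signature
```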
diff --git a/include/query/rangecount.h b/include/query/rangecount.h
index 6c57809..5b95cdd 100644
--- a/include/query/rangecount.h
+++ b/include/query/rangecount.h
@@ -35,20 +35,14 @@ struct BufferState {
        : buffer(buffer) {}
 };

-template <KVPInterface R, ShardInterface<R> S>
+template <KVPInterface R, ShardInterface<R> S, bool FORCE_SCAN=false>
 class Query {
 public:
    constexpr static bool EARLY_ABORT=false;
    constexpr static bool SKIP_DELETE_FILTER=true;

    static void *get_query_state(S *shard, void *parms) {
-       auto res = new State<R>();
-       auto p = (Parms<R> *) parms;
-
-       res->start_idx = shard->get_lower_bound(p->lower_bound);
-       res->stop_idx = shard->get_record_count();
-
-       return res;
+       return nullptr;
    }

    static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) {
@@ -74,37 +68,43 @@ public:
        res.rec.value = 0; // tombstones
        records.emplace_back(res);

+       auto start_idx = shard->get_lower_bound(p->lower_bound);
+       auto stop_idx = shard->get_lower_bound(p->upper_bound);
+
        /*
         * if the returned index is one past the end of the
         * records for the PGM, then there are no records
         * in the index falling into the specified range.
         */
-       if (s->start_idx == shard->get_record_count()) {
+       if (start_idx == shard->get_record_count()) {
            return records;
        }

-       auto ptr = shard->get_record_at(s->start_idx);

        /*
         * roll the pointer forward to the first record that is
         * greater than or equal to the lower bound.
         */
-       while(ptr < shard->get_data() + s->stop_idx && ptr->rec.key < p->lower_bound) {
-           ptr++;
+       auto recs = shard->get_data();
+       while(start_idx < stop_idx && recs[start_idx].rec.key < p->lower_bound) {
+           start_idx++;
        }

-       while (ptr < shard->get_data() + s->stop_idx && ptr->rec.key <= p->upper_bound) {
-           if (!ptr->is_deleted()) {
-               if (ptr->is_tombstone()) {
-                   records[0].rec.value++;
-               } else {
-                   records[0].rec.key++;
-               }
-           }
+       while (stop_idx < shard->get_record_count() && recs[stop_idx].rec.key <= p->upper_bound) {
+           stop_idx++;
+       }

-           ptr++;
+       size_t idx = start_idx;
+       size_t ts_cnt = 0;
+
+       while (idx < stop_idx) {
+           ts_cnt += recs[idx].is_tombstone() * 2 + recs[idx].is_deleted();
+           idx++;
        }

+       records[0].rec.key = idx - start_idx;
+       records[0].rec.value = ts_cnt;
+
        return records;
    }

@@ -119,8 +119,16 @@ public:
        res.rec.value = 0; // tombstones
        records.emplace_back(res);

+       size_t stop_idx;
+       if constexpr (FORCE_SCAN) {
+           stop_idx = s->buffer->get_capacity() / 2;
+       } else {
+           stop_idx = s->buffer->get_record_count();
+       }
+
        for (size_t i=0; i<stop_idx; i++) {
            auto rec = s->buffer->get(i);
+
            if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound
                && !rec->is_deleted()) {
                if (rec->is_tombstone()) {
@@ -134,12 +142,10 @@ public:
        return records;
    }

-   static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
-
+   static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
        R res;
        res.key = 0;
        res.value = 0;
-       std::vector<R> output;
        output.emplace_back(res);

        for (size_t i=0; i<results.size(); i++) {
@@ -152,14 +158,16 @@ public:
    }

    static void delete_query_state(void *state) {
-       auto s = (State<R> *) state;
-       delete s;
    }

    static void delete_buffer_query_state(void *state) {
        auto s = (BufferState<R> *) state;
        delete s;
    }
+
+   static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
+       return false;
+   }
 };
}}
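The shard-side counting above folds deletes into a single pass. A worked reading of the arithmetic (this interpretation is inferred from the diff, not stated in it):

```cpp
// Interpretation of ts_cnt += is_tombstone() * 2 + is_deleted():
// rec.key accumulates every record touched in the range, rec.value the
// discount. A tombstone counts twice (itself plus the record it cancels);
// a tagged delete counts once (itself).
//
//   range scan touches 10 records: 7 live, 1 tombstone, 2 tagged deletes
//   key   = 10
//   value = 1*2 + 2*1 = 4
//   presumable net count after merge() = key - value = 6
```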
diff --git a/include/query/rangequery.h b/include/query/rangequery.h
index 24b38ec..e0690e6 100644
--- a/include/query/rangequery.h
+++ b/include/query/rangequery.h
@@ -109,7 +109,7 @@ public:
        return records;
    }

-   static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
+   static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
        std::vector<Cursor<Wrapped<R>>> cursors;
        cursors.reserve(results.size());

@@ -121,7 +121,7 @@ public:
        for (size_t i = 0; i < tmp_n; ++i)
            if (results[i].size() > 0){
                auto base = results[i].data();
-               cursors.emplace_back(Cursor{base, base + results[i].size(), 0, results[i].size()});
+               cursors.emplace_back(Cursor<Wrapped<R>>{base, base + results[i].size(), 0, results[i].size()});
                assert(i == cursors.size() - 1);
                total += results[i].size();
                pq.push(cursors[i].ptr, tmp_n - i - 1);
@@ -133,7 +133,6 @@ public:
            return std::vector<R>();
        }

-       std::vector<R> output;
        output.reserve(total);

        while (pq.size()) {
@@ -169,6 +168,10 @@ public:
        auto s = (BufferState<R> *) state;
        delete s;
    }
+
+   static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
+       return false;
+   }
 };
}}
diff --git a/include/query/wirs.h b/include/query/wirs.h
index ae82194..62b43f6 100644
--- a/include/query/wirs.h
+++ b/include/query/wirs.h
@@ -219,9 +219,7 @@ public:
        return result;
    }

-   static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
-       std::vector<R> output;
-
+   static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
        for (size_t i=0; i<results.size(); i++) {
            for (size_t j=0; j<results[i].size(); j++) {
                output.emplace_back(results[i][j].rec);
@@ -240,5 +238,14 @@ public:
        auto s = (BufferState<R> *) state;
        delete s;
    }
+
+   static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
+       auto p = (Parms<R> *) parms;
+
+       if (results.size() < p->sample_size) {
+           return true;
+       }
+
+       return false;
+   }
 };
}}
diff --git a/include/query/wss.h b/include/query/wss.h
index 8797035..fb0b414 100644
--- a/include/query/wss.h
+++ b/include/query/wss.h
@@ -183,9 +183,7 @@ public:
        return result;
    }

-   static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
-       std::vector<R> output;
-
+   static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
        for (size_t i=0; i<results.size(); i++) {
            for (size_t j=0; j<results[i].size(); j++) {
                output.emplace_back(results[i][j].rec);
@@ -204,6 +202,15 @@ public:
        auto s = (BufferState<R> *) state;
        delete s;
    }
+
+   static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
+       auto p = (Parms<R> *) parms;
+
+       if (results.size() < p->sample_size) {
+           return true;
+       }
+
+       return false;
+   }
 };
}}
diff --git a/include/shard/Alias.h b/include/shard/Alias.h
index 9275952..72147d7 100644
--- a/include/shard/Alias.h
+++ b/include/shard/Alias.h
@@ -148,7 +148,7 @@ public:

    size_t get_memory_usage() {
-       return m_alloc_size;
+       return 0;
    }

    size_t get_aux_memory_usage() {
diff --git a/include/shard/AugBTree.h b/include/shard/AugBTree.h
index 54931bd..c60cbcd 100644
--- a/include/shard/AugBTree.h
+++ b/include/shard/AugBTree.h
@@ -148,7 +148,7 @@ public:
    }

    size_t get_memory_usage() {
-       return m_alloc_size + m_node_cnt * sizeof(AugBTreeNode<Wrapped<R>>);
+       return m_node_cnt * sizeof(AugBTreeNode<Wrapped<R>>);
    }

    size_t get_aux_memory_usage() {
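A pattern repeated across the shard diffs here and below (Alias, AugBTree, and later ISAMTree and PGM): `get_memory_usage()` now reports only the index overhead, with the record array reported separately. A sketch of the convention as read from the diffs (inferred, not documented anywhere in the commit):

```cpp
// Revised accounting convention (ISAMTree shown as the example):
size_t get_memory_usage()     { return m_internal_node_cnt * NODE_SZ; } // index structure only
size_t get_aux_memory_usage() { return m_alloc_size; }                  // record storage
```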
+ */ +#pragma once + + +#include <vector> + +#include "framework/ShardRequirements.h" +#include "fst.hpp" +#include "util/SortedMerge.h" + +using psudb::CACHELINE_SIZE; +using psudb::BloomFilter; +using psudb::PriorityQueue; +using psudb::queue_record; +using psudb::byte; + +namespace de { + +template <KVPInterface R> +class FSTrie { +private: + + typedef decltype(R::key) K; + typedef decltype(R::value) V; + static_assert(std::is_same_v<K, const char*>, "FST requires const char* keys."); + +public: + FSTrie(BufferView<R> buffer) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + m_data = new Wrapped<R>[buffer.get_record_count()](); + m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count(); + + size_t cnt = 0; + std::vector<std::string> keys; + keys.reserve(buffer.get_record_count()); + + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = new Wrapped<R>[buffer.get_record_count()](); + for (size_t i=0; i<buffer.get_record_count(); i++) { + temp_buffer[i] = *(buffer.get(i)); + } + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less<Wrapped<R>>()); + + for (size_t i=0; i<buffer.get_record_count(); i++) { + if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || temp_buffer[i].rec.key == "") { + continue; + } + + m_data[cnt] = temp_buffer[i]; + m_data[cnt].clear_timestamp(); + + keys.push_back(std::string(m_data[cnt].rec.key)); + cnt++; + } + + m_reccnt = cnt; + if (m_reccnt > 0) { + m_fst = new fst::Trie(keys, true, 1); + } + + delete[] temp_buffer; + } + + FSTrie(std::vector<FSTrie*> &shards) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = build_cursor_vec<R, FSTrie>(shards, &attemp_reccnt, &tombstone_count); + + m_data = new Wrapped<R>[attemp_reccnt](); + m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>); + + std::vector<std::string> keys; + keys.reserve(attemp_reccnt); + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue<Wrapped<R>> pq(cursors.size()); + for (size_t i=0; i<cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } + + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. 
diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h
index 3763271..1cca506 100644
--- a/include/shard/ISAMTree.h
+++ b/include/shard/ISAMTree.h
@@ -51,7 +51,7 @@ constexpr static size_t LEAF_FANOUT = NODE_SZ / sizeof(R);

 public:
    ISAMTree(BufferView<R> buffer)
-       : m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+       : m_bf(nullptr)
        , m_isam_nodes(nullptr)
        , m_root(nullptr)
        , m_reccnt(0)
@@ -59,19 +59,12 @@ public:
        , m_internal_node_cnt(0)
        , m_deleted_cnt(0)
        , m_alloc_size(0)
-       , m_data(nullptr)
    {
        m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
                                               buffer.get_record_count() * sizeof(Wrapped<R>),
                                               (byte**) &m_data);

-       /*
-        * without this, gcc seems to hoist the building of the array
-        * _above_ its allocation under -O3, resulting in memfaults.
-        */
-       asm volatile ("" ::: "memory");
-
        auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf);
        m_reccnt = res.record_count;
        m_tombstone_cnt = res.tombstone_count;
@@ -90,13 +83,12 @@ public:
        , m_internal_node_cnt(0)
        , m_deleted_cnt(0)
        , m_alloc_size(0)
-       , m_data(nullptr)
    {
        size_t attemp_reccnt = 0;
        size_t tombstone_count = 0;
        auto cursors = build_cursor_vec<R, ISAMTree>(shards, &attemp_reccnt, &tombstone_count);

-       m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+       m_bf = nullptr;

        m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
                                               attemp_reccnt * sizeof(Wrapped<R>),
                                               (byte **) &m_data);
@@ -149,7 +141,7 @@ public:

    size_t get_memory_usage() {
-       return m_alloc_size + m_internal_node_cnt * NODE_SZ;
+       return m_internal_node_cnt * NODE_SZ;
    }

    size_t get_aux_memory_usage() {
diff --git a/include/shard/LoudsPatricia.h b/include/shard/LoudsPatricia.h
new file mode 100644
index 0000000..3452839
--- /dev/null
+++ b/include/shard/LoudsPatricia.h
@@ -0,0 +1,199 @@
+/*
+ * include/shard/LoudsPatricia.h
+ *
+ * Copyright (C) 2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * A shard shim around the LoudsPatricia learned index.
+ */
+#pragma once
+
+
+#include <vector>
+
+#include "framework/ShardRequirements.h"
+#include "louds-patricia.hpp"
+#include "util/SortedMerge.h"
+
+using psudb::CACHELINE_SIZE;
+using psudb::BloomFilter;
+using psudb::PriorityQueue;
+using psudb::queue_record;
+using psudb::byte;
+
+namespace de {
+
+template <KVPInterface R>
+class LoudsPatricia {
+private:
+
+    typedef decltype(R::key) K;
+    typedef decltype(R::value) V;
+    static_assert(std::is_same_v<K, const char*>, "FST requires const char* keys.");
+
+public:
+    LoudsPatricia(BufferView<R> buffer)
+        : m_data(nullptr)
+        , m_reccnt(0)
+        , m_alloc_size(0)
+    {
+        m_data = new Wrapped<R>[buffer.get_record_count()]();
+        m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
+
+        m_louds = new louds::Patricia();
+
+        size_t cnt = 0;
+        std::vector<std::string> keys;
+        keys.reserve(buffer.get_record_count());
+
+        /*
+         * Copy the contents of the buffer view into a temporary buffer, and
+         * sort them. We still need to iterate over these temporary records to
+         * apply tombstone/deleted record filtering, as well as any possible
+         * per-record processing that is required by the shard being built.
+ */ + auto temp_buffer = new Wrapped<R>[buffer.get_record_count()](); + for (size_t i=0; i<buffer.get_record_count(); i++) { + temp_buffer[i] = *(buffer.get(i)); + } + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less<Wrapped<R>>()); + + for (size_t i=0; i<buffer.get_record_count(); i++) { + if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || temp_buffer[i].rec.key == "") { + continue; + } + + m_data[cnt] = temp_buffer[i]; + m_data[cnt].clear_timestamp(); + + m_louds->add(std::string(m_data[cnt].rec.key)); + cnt++; + } + + m_reccnt = cnt; + if (m_reccnt > 0) { + m_louds->build(); + } + + delete[] temp_buffer; + } + + LoudsPatricia(std::vector<LoudsPatricia*> &shards) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = build_cursor_vec<R, LoudsPatricia>(shards, &attemp_reccnt, &tombstone_count); + + m_data = new Wrapped<R>[attemp_reccnt](); + m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>); + + m_louds = new louds::Patricia(); + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue<Wrapped<R>> pq(cursors.size()); + for (size_t i=0; i<cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } + + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") { + m_data[m_reccnt] = *cursor.ptr; + m_louds->add(std::string(m_data[m_reccnt].rec.key)); + m_reccnt++; + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + } + } + + if (m_reccnt > 0) { + m_louds->build(); + } + } + + ~LoudsPatricia() { + delete[] m_data; + delete m_louds; + } + + Wrapped<R> *point_lookup(const R &rec, bool filter=false) { + + auto idx = m_louds->lookup(std::string(rec.key)); + + if (idx == -1) { + return nullptr; + } + + // FIXME: for convenience, I'm treating this Trie as a unique index + // for now, so no need to scan forward and/or check values. This + // also makes the point lookup query class a lot easier to make. + // Ultimately, though, we can support non-unique indexes with some + // extra work. 
+        return m_data + idx;
+    }
+
+    Wrapped<R>* get_data() const {
+        return m_data;
+    }
+
+    size_t get_record_count() const {
+        return m_reccnt;
+    }
+
+    size_t get_tombstone_count() const {
+        return 0;
+    }
+
+    const Wrapped<R>* get_record_at(size_t idx) const {
+        if (idx >= m_reccnt) return nullptr;
+        return m_data + idx;
+    }
+
+
+    size_t get_memory_usage() {
+        return m_louds->size();
+    }
+
+    size_t get_aux_memory_usage() {
+        return m_alloc_size;
+    }
+
+    size_t get_lower_bound(R &rec) {return 0;}
+    size_t get_upper_bound(R &rec) {return 0;}
+
+private:
+
+    Wrapped<R>* m_data;
+    size_t m_reccnt;
+    size_t m_alloc_size;
+    louds::Patricia *m_louds;
+};
+}
diff --git a/include/shard/PGM.h b/include/shard/PGM.h
index e2752ef..509796b 100644
--- a/include/shard/PGM.h
+++ b/include/shard/PGM.h
@@ -39,8 +39,7 @@ private:

 public:
    PGM(BufferView<R> buffer)
-       : m_data(nullptr)
-       , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+       : m_bf(nullptr)
        , m_reccnt(0)
        , m_tombstone_cnt(0)
        , m_alloc_size(0)
    {
        m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
                                               buffer.get_record_count() * sizeof(Wrapped<R>),
                                               (byte**) &m_data);

-       auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf);
-       m_reccnt = res.record_count;
-       m_tombstone_cnt = res.tombstone_count;

-       if (m_reccnt > 0) {
-           std::vector<K> keys;
-           for (size_t i=0; i<m_reccnt; i++) {
-               keys.emplace_back(m_data[i].rec.key);
+       std::vector<K> keys;
+
+       /*
+        * Copy the contents of the buffer view into a temporary buffer, and
+        * sort them. We still need to iterate over these temporary records to
+        * apply tombstone/deleted record filtering, as well as any possible
+        * per-record processing that is required by the shard being built.
+        */
+       auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
+                                                                  buffer.get_record_count(),
+                                                                  sizeof(Wrapped<R>));
+       buffer.copy_to_buffer((byte *) temp_buffer);
+
+       auto base = temp_buffer;
+       auto stop = base + buffer.get_record_count();
+       std::sort(base, stop, std::less<Wrapped<R>>());
+
+       merge_info info = {0, 0};
+
+       /*
+        * Iterate over the temporary buffer to process the records, copying
+        * them into buffer as needed
+        */
+       while (base < stop) {
+           if (!base->is_tombstone() && (base + 1 < stop)
+               && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
+               base += 2;
+               continue;
+           } else if (base->is_deleted()) {
+               base += 1;
+               continue;
+           }
+
+           // FIXME: this shouldn't be necessary, but the tagged record
+           // bypass doesn't seem to be working on this code-path, so this
+           // ensures that tagged records from the buffer are able to be
+           // dropped, eventually.
+           // It should only need to be &= 1
+           base->header &= 3;
+           keys.emplace_back(base->rec.key);
+           m_data[info.record_count++] = *base;
+
+           if (base->is_tombstone()) {
+               info.tombstone_count++;
+               if (m_bf){
+                   m_bf->insert(base->rec);
+               }
            }
+
+           base++;
+       }
+
+       free(temp_buffer);
+
+       m_reccnt = info.record_count;
+       m_tombstone_cnt = info.tombstone_count;
+
+       if (m_reccnt > 0) {
            m_pgm = pgm::PGMIndex<K, epsilon>(keys);
        }
    }
@@ -74,21 +120,65 @@ public:
        size_t tombstone_count = 0;
        auto cursors = build_cursor_vec<R, PGM>(shards, &attemp_reccnt, &tombstone_count);

-       m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
        m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
                                               attemp_reccnt * sizeof(Wrapped<R>),
                                               (byte **) &m_data);
+       std::vector<K> keys;

-       auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
-       m_reccnt = res.record_count;
-       m_tombstone_cnt = res.tombstone_count;
+       // FIXME: For smaller cursor arrays, it may be more efficient to skip
+       // the priority queue and just do a scan.
+       PriorityQueue<Wrapped<R>> pq(cursors.size());
+       for (size_t i=0; i<cursors.size(); i++) {
+           pq.push(cursors[i].ptr, i);
+       }

-       if (m_reccnt > 0) {
-           std::vector<K> keys;
-           for (size_t i=0; i<m_reccnt; i++) {
-               keys.emplace_back(m_data[i].rec.key);
+       merge_info info = {0, 0};
+       while (pq.size()) {
+           auto now = pq.peek();
+           auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+           /*
+            * if the current record is not a tombstone, and the next record is
+            * a tombstone that matches the current one, then the current one
+            * has been deleted, and both it and its tombstone can be skipped
+            * over.
+            */
+           if (!now.data->is_tombstone() && next.data != nullptr &&
+               now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+               pq.pop(); pq.pop();
+               auto& cursor1 = cursors[now.version];
+               auto& cursor2 = cursors[next.version];
+               if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+               if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+           } else {
+               auto& cursor = cursors[now.version];
+               /* skip over records that have been deleted via tagging */
+               if (!cursor.ptr->is_deleted()) {
+                   keys.emplace_back(cursor.ptr->rec.key);
+                   m_data[info.record_count++] = *cursor.ptr;
+
+                   /*
+                    * if the record is a tombstone, increment the ts count and
+                    * insert it into the bloom filter if one has been
+                    * provided.
+                    */
+                   if (cursor.ptr->is_tombstone()) {
+                       info.tombstone_count++;
+                       if (m_bf) {
+                           m_bf->insert(cursor.ptr->rec);
+                       }
+                   }
+               }
+               pq.pop();
+
+               if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
            }
+       }
+
+       m_reccnt = info.record_count;
+       m_tombstone_cnt = info.tombstone_count;
+
+       if (m_reccnt > 0) {
            m_pgm = pgm::PGMIndex<K, epsilon>(keys);
        }
    }
@@ -132,7 +222,7 @@ public:

    size_t get_memory_usage() {
-       return m_pgm.size_in_bytes() + m_alloc_size;
+       return m_pgm.size_in_bytes();
    }

    size_t get_aux_memory_usage() {
@@ -147,14 +237,16 @@ public:
            return m_reccnt;
        }

-       // If the region to search is less than some pre-specified
-       // amount, perform a linear scan to locate the record.
+       /*
+        * If the region to search is less than some pre-specified
+        * amount, perform a linear scan to locate the record.
+        */
        if (bound.hi - bound.lo < 256) {
            while (idx < bound.hi && m_data[idx].rec.key < key) {
                idx++;
            }
        } else {
-           // Otherwise, perform a binary search
+           /* Otherwise, perform a binary search */
            idx = bound.lo;
            size_t max = bound.hi;

@@ -169,10 +261,26 @@ public:

        }

+       /*
+        * the upper bound returned by PGM is one past the end of the
diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h
index 2a432e8..581277e 100644
--- a/include/shard/TrieSpline.h
+++ b/include/shard/TrieSpline.h
@@ -36,39 +36,94 @@ private:
 public:
     TrieSpline(BufferView<R> buffer)
-        : m_data(nullptr)
-        , m_reccnt(0)
+        : m_reccnt(0)
         , m_tombstone_cnt(0)
         , m_alloc_size(0)
         , m_max_key(0)
         , m_min_key(0)
-        , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+        , m_bf(nullptr)
     {
         m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
                                                buffer.get_record_count() * sizeof(Wrapped<R>),
                                                (byte**) &m_data);
 
-        auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf);
-        m_reccnt = res.record_count;
-        m_tombstone_cnt = res.tombstone_count;
+        /*
+         * Copy the contents of the buffer view into a temporary buffer, and
+         * sort them. We still need to iterate over these temporary records to
+         * apply tombstone/deleted record filtering, as well as any possible
+         * per-record processing that is required by the shard being built.
+         */
+        auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
+                                                                   buffer.get_record_count(),
+                                                                   sizeof(Wrapped<R>));
+        buffer.copy_to_buffer((byte *) temp_buffer);
+
+        auto base = temp_buffer;
+        auto stop = base + buffer.get_record_count();
+        std::sort(base, stop, std::less<Wrapped<R>>());
+
+        auto tmp_min_key = temp_buffer[0].rec.key;
+        auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key;
+        auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+
+        merge_info info = {0, 0};
+
+        /* seed with the opposite extremes; the loop below narrows them */
+        m_min_key = tmp_max_key;
+        m_max_key = tmp_min_key;
+
+        /*
+         * Iterate over the temporary buffer to process the records, copying
+         * them into the data array as needed.
+         */
+        while (base < stop) {
+            if (!base->is_tombstone() && (base + 1 < stop)
+                && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
+                base += 2;
+                continue;
+            } else if (base->is_deleted()) {
+                base += 1;
+                continue;
+            }
 
-        if (m_reccnt > 0) {
-            m_min_key = m_data[0].rec.key;
-            m_max_key = m_data[m_reccnt-1].rec.key;
+            // FIXME: this shouldn't be necessary, but the tagged record
+            //        bypass doesn't seem to be working on this code-path, so
+            //        this ensures that tagged records from the buffer are able
+            //        to be dropped, eventually. It should only need to be &= 1.
+            base->header &= 3;
+
+            bldr.AddKey(base->rec.key);
+            m_data[info.record_count++] = *base;
+
+            if (base->is_tombstone()) {
+                info.tombstone_count++;
+                if (m_bf) {
+                    m_bf->insert(base->rec);
+                }
+            }
 
-            auto bldr = ts::Builder<K>(m_min_key, m_max_key, E);
-            for (size_t i=0; i<m_reccnt; i++) {
-                bldr.AddKey(m_data[i].rec.key);
+            if (base->rec.key < m_min_key) {
+                m_min_key = base->rec.key;
+            }
+
+            if (base->rec.key > m_max_key) {
+                m_max_key = base->rec.key;
             }
+
+            base++;
+        }
+
+        free(temp_buffer);
+
+        m_reccnt = info.record_count;
+        m_tombstone_cnt = info.tombstone_count;
+
+        if (m_reccnt > 50) {
             m_ts = bldr.Finalize();
         }
     }
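Note how the constructor above seeds m_min_key with the largest input key and m_max_key with the smallest before the loop: the unfiltered extremes may belong to deleted records, so the trackers must start at the opposite ends and be narrowed only by survivors. The pattern in isolation, with int keys and an arbitrary predicate standing in for the delete checks:

#include <limits>
#include <vector>

/*
 * Track the min and max of only those elements that survive a filter,
 * by seeding each tracker with the opposite extreme.
 */
void surviving_min_max(const std::vector<int> &vals, int &min_out, int &max_out) {
    min_out = std::numeric_limits<int>::max();
    max_out = std::numeric_limits<int>::min();

    for (int v : vals) {
        if (v % 7 == 0) continue;   /* stand-in for the tombstone/delete filter */
        if (v < min_out) min_out = v;
        if (v > max_out) max_out = v;
    }
}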
 
     TrieSpline(std::vector<TrieSpline*> &shards)
-        : m_data(nullptr)
-        , m_reccnt(0)
+        : m_reccnt(0)
         , m_tombstone_cnt(0)
         , m_alloc_size(0)
         , m_max_key(0)
@@ -79,24 +134,90 @@ public:
         size_t tombstone_count = 0;
         auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt, &tombstone_count);
 
-        m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
         m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>),
                                                (byte **) &m_data);
 
-        auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
-        m_reccnt = res.record_count;
-        m_tombstone_cnt = res.tombstone_count;
+        // FIXME: For smaller cursor arrays, it may be more efficient to skip
+        //        the priority queue and just do a scan.
+        PriorityQueue<Wrapped<R>> pq(cursors.size());
+        for (size_t i=0; i<cursors.size(); i++) {
+            pq.push(cursors[i].ptr, i);
+        }
 
-        if (m_reccnt > 0) {
-            m_min_key = m_data[0].rec.key;
-            m_max_key = m_data[m_reccnt-1].rec.key;
+        auto tmp_max_key = shards[0]->m_max_key;
+        auto tmp_min_key = shards[0]->m_min_key;
 
-            auto bldr = ts::Builder<K>(m_min_key, m_max_key, E);
-            for (size_t i=0; i<m_reccnt; i++) {
-                bldr.AddKey(m_data[i].rec.key);
+        for (size_t i=0; i<shards.size(); i++) {
+            if (shards[i]->m_max_key > tmp_max_key) {
+                tmp_max_key = shards[i]->m_max_key;
+            }
+
+            if (shards[i]->m_min_key < tmp_min_key) {
+                tmp_min_key = shards[i]->m_min_key;
             }
+        }
+
+        auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+
+        /* seed with the opposite extremes; the merge loop below narrows them */
+        m_max_key = tmp_min_key;
+        m_min_key = tmp_max_key;
+
+        merge_info info = {0, 0};
+        while (pq.size()) {
+            auto now = pq.peek();
+            auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+
+            /*
+             * If the current record is not a tombstone, and the next record is
+             * a tombstone that matches the current one, then the current one
+             * has been deleted, and both it and its tombstone can be skipped
+             * over.
+             */
+            if (!now.data->is_tombstone() && next.data != nullptr &&
+                now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+                pq.pop(); pq.pop();
+                auto& cursor1 = cursors[now.version];
+                auto& cursor2 = cursors[next.version];
+                if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+                if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+            } else {
+                auto& cursor = cursors[now.version];
+
+                /* skip over records that have been deleted via tagging */
+                if (!cursor.ptr->is_deleted()) {
+                    bldr.AddKey(cursor.ptr->rec.key);
+                    m_data[info.record_count++] = *cursor.ptr;
+
+                    /*
+                     * If the record is a tombstone, increment the ts count and
+                     * insert it into the bloom filter if one has been
+                     * provided.
+                     */
+                    if (cursor.ptr->is_tombstone()) {
+                        info.tombstone_count++;
+                        if (m_bf) {
+                            m_bf->insert(cursor.ptr->rec);
+                        }
+                    }
+
+                    if (cursor.ptr->rec.key < m_min_key) {
+                        m_min_key = cursor.ptr->rec.key;
+                    }
+
+                    if (cursor.ptr->rec.key > m_max_key) {
+                        m_max_key = cursor.ptr->rec.key;
+                    }
+                }
+                pq.pop();
+
+                if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
+            }
+        }
+
+        m_reccnt = info.record_count;
+        m_tombstone_cnt = info.tombstone_count;
+
+        if (m_reccnt > 50) {
             m_ts = bldr.Finalize();
         }
     }
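Both merge constructors walk the cursors through a min-heap: peek at the two smallest heads, cancel a record against a matching tombstone, otherwise emit and advance. A simplified sketch of the same walk using std::priority_queue in place of the framework's two-element-peek PriorityQueue; it assumes, as the code above does, that a record and its cancelling tombstone arrive from different runs, and it omits the tag-delete check:

#include <queue>
#include <vector>

/* hypothetical, simplified record: a key plus a tombstone flag */
struct Rec {
    int key;
    bool tombstone;
};

/* a cursor over one sorted source run */
struct Cur {
    const Rec *ptr;
    const Rec *end;
};

/*
 * Min-heap ordering: smallest key first; on a key tie, the plain record
 * sorts ahead of its tombstone so the pair becomes adjacent at the top.
 */
struct HeapCmp {
    bool operator()(const Cur &a, const Cur &b) const {
        if (a.ptr->key != b.ptr->key) return a.ptr->key > b.ptr->key;
        return a.ptr->tombstone && !b.ptr->tombstone;
    }
};

std::vector<Rec> merge_with_cancellation(std::vector<Cur> runs) {
    std::priority_queue<Cur, std::vector<Cur>, HeapCmp> pq;
    for (const auto &c : runs) {
        if (c.ptr != c.end) pq.push(c);
    }

    std::vector<Rec> out;
    while (!pq.empty()) {
        Cur now = pq.top(); pq.pop();

        if (!now.ptr->tombstone && !pq.empty()
              && pq.top().ptr->key == now.ptr->key
              && pq.top().ptr->tombstone) {
            /* record and tombstone annihilate; advance the tombstone's run */
            Cur ts = pq.top(); pq.pop();
            if (++ts.ptr != ts.end) pq.push(ts);
        } else {
            out.push_back(*now.ptr);
        }

        /* advance the run the head record came from */
        if (++now.ptr != now.end) pq.push(now);
    }
    return out;
}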
@@ -107,7 +228,7 @@ public:
     }
 
     Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
-        if (filter && !m_bf->lookup(rec)) {
+        if (filter && m_bf && !m_bf->lookup(rec)) {
             return nullptr;
         }
 
@@ -144,7 +265,7 @@ public:
 
     size_t get_memory_usage() {
-        return m_ts.GetSize() + m_alloc_size;
+        return m_ts.GetSize();
     }
 
     size_t get_aux_memory_usage() {
@@ -152,6 +273,23 @@ public:
     }
 
     size_t get_lower_bound(const K& key) const {
+        /*
+         * Small shards never call Finalize() (see the m_reccnt > 50 checks
+         * in the constructors), so fall back to a linear scan for them.
+         */
+        if (m_reccnt <= 50) {
+            size_t bd = m_reccnt;
+            for (size_t i=0; i<m_reccnt; i++) {
+                if (m_data[i].rec.key >= key) {
+                    bd = i;
+                    break;
+                }
+            }
+
+            return bd;
+        }
+
         auto bound = m_ts.GetSearchBound(key);
         size_t idx = bound.begin;
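The fallback above (corrected from m_reccnt < 50, which stranded shards of exactly 50 records on an un-built spline) only works if it stays in lock-step with the m_reccnt > 50 build threshold used by both constructors, which is exactly the kind of invariant a shared constant protects. A sketch of that refactoring; the name kMinSplineRecords is an assumption, not something from the codebase:

#include <cstddef>

/* hypothetical shared threshold for building a spline over a shard */
static constexpr size_t kMinSplineRecords = 50;

/* the constructors would call Finalize() only when this holds... */
inline bool should_build_spline(size_t reccnt) {
    return reccnt > kMinSplineRecords;
}

/* ...and get_lower_bound() scans linearly in exactly the complement,
 * so a shard with reccnt == 50 can never reach an un-built spline */
inline bool should_scan_linearly(size_t reccnt) {
    return !should_build_spline(reccnt);
}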
diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h
index b342fe6..d5a2393 100644
--- a/include/shard/VPTree.h
+++ b/include/shard/VPTree.h
@@ -58,7 +58,7 @@ public:
                                                sizeof(Wrapped<R>),
                                                (byte**) &m_data);
 
-        m_ptrs = new Wrapped<R>*[buffer.get_record_count()];
+        m_ptrs = new vp_ptr[buffer.get_record_count()];
         size_t offset = 0;
         m_reccnt = 0;
@@ -76,7 +76,7 @@ public:
             rec->header &= 3;
             m_data[m_reccnt] = *rec;
 
-            m_ptrs[m_reccnt] = &m_data[m_reccnt];
+            m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
             m_reccnt++;
         }
 
@@ -97,7 +97,7 @@ public:
         m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>),
                                                (byte **) &m_data);
 
-        m_ptrs = new Wrapped<R>*[attemp_reccnt];
+        m_ptrs = new vp_ptr[attemp_reccnt];
 
         // FIXME: will eventually need to figure out tombstones
         //        this one will likely require the multi-pass
@@ -110,7 +110,7 @@ public:
                 }
 
                 m_data[m_reccnt] = *shards[i]->get_record_at(j);
-                m_ptrs[m_reccnt] = &m_data[m_reccnt];
+                m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
                 m_reccnt++;
             }
         }
@@ -139,8 +139,8 @@ public:
         } else {
             vpnode *node = m_root;
 
-            while (!node->leaf && m_ptrs[node->start]->rec != rec) {
-                if (rec.calc_distance((m_ptrs[node->start]->rec)) >= node->radius) {
+            while (!node->leaf && m_ptrs[node->start].ptr->rec != rec) {
+                if (rec.calc_distance((m_ptrs[node->start].ptr->rec)) >= node->radius) {
                     node = node->outside;
                 } else {
                     node = node->inside;
@@ -148,8 +148,8 @@ public:
             }
 
             for (size_t i=node->start; i<=node->stop; i++) {
-                if (m_ptrs[i]->rec == rec) {
-                    return m_ptrs[i];
+                if (m_ptrs[i].ptr->rec == rec) {
+                    return m_ptrs[i].ptr;
                 }
             }
 
@@ -175,7 +175,7 @@ public:
     }
 
     size_t get_memory_usage() {
-        return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*) + m_alloc_size;
+        return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(vp_ptr);
     }
 
     size_t get_aux_memory_usage() {
@@ -191,8 +191,12 @@ public:
     }
 
 private:
+    struct vp_ptr {
+        Wrapped<R> *ptr;
+        double dist;
+    };
     Wrapped<R>* m_data;
-    Wrapped<R>** m_ptrs;
+    vp_ptr* m_ptrs;
     std::unordered_map<R, size_t, RecordHash<R>> m_lookup_map;
     size_t m_reccnt;
     size_t m_tombstone_cnt;
@@ -260,6 +264,11 @@ private:
         auto i = start + gsl_rng_uniform_int(rng, stop - start + 1);
         swap(start, i);
 
+        /* for efficiency, pre-calculate the distances between each point and the vantage point */
+        for (size_t i=start+1; i<=stop; i++) {
+            m_ptrs[i].dist = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[i].ptr->rec);
+        }
+
         /*
          * partition elements based on their distance from the start,
         * with those elements with distance falling below the median
@@ -267,14 +276,15 @@ private:
          * the median in the right. This is easily done using QuickSelect.
          */
         auto mid = (start + 1 + stop) / 2;
-        quickselect(start + 1, stop, mid, m_ptrs[start], rng);
+        quickselect(start + 1, stop, mid, m_ptrs[start].ptr, rng);
 
         /* Create a new node based on this partitioning */
         vpnode *node = new vpnode();
         node->start = start;
 
         /* store the radius of the circle used for partitioning the node. */
-        node->radius = m_ptrs[start]->rec.calc_distance(m_ptrs[mid]->rec);
+        node->radius = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[mid].ptr->rec);
+        m_ptrs[start].dist = node->radius;
 
         /* recursively construct the left and right subtrees */
         node->inside = build_subtree(start + 1, mid-1, rng);
@@ -285,8 +295,6 @@ private:
         return node;
     }
 
-    // TODO: The quickselect code can probably be generalized and moved out
-    // to psudb-common instead.
     void quickselect(size_t start, size_t stop, size_t k, Wrapped<R> *p, gsl_rng *rng) {
         if (start == stop) return;
 
@@ -303,13 +311,13 @@ private:
     // to psudb-common instead.
     size_t partition(size_t start, size_t stop, Wrapped<R> *p, gsl_rng *rng) {
         auto pivot = start + gsl_rng_uniform_int(rng, stop - start);
-        double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec);
         swap(pivot, stop);
 
         size_t j = start;
         for (size_t i=start; i<stop; i++) {
-            if (p->rec.calc_distance(m_ptrs[i]->rec) < pivot_dist) {
+            /* compare the cached distances rather than recomputing them */
+            if (m_ptrs[i].dist < m_ptrs[stop].dist) {
                 swap(j, i);
                 j++;
             }
@@ -332,13 +343,13 @@ private:
 
         if (node->leaf) {
             for (size_t i=node->start; i<=node->stop; i++) {
-                double d = point.calc_distance(m_ptrs[i]->rec);
+                double d = point.calc_distance(m_ptrs[i].ptr->rec);
                 if (d < *farthest) {
                     if (pq.size() == k) {
                         pq.pop();
                     }
-                    pq.push(m_ptrs[i]);
+                    pq.push(m_ptrs[i].ptr);
                     if (pq.size() == k) {
                         *farthest = point.calc_distance(pq.peek().data->rec);
                     }
@@ -348,14 +359,14 @@ private:
             return;
         }
 
-        double d = point.calc_distance(m_ptrs[node->start]->rec);
+        double d = point.calc_distance(m_ptrs[node->start].ptr->rec);
        if (d < *farthest) {
             if (pq.size() == k) {
                 auto t = pq.peek().data->rec;
                 pq.pop();
             }
-            pq.push(m_ptrs[node->start]);
+            pq.push(m_ptrs[node->start].ptr);
             if (pq.size() == k) {
                 *farthest = point.calc_distance(pq.peek().data->rec);
             }
diff --git a/include/util/SortedMerge.h b/include/util/SortedMerge.h
index 8a1e782..c149189 100644
--- a/include/util/SortedMerge.h
+++ b/include/util/SortedMerge.h
@@ -58,7 +58,7 @@ static std::vector<Cursor<Wrapped<R>>> build_cursor_vec(std::vector<S*> &shards,
     for (size_t i = 0; i < shards.size(); ++i) {
         if (shards[i]) {
             auto base = shards[i]->get_data();
-            cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
+            cursors.emplace_back(Cursor<Wrapped<R>>{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
             *reccnt += shards[i]->get_record_count();
             *tscnt += shards[i]->get_tombstone_count();
         } else {
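The vp_ptr change above attaches a cached distance to each entry, so quickselect's partition step compares two stored doubles instead of invoking calc_distance() once per comparison. The same idea in standalone form, sketched with 1-D points so the distance function stays trivial:

#include <cmath>
#include <cstddef>
#include <utility>

struct Entry {
    double val;    /* stand-in for a record */
    double dist;   /* cached distance to the vantage point */
};

/* one pass of distance computation, done before any partitioning */
void cache_distances(Entry *arr, size_t start, size_t stop, size_t vantage) {
    for (size_t i = start; i <= stop; i++) {
        arr[i].dist = std::fabs(arr[i].val - arr[vantage].val);
    }
}

/*
 * Lomuto-style partition over [start, stop] using the cached distances;
 * the element at stop is the pivot, and entries strictly closer to the
 * vantage point than the pivot end up on its left.
 */
size_t partition_by_dist(Entry *arr, size_t start, size_t stop) {
    size_t j = start;
    for (size_t i = start; i < stop; i++) {
        if (arr[i].dist < arr[stop].dist) {
            std::swap(arr[i], arr[j]);
            j++;
        }
    }
    std::swap(arr[j], arr[stop]);
    return j;
}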
diff --git a/include/util/types.h b/include/util/types.h
index a13bd95..cf61412 100644
--- a/include/util/types.h
+++ b/include/util/types.h
@@ -19,6 +19,8 @@
 #include <cstdint>
 #include <cstdlib>
+#include <vector>
+#include <cassert>
 
 namespace de {
 
@@ -69,4 +71,80 @@ struct ShardID {
 /* A placeholder for an invalid shard--also used to indicate the mutable buffer */
 const ShardID INVALID_SHID = {-1, -1};
 
+typedef ssize_t level_index;
+
+typedef struct ReconstructionTask {
+    std::vector<level_index> sources;
+    level_index target;
+    size_t reccnt;
+
+    void add_source(level_index source, size_t cnt) {
+        sources.push_back(source);
+        reccnt += cnt;
+    }
+
+} ReconstructionTask;
+
+class ReconstructionVector {
+public:
+    ReconstructionVector()
+        : total_reccnt(0) {}
+
+    ~ReconstructionVector() = default;
+
+    ReconstructionTask operator[](size_t idx) {
+        return m_tasks[idx];
+    }
+
+    void add_reconstruction(level_index source, level_index target, size_t reccnt) {
+        m_tasks.push_back({{source}, target, reccnt});
+        total_reccnt += reccnt;
+    }
+
+    void add_reconstruction(ReconstructionTask task) {
+        m_tasks.push_back(task);
+        total_reccnt += task.reccnt;
+    }
+
+    ReconstructionTask remove_reconstruction(size_t idx) {
+        assert(idx < m_tasks.size());
+        auto task = m_tasks[idx];
+
+        m_tasks.erase(m_tasks.begin() + idx);
+        total_reccnt -= task.reccnt;
+
+        return task;
+    }
+
+    ReconstructionTask remove_smallest_reconstruction() {
+        size_t min_size = m_tasks[0].reccnt;
+        size_t idx = 0;
+        for (size_t i=1; i<m_tasks.size(); i++) {
+            if (m_tasks[i].reccnt < min_size) {
+                min_size = m_tasks[i].reccnt;
+                idx = i;
+            }
+        }
+
+        auto task = m_tasks[idx];
+        m_tasks.erase(m_tasks.begin() + idx);
+        total_reccnt -= task.reccnt;
+
+        return task;
+    }
+
+    size_t get_total_reccnt() {
+        return total_reccnt;
+    }
+
+    size_t size() {
+        return m_tasks.size();
+    }
+
+private:
+    std::vector<ReconstructionTask> m_tasks;
+    size_t total_reccnt;
+};
+
 }
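A short usage sketch for the ReconstructionVector added above (note that add_reconstruction(ReconstructionTask) now updates total_reccnt, so that remove_* calls cannot drive the counter below zero); the include path, level numbers, and record counts here are illustrative assumptions:

#include "util/types.h"

int main() {
    de::ReconstructionVector tasks;

    /* one single-source reconstruction per level: source, target, reccnt */
    tasks.add_reconstruction(0, 1, 1000);
    tasks.add_reconstruction(1, 2, 4000);
    tasks.add_reconstruction(2, 3, 16000);

    /* a multi-source task can be assembled first and added whole */
    de::ReconstructionTask big{{0}, 3, 500};
    big.add_source(1, 800);
    tasks.add_reconstruction(big);

    /* drain the queue cheapest-first */
    while (tasks.size() > 0) {
        auto task = tasks.remove_smallest_reconstruction();
        /* ... hand task off to a reconstruction worker ... */
    }

    return 0;
}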