From 90194cb5be5c7b80bfa21a353a75920e932d54d8 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 12 Feb 2024 10:39:52 -0500 Subject: Added structure state vector w/ scratch version for reconstruction This approach should allow us to "simulate" a reconstruction to monitor the future state of the structure. The idea being that we can then add pre-emptive reconstructions to load balance and further smooth the tail latency curve. If a given reconstruction is significantly smaller than the next one will be, we can move some of the next one's work preemptively into the current one. The next phase is to do the simulation within the scratch_vector and then do a second pass examining the state of that reconstruction. In principle, we could look arbitrarily far ahead using this technique. --- include/framework/structure/ExtensionStructure.h | 144 ++++++++++++++--------- 1 file changed, 89 insertions(+), 55 deletions(-) (limited to 'include') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 4802bc1..823a66b 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -27,6 +27,16 @@ class ExtensionStructure { typedef S Shard; typedef BufferView BuffView; + typedef struct { + size_t reccnt; + size_t reccap; + + size_t shardcnt; + size_t shardcap; + } level_state; + + typedef std::vector state_vector; + public: ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop) : m_scale_factor(scale_factor) @@ -59,6 +69,7 @@ public: } new_struct->m_refcnt = 0; + new_struct->m_current_state = m_current_state; return new_struct; } @@ -95,8 +106,8 @@ public: * takes a structure as input. */ inline bool flush_buffer(BuffView buffer) { - assert(can_reconstruct_with(0, buffer.get_record_count())); - + state_vector tmp = m_current_state; + assert(can_reconstruct_with(0, buffer.get_record_count(), tmp)); flush_buffer_into_l0(std::move(buffer)); return true; @@ -200,8 +211,16 @@ public: return m_levels; } + /* + * NOTE: This cannot be simulated, because tombstone cancellation is not + * cheaply predictable. It is possible that the worst case number could + * be used instead, to allow for prediction, but compaction isn't a + * major concern outside of sampling; at least for now. So I'm not + * going to focus too much time on it at the moment. + */ std::vector get_compaction_tasks() { std::vector tasks; + state_vector scratch_state = m_current_state; /* if the tombstone/delete invariant is satisfied, no need for compactions */ if (validate_tombstone_proportion()) { @@ -219,9 +238,9 @@ public: assert(violation_idx != -1); - level_index base_level = find_reconstruction_target(violation_idx); + level_index base_level = find_reconstruction_target(violation_idx, scratch_state); if (base_level == -1) { - base_level = grow(); + base_level = grow(scratch_state); } for (level_index i=base_level; i>0; i--) { @@ -239,7 +258,7 @@ public: */ size_t reccnt = m_levels[i - 1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt)) { + if (can_reconstruct_with(i, reccnt, scratch_state)) { reccnt += m_levels[i]->get_record_count(); } } @@ -254,46 +273,28 @@ public: /* * */ - std::vector get_reconstruction_tasks(size_t buffer_reccnt) { - std::vector reconstructions; + std::vector get_reconstruction_tasks(size_t buffer_reccnt, + state_vector scratch_state={}) { + /* + * If no scratch state vector is provided, use a copy of the + * current one. The only time an empty vector could be used as + * *real* input to this function is when the current state is also + * empty, so this should would even in that case. + */ + if (scratch_state.size() == 0) { + scratch_state = m_current_state; + } /* * The buffer flush is not included so if that can be done without any * other change, just return an empty list. */ - if (can_reconstruct_with(0, buffer_reccnt)) { + std::vector reconstructions; + if (can_reconstruct_with(0, buffer_reccnt, scratch_state)) { return std::move(reconstructions); } - level_index base_level = find_reconstruction_target(0); - if (base_level == -1) { - base_level = grow(); - } - - for (level_index i=base_level; i>0; i--) { - ReconstructionTask task = {i-1, i}; - - /* - * The amount of storage required for the reconstruction accounts - * for the cost of storing the new records, along with the - * cost of retaining the old records during the process - * (hence the 2x multiplier). - * - * FIXME: currently does not account for the *actual* size - * of the shards, only the storage for the records - * themselves. - */ - size_t reccnt = m_levels[i-1]->get_record_count(); - if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt)) { - reccnt += m_levels[i]->get_record_count(); - } - } - //task.m_size = 2* reccnt * sizeof(R); - - reconstructions.push_back(task); - } - + reconstructions = get_reconstruction_tasks_from_level(0, scratch_state); return std::move(reconstructions); } @@ -301,12 +302,12 @@ public: /* * */ - std::vector get_reconstruction_tasks_from_level(level_index source_level) { + std::vector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) { std::vector reconstructions; - level_index base_level = find_reconstruction_target(source_level); + level_index base_level = find_reconstruction_target(source_level, scratch_state); if (base_level == -1) { - base_level = grow(); + base_level = grow(scratch_state); } for (level_index i=base_level; i>source_level; i--) { @@ -323,7 +324,7 @@ public: */ size_t reccnt = m_levels[i-1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt)) { + if (can_reconstruct_with(i, reccnt, scratch_state)) { reccnt += m_levels[i]->get_record_count(); } } @@ -332,7 +333,7 @@ public: reconstructions.push_back(task); } - return reconstructions; + return std::move(reconstructions); } /* @@ -342,7 +343,10 @@ public: * tombstone ordering invariant may be violated. */ inline void reconstruction(level_index base_level, level_index incoming_level) { + + size_t shard_capacity; if constexpr (L == LayoutPolicy::LEVELING) { + shard_capacity = 1; /* if the base level has a shard, merge the base and incoming together to make a new one */ if (m_levels[base_level]->get_shard_count() > 0) { m_levels[base_level] = InternalLevel::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get()); @@ -350,13 +354,23 @@ public: } else { m_levels[base_level] = m_levels[incoming_level]; } + } else { + shard_capacity = m_scale_factor; m_levels[base_level]->append_level(m_levels[incoming_level].get()); m_levels[base_level]->finalize(); } /* place a new, empty level where the incoming level used to be */ m_levels[incoming_level] = std::shared_ptr>(new InternalLevel(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); + + /* + * Update the state vector to match the *real* stage following + * the reconstruction + */ + m_current_state[base_level] = {m_levels[base_level]->get_record_count(), + calc_level_record_capacity(base_level), m_levels[base_level]->get_shard_count(), shard_capacity}; + m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity}; } bool take_reference() { @@ -393,14 +407,26 @@ private: std::vector>> m_levels; + /* + * A pair of for each level in the + * structure. Record counts may be slightly inaccurate due to + * deletes. + */ + state_vector m_current_state; + /* * Add a new level to the structure and return its index. */ - inline level_index grow() { + inline level_index grow(state_vector &scratch_state) { level_index new_idx = m_levels.size(); - size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + m_levels.emplace_back(std::shared_ptr>(new InternalLevel(new_idx, new_shard_cap))); - m_levels.emplace_back(std::shared_ptr>(new InternalLevel(new_idx, new_shard_cnt))); + m_current_state.push_back({0, calc_level_record_capacity(new_idx), + 0, new_shard_cap}); + scratch_state.push_back({0, calc_level_record_capacity(new_idx), + 0, new_shard_cap}); return new_idx; } @@ -411,17 +437,21 @@ private: * returns -1 if idx==0, and no such level exists, to simplify * the logic of the first buffer flush. */ - inline level_index find_reconstruction_target(level_index idx) { + inline level_index find_reconstruction_target(level_index idx, state_vector &state) { - if (idx == 0 && m_levels.size() == 0) return -1; + /* + * this handles the very first buffer flush, when the state vector + * is empty. + */ + if (idx == 0 && state.size() == 0) return -1; - size_t incoming_rec_cnt = get_level_record_count(idx); - for (level_index i=idx+1; iappend_buffer(std::move(buffer)); } + + /* update the state vector */ + m_current_state[0].reccnt = m_levels[0]->get_record_count(); + m_current_state[0].shardcnt = m_levels[0]->get_shard_count(); } /* @@ -475,15 +509,15 @@ private: * Determines if a level can sustain a reconstruction with incoming_rec_cnt * additional records without exceeding its capacity. */ - inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) { - if (idx >= m_levels.size() || !m_levels[idx]) { + inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) { + if (idx >= state.size()) { return false; } if (L == LayoutPolicy::LEVELING) { - return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx); + return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap; } else { - return m_levels[idx]->get_shard_count() < m_scale_factor; + return state[idx].shardcnt < state[idx].shardcap; } /* unreachable */ -- cgit v1.2.3 From 5c229093f9af21514b17cf778dbec7ac657e31d2 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 12 Feb 2024 11:42:18 -0500 Subject: Refactored Reconstruction Tasks Added a ReconVector type to make it easier to do load balancing by shifting tasks around, and clean up a few interfaces. --- include/framework/DynamicExtension.h | 2 +- include/framework/scheduling/Task.h | 2 +- include/framework/structure/ExtensionStructure.h | 23 +++----- include/framework/util/Configuration.h | 3 -- include/util/types.h | 67 ++++++++++++++++++++++++ 5 files changed, 77 insertions(+), 20 deletions(-) (limited to 'include') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 7ea5370..3e1ce50 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -488,7 +488,7 @@ private: Structure *vers = args->epoch->get_structure(); for (ssize_t i=0; imerges.size(); i++) { - vers->reconstruction(args->merges[i].second, args->merges[i].first); + vers->reconstruction(args->merges[i].target, args->merges[i].source); } /* diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index d5d4266..bd53090 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -26,7 +26,7 @@ namespace de { template S, QueryInterface Q, LayoutPolicy L> struct ReconstructionArgs { Epoch *epoch; - std::vector merges; + ReconstructionVector merges; std::promise result; bool compaction; void *extension; diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 823a66b..f1c7535 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -218,8 +218,8 @@ public: * major concern outside of sampling; at least for now. So I'm not * going to focus too much time on it at the moment. */ - std::vector get_compaction_tasks() { - std::vector tasks; + ReconstructionVector get_compaction_tasks() { + ReconstructionVector tasks; state_vector scratch_state = m_current_state; /* if the tombstone/delete invariant is satisfied, no need for compactions */ @@ -244,8 +244,6 @@ public: } for (level_index i=base_level; i>0; i--) { - ReconstructionTask task = {i-1, i}; - /* * The amount of storage required for the reconstruction accounts * for the cost of storing the new records, along with the @@ -262,9 +260,7 @@ public: reccnt += m_levels[i]->get_record_count(); } } - //task.m_size = 2* reccnt * sizeof(R); - - tasks.push_back(task); + tasks.add_reconstruction(i-i, i, reccnt); } return tasks; @@ -273,7 +269,7 @@ public: /* * */ - std::vector get_reconstruction_tasks(size_t buffer_reccnt, + ReconstructionVector get_reconstruction_tasks(size_t buffer_reccnt, state_vector scratch_state={}) { /* * If no scratch state vector is provided, use a copy of the @@ -289,7 +285,7 @@ public: * The buffer flush is not included so if that can be done without any * other change, just return an empty list. */ - std::vector reconstructions; + ReconstructionVector reconstructions; if (can_reconstruct_with(0, buffer_reccnt, scratch_state)) { return std::move(reconstructions); } @@ -302,8 +298,8 @@ public: /* * */ - std::vector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) { - std::vector reconstructions; + ReconstructionVector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) { + ReconstructionVector reconstructions; level_index base_level = find_reconstruction_target(source_level, scratch_state); if (base_level == -1) { @@ -311,7 +307,6 @@ public: } for (level_index i=base_level; i>source_level; i--) { - ReconstructionTask task = {i - 1, i}; /* * The amount of storage required for the reconstruction accounts * for the cost of storing the new records, along with the @@ -328,9 +323,7 @@ public: reccnt += m_levels[i]->get_record_count(); } } -// task.m_size = 2* reccnt * sizeof(R); - - reconstructions.push_back(task); + reconstructions.add_reconstruction(i-1, i, reccnt); } return std::move(reconstructions); diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index 65ca181..55cc682 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -43,7 +43,4 @@ enum class DeletePolicy { TAGGING }; -typedef ssize_t level_index; -typedef std::pair ReconstructionTask; - } diff --git a/include/util/types.h b/include/util/types.h index a13bd95..e22fd18 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -19,6 +19,8 @@ #include #include +#include +#include namespace de { @@ -69,4 +71,69 @@ struct ShardID { /* A placeholder for an invalid shard--also used to indicate the mutable buffer */ const ShardID INVALID_SHID = {-1, -1}; +typedef ssize_t level_index; + +typedef struct { + level_index source; + level_index target; + size_t reccnt; +} ReconstructionTask; + +class ReconstructionVector { +public: + ReconstructionVector() + : total_reccnt(0) {} + + ~ReconstructionVector() = default; + + ReconstructionTask operator[](size_t idx) { + return m_tasks[idx]; + } + + void add_reconstruction(level_index source, level_index target, size_t reccnt) { + m_tasks.push_back({source, target, reccnt}); + total_reccnt += reccnt; + } + + ReconstructionTask remove_reconstruction(size_t idx) { + assert(idx < m_tasks.size()); + auto task = m_tasks[idx]; + + m_tasks.erase(m_tasks.begin() + idx); + total_reccnt -= task.reccnt; + + return task; + } + + ReconstructionTask remove_smallest_reconstruction() { + size_t min_size = m_tasks[0].reccnt; + size_t idx = 0; + for (size_t i=1; i m_tasks; + size_t total_reccnt; +}; + } -- cgit v1.2.3 From 61f8bd422aed3301850c4502e9f5b7f33dff2e12 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 12 Feb 2024 12:38:29 -0500 Subject: MutableBuffer: Allow hwm to equal lwm The high watermark and low watermark can now be equal, to allow for blocking reconstruction without requiring odd buffer sizes. --- include/framework/structure/MutableBuffer.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include') diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 415c95a..c0c87cc 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -57,7 +57,7 @@ public: , m_active_head_advance(false) { assert(m_cap > m_hwm); - assert(m_hwm > m_lwm); + assert(m_hwm >= m_lwm); } ~MutableBuffer() { -- cgit v1.2.3 From 6d7067b4cef51e92d8cb54bb502d6133269728a9 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 12 Feb 2024 12:52:12 -0500 Subject: ExtensionStructure: Added simulated reconstruction lookahead The reconstruction task procedure can now simulate future reconstructions to a specified depth. --- include/framework/structure/ExtensionStructure.h | 116 +++++++++++++++++------ 1 file changed, 88 insertions(+), 28 deletions(-) (limited to 'include') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index f1c7535..0e2118e 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -107,6 +107,11 @@ public: */ inline bool flush_buffer(BuffView buffer) { state_vector tmp = m_current_state; + + if (tmp.size() == 0) { + grow(tmp); + } + assert(can_reconstruct_with(0, buffer.get_record_count(), tmp)); flush_buffer_into_l0(std::move(buffer)); @@ -281,16 +286,34 @@ public: scratch_state = m_current_state; } - /* - * The buffer flush is not included so if that can be done without any - * other change, just return an empty list. - */ ReconstructionVector reconstructions; - if (can_reconstruct_with(0, buffer_reccnt, scratch_state)) { - return std::move(reconstructions); + size_t LOOKAHEAD = 2; + for (size_t i=0; isource_level; i--) { + size_t recon_reccnt = scratch_state[i-1].reccnt; + size_t base_reccnt = recon_reccnt; + /* - * The amount of storage required for the reconstruction accounts - * for the cost of storing the new records, along with the - * cost of retaining the old records during the process - * (hence the 2x multiplier). - * - * FIXME: currently does not account for the *actual* size - * of the shards, only the storage for the records - * themselves. + * If using Leveling, the total reconstruction size will be the + * records in *both* base and target, because they will need to + * be merged (assuming that target isn't empty). */ - size_t reccnt = m_levels[i-1]->get_record_count(); if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt, scratch_state)) { - reccnt += m_levels[i]->get_record_count(); + if (can_reconstruct_with(i, base_reccnt, scratch_state)) { + recon_reccnt += scratch_state[i].reccnt; } } - reconstructions.add_reconstruction(i-1, i, reccnt); + reconstructions.add_reconstruction(i-1, i, recon_reccnt); + + /* + * The base level will be emptied and its records moved to + * the target. + */ + scratch_state[i-1].reccnt = 0; + scratch_state[i-1].shardcnt = 0; + + /* + * The target level will have the records from the base level + * added to it, and potentially gain a shard if the LayoutPolicy + * is tiering or the level currently lacks any shards at all. + */ + scratch_state[i].reccnt += base_reccnt; + if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) { + scratch_state[i].shardcnt += 1; + } } return std::move(reconstructions); @@ -336,10 +382,16 @@ public: * tombstone ordering invariant may be violated. */ inline void reconstruction(level_index base_level, level_index incoming_level) { + size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + if (base_level >= m_levels.size()) { + m_levels.emplace_back(std::shared_ptr>(new InternalLevel(base_level, shard_capacity))); + + m_current_state.push_back({0, calc_level_record_capacity(base_level), + 0, shard_capacity}); + } - size_t shard_capacity; if constexpr (L == LayoutPolicy::LEVELING) { - shard_capacity = 1; /* if the base level has a shard, merge the base and incoming together to make a new one */ if (m_levels[base_level]->get_shard_count() > 0) { m_levels[base_level] = InternalLevel::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get()); @@ -349,7 +401,6 @@ public: } } else { - shard_capacity = m_scale_factor; m_levels[base_level]->append_level(m_levels[incoming_level].get()); m_levels[base_level]->finalize(); } @@ -408,16 +459,17 @@ private: state_vector m_current_state; /* - * Add a new level to the structure and return its index. + * Add a new level to the scratch state and return its index. + * + * IMPORTANT: This does _not_ add a level to the extension structure + * anymore. This is handled by the appropriate reconstruction and flush + * methods as needed. This function is for use in "simulated" + * reconstructions. */ inline level_index grow(state_vector &scratch_state) { level_index new_idx = m_levels.size(); size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - m_levels.emplace_back(std::shared_ptr>(new InternalLevel(new_idx, new_shard_cap))); - - m_current_state.push_back({0, calc_level_record_capacity(new_idx), - 0, new_shard_cap}); scratch_state.push_back({0, calc_level_record_capacity(new_idx), 0, new_shard_cap}); return new_idx; @@ -451,7 +503,15 @@ private: } inline void flush_buffer_into_l0(BuffView buffer) { - assert(m_levels[0]); + size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + if (m_levels.size() == 0) { + m_levels.emplace_back(std::shared_ptr>(new InternalLevel(0, shard_capacity))); + + m_current_state.push_back({0, calc_level_record_capacity(0), + 0, shard_capacity}); + } + if constexpr (L == LayoutPolicy::LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0].get(); -- cgit v1.2.3 From e4644fec1acc429a540f358b4e7e15af75aa71a9 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 12 Feb 2024 13:09:10 -0500 Subject: ExtensionStructure: first basic test of lookahead task stealing --- include/framework/structure/ExtensionStructure.h | 11 ++++++++++- include/util/types.h | 4 ++++ 2 files changed, 14 insertions(+), 1 deletion(-) (limited to 'include') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 0e2118e..dbfb543 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -275,7 +275,7 @@ public: * */ ReconstructionVector get_reconstruction_tasks(size_t buffer_reccnt, - state_vector scratch_state={}) { + state_vector scratch_state={}) { /* * If no scratch state vector is provided, use a copy of the * current one. The only time an empty vector could be used as @@ -304,6 +304,14 @@ public: */ if (i == 0) { reconstructions = local_recon; + /* + * Quick sanity test of idea: if the next reconstruction + * would be larger than this one, steal the largest + * task from it and run it now instead. + */ + } else if (local_recon.get_total_reccnt() > reconstructions.get_total_reccnt()) { + auto t = local_recon.remove_reconstruction(0); + reconstructions.add_reconstruction(t); } } @@ -312,6 +320,7 @@ public: if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) { scratch_state[0].shardcnt += 1; } + } return std::move(reconstructions); diff --git a/include/util/types.h b/include/util/types.h index e22fd18..bac0246 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -95,6 +95,10 @@ public: total_reccnt += reccnt; } + void add_reconstruction(ReconstructionTask task) { + m_tasks.push_back(task); + } + ReconstructionTask remove_reconstruction(size_t idx) { assert(idx < m_tasks.size()); auto task = m_tasks[idx]; -- cgit v1.2.3 From 49bceabf90d114b89638659141d54083c1b7f395 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Wed, 21 Feb 2024 17:03:08 -0500 Subject: VPTree: precalculate distances to make construction more efficient --- include/shard/VPTree.h | 49 ++++++++++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 19 deletions(-) (limited to 'include') diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h index b342fe6..62857ce 100644 --- a/include/shard/VPTree.h +++ b/include/shard/VPTree.h @@ -58,7 +58,7 @@ public: sizeof(Wrapped), (byte**) &m_data); - m_ptrs = new Wrapped*[buffer.get_record_count()]; + m_ptrs = new vp_ptr[buffer.get_record_count()]; size_t offset = 0; m_reccnt = 0; @@ -76,7 +76,7 @@ public: rec->header &= 3; m_data[m_reccnt] = *rec; - m_ptrs[m_reccnt] = &m_data[m_reccnt]; + m_ptrs[m_reccnt].ptr = &m_data[m_reccnt]; m_reccnt++; } @@ -97,7 +97,7 @@ public: m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **) &m_data); - m_ptrs = new Wrapped*[attemp_reccnt]; + m_ptrs = new vp_ptr[attemp_reccnt]; // FIXME: will eventually need to figure out tombstones // this one will likely require the multi-pass @@ -110,7 +110,7 @@ public: } m_data[m_reccnt] = *shards[i]->get_record_at(j); - m_ptrs[m_reccnt] = &m_data[m_reccnt]; + m_ptrs[m_reccnt].ptr = &m_data[m_reccnt]; m_reccnt++; } } @@ -139,8 +139,8 @@ public: } else { vpnode *node = m_root; - while (!node->leaf && m_ptrs[node->start]->rec != rec) { - if (rec.calc_distance((m_ptrs[node->start]->rec)) >= node->radius) { + while (!node->leaf && m_ptrs[node->start].ptr->rec != rec) { + if (rec.calc_distance((m_ptrs[node->start].ptr->rec)) >= node->radius) { node = node->outside; } else { node = node->inside; @@ -148,8 +148,8 @@ public: } for (size_t i=node->start; i<=node->stop; i++) { - if (m_ptrs[i]->rec == rec) { - return m_ptrs[i]; + if (m_ptrs[i].ptr->rec == rec) { + return m_ptrs[i].ptr; } } @@ -191,8 +191,12 @@ public: } private: + struct vp_ptr { + Wrapped *ptr; + double dist; + }; Wrapped* m_data; - Wrapped** m_ptrs; + vp_ptr* m_ptrs; std::unordered_map> m_lookup_map; size_t m_reccnt; size_t m_tombstone_cnt; @@ -260,6 +264,11 @@ private: auto i = start + gsl_rng_uniform_int(rng, stop - start + 1); swap(start, i); + /* for efficiency, we'll pre-calculate the distances between each point and the root */ + for (size_t i=start+1; i<=stop; i++) { + m_ptrs[i].dist = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[i].ptr->rec); + } + /* * partition elements based on their distance from the start, * with those elements with distance falling below the median @@ -267,14 +276,15 @@ private: * the median in the right. This is easily done using QuickSelect. */ auto mid = (start + 1 + stop) / 2; - quickselect(start + 1, stop, mid, m_ptrs[start], rng); + quickselect(start + 1, stop, mid, m_ptrs[start].ptr, rng); /* Create a new node based on this partitioning */ vpnode *node = new vpnode(); node->start = start; /* store the radius of the circle used for partitioning the node. */ - node->radius = m_ptrs[start]->rec.calc_distance(m_ptrs[mid]->rec); + node->radius = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[mid].ptr->rec); + m_ptrs[start].dist = node->radius; /* recursively construct the left and right subtrees */ node->inside = build_subtree(start + 1, mid-1, rng); @@ -285,8 +295,6 @@ private: return node; } - // TODO: The quickselect code can probably be generalized and moved out - // to psudb-common instead. void quickselect(size_t start, size_t stop, size_t k, Wrapped *p, gsl_rng *rng) { if (start == stop) return; @@ -303,13 +311,16 @@ private: // to psudb-common instead. size_t partition(size_t start, size_t stop, Wrapped *p, gsl_rng *rng) { auto pivot = start + gsl_rng_uniform_int(rng, stop - start); - double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec); + //double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec); swap(pivot, stop); size_t j = start; for (size_t i=start; irec.calc_distance(m_ptrs[i]->rec) < pivot_dist) { + if (m_ptrs[i].dist < m_ptrs[stop].dist) { + //assert(distances[i - start] == p->rec.calc_distance(m_ptrs[i]->rec)); + //if (distances[i - start] < distances[stop - start]) { + //if (p->rec .calc_distance(m_ptrs[i]->rec) < pivot_dist) { swap(j, i); j++; } @@ -332,13 +343,13 @@ private: if (node->leaf) { for (size_t i=node->start; i<=node->stop; i++) { - double d = point.calc_distance(m_ptrs[i]->rec); + double d = point.calc_distance(m_ptrs[i].ptr->rec); if (d < *farthest) { if (pq.size() == k) { pq.pop(); } - pq.push(m_ptrs[i]); + pq.push(m_ptrs[i].ptr); if (pq.size() == k) { *farthest = point.calc_distance(pq.peek().data->rec); } @@ -348,14 +359,14 @@ private: return; } - double d = point.calc_distance(m_ptrs[node->start]->rec); + double d = point.calc_distance(m_ptrs[node->start].ptr->rec); if (d < *farthest) { if (pq.size() == k) { auto t = pq.peek().data->rec; pq.pop(); } - pq.push(m_ptrs[node->start]); + pq.push(m_ptrs[node->start].ptr); if (pq.size() == k) { *farthest = point.calc_distance(pq.peek().data->rec); } -- cgit v1.2.3 From 405bf4a20b4a22a6bb4b60b730b6a7e901fdccf6 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 19 Mar 2024 11:10:02 -0400 Subject: FST Shard w/ tests Needs some debugging--some methods currently fail within the library itself. The build system doesn't currently build the FST library. To compile, you'll first need to manually build it, and then place the libFST.so file in your LIBRARY_PATH and LD_LIBRARY_PATH. --- include/shard/FSTrie.h | 266 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 266 insertions(+) create mode 100644 include/shard/FSTrie.h (limited to 'include') diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h new file mode 100644 index 0000000..11a232d --- /dev/null +++ b/include/shard/FSTrie.h @@ -0,0 +1,266 @@ +/* + * include/shard/FSTrie.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * + * Distributed under the Modified BSD License. + * + * A shard shim around the FSTrie learned index. + * + * TODO: The code in this file is very poorly commented. + */ +#pragma once + + +#include + +#include "framework/ShardRequirements.h" +#include "FST.hpp" +#include "psu-ds/BloomFilter.h" +#include "util/bf_config.h" +#include "util/SortedMerge.h" + +using psudb::CACHELINE_SIZE; +using psudb::BloomFilter; +using psudb::PriorityQueue; +using psudb::queue_record; +using psudb::byte; + +namespace de { + +template +class FSTrie { +private: + typedef decltype(R::key) K; + typedef decltype(R::value) V; + + static_assert(std::is_same_v || std::is_same_v, + "FST requires either string or uint64_t keys"); + +public: + FSTrie(BufferView buffer) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + buffer.get_record_count() * + sizeof(Wrapped), + (byte**) &m_data); + size_t cnt = 0; + std::vector keys; + keys.reserve(buffer.get_record_count()); + + std::vector values; + values.reserve(buffer.get_record_count()); + + size_t longest_key = 0; + + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, + buffer.get_record_count(), + sizeof(Wrapped)); + buffer.copy_to_buffer((byte *) temp_buffer); + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less>()); + + for (size_t i=0; i) { + if (m_data[cnt].rec.key.size() > longest_key) { + longest_key = m_data[cnt].rec.key.size(); + } + } + + cnt++; + } + + m_reccnt = cnt; + m_fst = FST(); + if constexpr (std::is_same_v) { + m_fst.load(keys, values, longest_key); + } else { + m_fst.load(keys, values); + } + + free(temp_buffer); + } + + FSTrie(std::vector &shards) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); + + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + attemp_reccnt * sizeof(Wrapped), + (byte **) &m_data); + + std::vector keys; + keys.reserve(attemp_reccnt); + + std::vector values; + values.reserve(attemp_reccnt); + + size_t longest_key = 0; + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue> pq(cursors.size()); + for (size_t i=0; i 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + m_data[m_reccnt] = *cursor.ptr; + keys.push_back(m_data[m_reccnt].rec.key); + values.push_back(m_data[m_reccnt].rec.value); + + if constexpr (std::is_same_v) { + if (m_data[m_reccnt].rec.key.size() > longest_key) { + longest_key = m_data[m_reccnt].rec.key.size(); + } + } + + m_reccnt++; + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + } + } + + if (m_reccnt > 0) { + m_fst = FST(); + if constexpr (std::is_same_v) { + m_fst.load(keys, values, longest_key); + } else { + m_fst.load(keys, values); + } + } + } + + ~FSTrie() { + free(m_data); + } + + Wrapped *point_lookup(const R &rec, bool filter=false) { + size_t idx; + bool res; + if constexpr (std::is_same_v) { + res = m_fst.lookup(rec.key.c_str(), rec.key.size(), idx); + } else { + res = m_fst.lookup(rec.key, idx); + } + + if (res) { + return m_data + idx; + } + + return nullptr; + } + + Wrapped* get_data() const { + return m_data; + } + + size_t get_record_count() const { + return m_reccnt; + } + + size_t get_tombstone_count() const { + return 0; + } + + const Wrapped* get_record_at(size_t idx) const { + if (idx >= m_reccnt) return nullptr; + return m_data + idx; + } + + + size_t get_memory_usage() { + return m_fst.mem() + m_alloc_size; + } + + size_t get_aux_memory_usage() { + return 0; + } + + size_t get_lower_bound(const K& key) { + auto itr = FSTIter(); + + const K temp_key = key; + + bool res; + if constexpr (std::is_same_v) { + res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr); + } else { + res = m_fst.lowerBound(temp_key, itr); + } + + return itr.value(); + } + + size_t get_upper_bound(const K& key) { + auto itr = FSTIter(); + + const K temp_key = key; + + bool res; + if constexpr (std::is_same_v) { + res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr); + } else { + res = m_fst.lowerBound(temp_key, itr); + } + + size_t idx = itr.value(); + while (idx < m_reccnt && m_data[idx].rec.key <= key) { + idx++; + } + + return idx; + } + +private: + + Wrapped* m_data; + size_t m_reccnt; + size_t m_alloc_size; + FST m_fst; +}; +} -- cgit v1.2.3 From 9fe190f5d500e22b0894095e7c917f9c652e0a64 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 20 Mar 2024 17:30:14 -0400 Subject: Updates/progress towards succinct trie support --- include/framework/interface/Record.h | 12 +++++++-- include/framework/structure/MutableBuffer.h | 5 ++-- include/shard/FSTrie.h | 39 +++++++++++++++++++++-------- 3 files changed, 41 insertions(+), 15 deletions(-) (limited to 'include') diff --git a/include/framework/interface/Record.h b/include/framework/interface/Record.h index 5b9f307..f4105c6 100644 --- a/include/framework/interface/Record.h +++ b/include/framework/interface/Record.h @@ -97,9 +97,17 @@ template struct Record { K key; V value; - uint32_t header = 0; - inline bool operator<(const Record& other) const { + Record &operator=(const Record &other) { + this->key = K(); + + this->key = other.key; + this->value = other.value; + + return *this; + } + + inline bool operator<(const Record& other) const { return key < other.key || (key == other.key && value < other.value); } diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index c0c87cc..1ed0200 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -50,7 +50,8 @@ public: , m_tail(0) , m_head({0, 0}) , m_old_head({high_watermark, 0}) - , m_data((Wrapped *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped))) + //, m_data((Wrapped *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped))) + , m_data(new Wrapped[m_cap]()) , m_tombstone_filter(new psudb::BloomFilter(BF_FPR, m_hwm, BF_HASH_FUNCS)) , m_tscnt(0) , m_old_tscnt(0) @@ -61,7 +62,7 @@ public: } ~MutableBuffer() { - free(m_data); + delete[] m_data; delete m_tombstone_filter; } diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index 11a232d..62aa0b7 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -43,10 +43,9 @@ public: , m_reccnt(0) , m_alloc_size(0) { - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - buffer.get_record_count() * - sizeof(Wrapped), - (byte**) &m_data); + m_data = new Wrapped[buffer.get_record_count()](); + m_alloc_size = sizeof(Wrapped) * buffer.get_record_count(); + size_t cnt = 0; std::vector keys; keys.reserve(buffer.get_record_count()); @@ -62,10 +61,15 @@ public: * apply tombstone/deleted record filtering, as well as any possible * per-record processing that is required by the shard being built. */ + /* auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped)); - buffer.copy_to_buffer((byte *) temp_buffer); + */ + auto temp_buffer = new Wrapped[buffer.get_record_count()](); + for (size_t i=0; i) { @@ -88,6 +94,10 @@ public: cnt++; } + for (size_t i=0; i) { @@ -96,7 +106,7 @@ public: m_fst.load(keys, values); } - free(temp_buffer); + delete[] temp_buffer; } FSTrie(std::vector &shards) @@ -108,9 +118,8 @@ public: size_t tombstone_count = 0; auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - attemp_reccnt * sizeof(Wrapped), - (byte **) &m_data); + m_data = new Wrapped[attemp_reccnt](); + m_alloc_size = attemp_reccnt * sizeof(Wrapped); std::vector keys; keys.reserve(attemp_reccnt); @@ -165,6 +174,10 @@ public: } } + for (size_t i=0; i 0) { m_fst = FST(); if constexpr (std::is_same_v) { @@ -176,18 +189,22 @@ public: } ~FSTrie() { - free(m_data); + delete[] m_data; } Wrapped *point_lookup(const R &rec, bool filter=false) { size_t idx; bool res; if constexpr (std::is_same_v) { - res = m_fst.lookup(rec.key.c_str(), rec.key.size(), idx); + res = m_fst.lookup((uint8_t*)rec.key.c_str(), rec.key.size(), idx); } else { res = m_fst.lookup(rec.key, idx); } + if (res && m_data[idx].rec.key != rec.key) { + fprintf(stderr, "ERROR!\n"); + } + if (res) { return m_data + idx; } -- cgit v1.2.3 From 9c4884c69486cfc78801ffe4e7cd1c581e0cdb87 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 22 Mar 2024 14:03:00 -0400 Subject: Disabled lookahead for paper revision --- include/framework/structure/ExtensionStructure.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include') diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index dbfb543..ffba1c1 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -287,7 +287,7 @@ public: } ReconstructionVector reconstructions; - size_t LOOKAHEAD = 2; + size_t LOOKAHEAD = 1; for (size_t i=0; i Date: Fri, 22 Mar 2024 14:03:49 -0400 Subject: Record.h: Removed manual constructor and adjusted wrapped header fields --- include/framework/interface/Record.h | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) (limited to 'include') diff --git a/include/framework/interface/Record.h b/include/framework/interface/Record.h index f4105c6..39880bd 100644 --- a/include/framework/interface/Record.h +++ b/include/framework/interface/Record.h @@ -54,6 +54,11 @@ concept WrappedInterface = RecordInterface && requires(R r, R s, bool b) { {r.is_deleted()} -> std::convertible_to; {r.set_tombstone(b)}; {r.is_tombstone()} -> std::convertible_to; + {r.set_timestamp()}; + {r.get_timestamp()} -> std::convertible_to; + {r.clear_timestamp()}; + {r.is_deleted()} -> std::convertible_to; + {r.set_visible()}; {r < s} -> std::convertible_to; {r == s} ->std::convertible_to; }; @@ -71,9 +76,29 @@ struct Wrapped { return header & 2; } + inline void set_visible() { + header |= 4; + } + + inline bool is_visible() const { + return header & 4; + } + + inline void set_timestamp(int ts) { + header |= (ts << 3); + } + + inline int get_timestamp() const { + return header >> 3; + } + + inline void clear_timestamp() { + header &= 7; + } + inline void set_tombstone(bool val=true) { if (val) { - header |= val; + header |= 1; } else { header &= 0; } @@ -98,15 +123,6 @@ struct Record { K key; V value; - Record &operator=(const Record &other) { - this->key = K(); - - this->key = other.key; - this->value = other.value; - - return *this; - } - inline bool operator<(const Record& other) const { return key < other.key || (key == other.key && value < other.value); } -- cgit v1.2.3 From 147f0df58e1ff4973bffb7e4628e6b2fdc20eb57 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 22 Mar 2024 14:04:40 -0400 Subject: FSTrie testing and debugging --- include/shard/FSTrie.h | 121 ++++++++----------------------------------------- 1 file changed, 20 insertions(+), 101 deletions(-) (limited to 'include') diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index 62aa0b7..aa3c9f4 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -1,13 +1,11 @@ /* * include/shard/FSTrie.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * * A shard shim around the FSTrie learned index. - * - * TODO: The code in this file is very poorly commented. */ #pragma once @@ -15,9 +13,7 @@ #include #include "framework/ShardRequirements.h" -#include "FST.hpp" -#include "psu-ds/BloomFilter.h" -#include "util/bf_config.h" +#include "fst.hpp" #include "util/SortedMerge.h" using psudb::CACHELINE_SIZE; @@ -28,14 +24,13 @@ using psudb::byte; namespace de { -template +template class FSTrie { private: + typedef decltype(R::key) K; typedef decltype(R::value) V; - - static_assert(std::is_same_v || std::is_same_v, - "FST requires either string or uint64_t keys"); + static_assert(std::is_same_v, "FST requires std::string keys."); public: FSTrie(BufferView buffer) @@ -50,22 +45,12 @@ public: std::vector keys; keys.reserve(buffer.get_record_count()); - std::vector values; - values.reserve(buffer.get_record_count()); - - size_t longest_key = 0; - /* * Copy the contents of the buffer view into a temporary buffer, and * sort them. We still need to iterate over these temporary records to * apply tombstone/deleted record filtering, as well as any possible * per-record processing that is required by the shard being built. */ - /* - auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, - buffer.get_record_count(), - sizeof(Wrapped)); - */ auto temp_buffer = new Wrapped[buffer.get_record_count()](); for (size_t i=0; i>()); for (size_t i=0; i) { - if (m_data[cnt].rec.key.size() > longest_key) { - longest_key = m_data[cnt].rec.key.size(); - } - } - cnt++; } @@ -99,11 +77,8 @@ public: } m_reccnt = cnt; - m_fst = FST(); - if constexpr (std::is_same_v) { - m_fst.load(keys, values, longest_key); - } else { - m_fst.load(keys, values); + if (m_reccnt > 0) { + m_fst = new fst::Trie(keys); } delete[] temp_buffer; @@ -124,10 +99,6 @@ public: std::vector keys; keys.reserve(attemp_reccnt); - std::vector values; - values.reserve(attemp_reccnt); - - size_t longest_key = 0; // FIXME: For smaller cursor arrays, it may be more efficient to skip // the priority queue and just do a scan. PriorityQueue> pq(cursors.size()); @@ -155,16 +126,9 @@ public: } else { auto& cursor = cursors[now.version]; /* skip over records that have been deleted via tagging */ - if (!cursor.ptr->is_deleted()) { + if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") { m_data[m_reccnt] = *cursor.ptr; keys.push_back(m_data[m_reccnt].rec.key); - values.push_back(m_data[m_reccnt].rec.value); - - if constexpr (std::is_same_v) { - if (m_data[m_reccnt].rec.key.size() > longest_key) { - longest_key = m_data[m_reccnt].rec.key.size(); - } - } m_reccnt++; } @@ -179,37 +143,24 @@ public: } if (m_reccnt > 0) { - m_fst = FST(); - if constexpr (std::is_same_v) { - m_fst.load(keys, values, longest_key); - } else { - m_fst.load(keys, values); - } + m_fst = new fst::Trie(keys); } } ~FSTrie() { delete[] m_data; + delete m_fst; } Wrapped *point_lookup(const R &rec, bool filter=false) { - size_t idx; - bool res; - if constexpr (std::is_same_v) { - res = m_fst.lookup((uint8_t*)rec.key.c_str(), rec.key.size(), idx); - } else { - res = m_fst.lookup(rec.key, idx); - } - if (res && m_data[idx].rec.key != rec.key) { - fprintf(stderr, "ERROR!\n"); - } + auto idx = m_fst->exactSearch(rec.key); - if (res) { - return m_data + idx; + if (idx == fst::kNotFound) { + return nullptr; } - return nullptr; + return m_data + idx; } Wrapped* get_data() const { @@ -231,53 +182,21 @@ public: size_t get_memory_usage() { - return m_fst.mem() + m_alloc_size; + return m_fst->getMemoryUsage() + m_alloc_size; } size_t get_aux_memory_usage() { return 0; } - size_t get_lower_bound(const K& key) { - auto itr = FSTIter(); - - const K temp_key = key; - - bool res; - if constexpr (std::is_same_v) { - res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr); - } else { - res = m_fst.lowerBound(temp_key, itr); - } - - return itr.value(); - } - - size_t get_upper_bound(const K& key) { - auto itr = FSTIter(); - - const K temp_key = key; - - bool res; - if constexpr (std::is_same_v) { - res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr); - } else { - res = m_fst.lowerBound(temp_key, itr); - } - - size_t idx = itr.value(); - while (idx < m_reccnt && m_data[idx].rec.key <= key) { - idx++; - } - - return idx; - } + size_t get_lower_bound(R &rec) {return 0;} + size_t get_upper_bound(R &rec) {return 0;} private: Wrapped* m_data; size_t m_reccnt; size_t m_alloc_size; - FST m_fst; + fst::Trie *m_fst; }; } -- cgit v1.2.3 From 3b7bd57fa2af388b4de41f9b5e58e26b1306d983 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 22 Mar 2024 14:04:58 -0400 Subject: MutableBuffer: added visibility flag to records and refactored timestamp --- include/framework/structure/MutableBuffer.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'include') diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 1ed0200..7db3980 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -77,16 +77,20 @@ public: wrec.header = 0; if (tombstone) wrec.set_tombstone(); + // FIXME: because of the mod, it isn't correct to use `pos` + // as the ordering timestamp in the header anymore. size_t pos = tail % m_cap; m_data[pos] = wrec; - m_data[pos].header |= (pos << 2); + m_data[pos].set_timestamp(pos); if (tombstone) { m_tscnt.fetch_add(1); if (m_tombstone_filter) m_tombstone_filter->insert(rec); } + m_data[pos].set_visible(); + return 1; } -- cgit v1.2.3 From d0aebc685b245e51bf47cff8e28f811e43073d5e Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 22 Mar 2024 14:15:00 -0400 Subject: PGM.h: fixed an out of bounds array access on point lookup misses. --- include/shard/PGM.h | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) (limited to 'include') diff --git a/include/shard/PGM.h b/include/shard/PGM.h index e2752ef..ff9ce2d 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -147,14 +147,16 @@ public: return m_reccnt; } - // If the region to search is less than some pre-specified - // amount, perform a linear scan to locate the record. + /* + * If the region to search is less than some pre-specified + * amount, perform a linear scan to locate the record. + */ if (bound.hi - bound.lo < 256) { while (idx < bound.hi && m_data[idx].rec.key < key) { idx++; } } else { - // Otherwise, perform a binary search + /* Otherwise, perform a binary search */ idx = bound.lo; size_t max = bound.hi; @@ -169,10 +171,26 @@ public: } + /* + * the upper bound returned by PGM is one passed the end of the + * array. If we are at that point, we should just return "not found" + */ + if (idx == m_reccnt) { + return idx; + } + + /* + * We may have walked one passed the actual lower bound, so check + * the index before the current one to see if it is the actual bound + */ if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) { return idx-1; } + /* + * Otherwise, check idx. If it is a valid bound, then return it, + * otherwise return "not found". + */ return (m_data[idx].rec.key >= key) ? idx : m_reccnt; } -- cgit v1.2.3 From 0e2d10f10974def7cd86b6b3529d7ffce9285f11 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 22 Mar 2024 14:18:40 -0400 Subject: Record.h: Fixed wrapped record concept --- include/framework/interface/Record.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'include') diff --git a/include/framework/interface/Record.h b/include/framework/interface/Record.h index 39880bd..d380f9b 100644 --- a/include/framework/interface/Record.h +++ b/include/framework/interface/Record.h @@ -47,17 +47,17 @@ concept AlexInterface = KVPInterface && requires(R r) { }; template -concept WrappedInterface = RecordInterface && requires(R r, R s, bool b) { +concept WrappedInterface = RecordInterface && requires(R r, R s, bool b, int i) { {r.header} -> std::convertible_to; r.rec; {r.set_delete()}; {r.is_deleted()} -> std::convertible_to; {r.set_tombstone(b)}; {r.is_tombstone()} -> std::convertible_to; - {r.set_timestamp()}; + {r.set_timestamp(i)}; {r.get_timestamp()} -> std::convertible_to; {r.clear_timestamp()}; - {r.is_deleted()} -> std::convertible_to; + {r.is_visible()} -> std::convertible_to; {r.set_visible()}; {r < s} -> std::convertible_to; {r == s} ->std::convertible_to; -- cgit v1.2.3 From fb4312a883dd0e382ecbcfe1119479e6f44d32a6 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 22 Mar 2024 15:35:14 -0400 Subject: PointLookup: added a point lookup query for unique indexes, and some tests --- include/query/pointlookup.h | 117 ++++++++++++++++++++++++++++++++++++++++++++ include/shard/FSTrie.h | 6 +++ 2 files changed, 123 insertions(+) create mode 100644 include/query/pointlookup.h (limited to 'include') diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h new file mode 100644 index 0000000..caaa320 --- /dev/null +++ b/include/query/pointlookup.h @@ -0,0 +1,117 @@ +/* + * include/query/pointlookup.h + * + * Copyright (C) 2024 Douglas B. Rumbaugh + * + * Distributed under the Modified BSD License. + * + * A query class for point lookup operations. + * + * TODO: Currently, this only supports point lookups for unique keys (which + * is the case for the trie that we're building this to use). It would be + * pretty straightforward to extend it to return *all* records that match + * the search_key (including tombstone cancellation--it's invertible) to + * support non-unique indexes, or at least those implementing + * lower_bound(). + */ +#pragma once + +#include "framework/QueryRequirements.h" + +namespace de { namespace pl { + +template +struct Parms { + decltype(R::key) search_key; +}; + +template +struct State { +}; + +template +struct BufferState { + BufferView *buffer; + + BufferState(BufferView *buffer) + : buffer(buffer) {} +}; + +template S> +class Query { +public: + constexpr static bool EARLY_ABORT=true; + constexpr static bool SKIP_DELETE_FILTER=true; + + static void *get_query_state(S *shard, void *parms) { + return nullptr; + } + + static void* get_buffer_query_state(BufferView *buffer, void *parms) { + auto res = new BufferState(buffer); + + return res; + } + + static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_state) { + return; + } + + static std::vector> query(S *shard, void *q_state, void *parms) { + auto p = (Parms *) parms; + auto s = (State *) q_state; + + std::vector> result; + + auto r = shard->point_lookup({p->search_key, 0}); + + if (r) { + result.push_back(*r); + } + + return result; + } + + static std::vector> buffer_query(void *state, void *parms) { + auto p = (Parms *) parms; + auto s = (BufferState *) state; + + std::vector> records; + for (size_t i=0; ibuffer->get_record_count(); i++) { + auto rec = s->buffer->get(i); + + if (rec->rec.key == p->search_key) { + records.push_back(*rec); + return records; + } + } + + return records; + } + + static std::vector merge(std::vector>> &results, void *parms) { + std::vector output; + for (auto r : results) { + if (r.size() > 0) { + if (r[0].is_deleted() || r[0].is_tombstone()) { + return output; + } + + output.append(r[0].rec); + return output; + } + } + } + + static void delete_query_state(void *state) { + auto s = (State *) state; + delete s; + } + + static void delete_buffer_query_state(void *state) { + auto s = (BufferState *) state; + delete s; + } +}; + +}} diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index aa3c9f4..50bf982 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -160,6 +160,12 @@ public: return nullptr; } + // FIXME: for convenience, I'm treating this Trie as a unique index + // for now, so no need to scan forward and/or check values. This + // also makes the point lookup query class a lot easier to make. + // Ultimately, though, we can support non-unique indexes with some + // extra work. + return m_data + idx; } -- cgit v1.2.3 From b1b5ab106122e6917f6b34452be95e617506f05d Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Mon, 25 Mar 2024 12:54:17 -0400 Subject: Updates for build on OpenBSD Necessary updates to get the codebase building under OpenBSD 7.5 with clang. This is a minimal set of changes to get building to work, which includes disabling several things that aren't directly compatable. More work will be necessary to get full functionality. In particular, Triespline, PGM, and the reference M-tree do not currently build on OpenBSD with clang due to GNU dependencies or other gcc specific features. --- include/framework/DynamicExtension.h | 6 ++++++ include/framework/structure/BufferView.h | 2 +- include/query/rangequery.h | 2 +- include/util/SortedMerge.h | 2 +- 4 files changed, 9 insertions(+), 3 deletions(-) (limited to 'include') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 3e1ce50..44ad454 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -685,6 +685,7 @@ private: return processed_records; } +#ifdef _GNU_SOURCE void SetThreadAffinity() { int core = m_next_core.fetch_add(1) % m_core_cnt; cpu_set_t mask; @@ -707,6 +708,11 @@ private: CPU_SET(core, &mask); ::sched_setaffinity(0, sizeof(mask), &mask); } +#else + void SetThreadAffinity() { + + } +#endif void end_job(_Epoch *epoch) { diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index 9e0872b..44a2044 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -20,7 +20,7 @@ namespace de { -typedef std::_Bind ReleaseFunction; +typedef std::function ReleaseFunction; template class BufferView { diff --git a/include/query/rangequery.h b/include/query/rangequery.h index 24b38ec..e6ab581 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -121,7 +121,7 @@ public: for (size_t i = 0; i < tmp_n; ++i) if (results[i].size() > 0){ auto base = results[i].data(); - cursors.emplace_back(Cursor{base, base + results[i].size(), 0, results[i].size()}); + cursors.emplace_back(Cursor>{base, base + results[i].size(), 0, results[i].size()}); assert(i == cursors.size() - 1); total += results[i].size(); pq.push(cursors[i].ptr, tmp_n - i - 1); diff --git a/include/util/SortedMerge.h b/include/util/SortedMerge.h index 8a1e782..c149189 100644 --- a/include/util/SortedMerge.h +++ b/include/util/SortedMerge.h @@ -58,7 +58,7 @@ static std::vector>> build_cursor_vec(std::vector &shards, for (size_t i = 0; i < shards.size(); ++i) { if (shards[i]) { auto base = shards[i]->get_data(); - cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); + cursors.emplace_back(Cursor>{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); *reccnt += shards[i]->get_record_count(); *tscnt += shards[i]->get_tombstone_count(); } else { -- cgit v1.2.3 From 7e7fd9f7339eee2f1ae974c662a447532dfb1b1a Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 26 Mar 2024 16:35:12 -0400 Subject: Updated FSTrie benchmark and some minor fixes --- include/query/pointlookup.h | 4 +++- include/shard/FSTrie.h | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'include') diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h index caaa320..35d38e3 100644 --- a/include/query/pointlookup.h +++ b/include/query/pointlookup.h @@ -97,10 +97,12 @@ public: return output; } - output.append(r[0].rec); + output.push_back(r[0].rec); return output; } } + + return output; } static void delete_query_state(void *state) { diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index 50bf982..95f396f 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -61,7 +61,7 @@ public: std::sort(base, stop, std::less>()); for (size_t i=0; i Date: Thu, 11 Apr 2024 12:23:21 -0400 Subject: stuff --- include/shard/ISAMTree.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include') diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index 3763271..af62c92 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -70,7 +70,7 @@ public: * without this, gcc seems to hoist the building of the array * _above_ its allocation under -O3, resulting in memfaults. */ - asm volatile ("" ::: "memory"); + //asm volatile ("" ::: "memory"); auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); m_reccnt = res.record_count; -- cgit v1.2.3 From 2c69253f382cd0c6d41db57c45119c33c315bb9c Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 15 Apr 2024 12:51:01 -0400 Subject: Missed file from last commit --- include/framework/DynamicExtension.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 44ad454..9e1b8fa 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -201,7 +201,7 @@ public: */ size_t get_memory_usage() { auto epoch = get_active_epoch(); - auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage(); + auto t = m_buffer->get_memory_usage() + epoch->get_structure()->get_memory_usage(); end_job(epoch); return t; -- cgit v1.2.3 From b25beb13773072c3b143842b45a7c32a1108f347 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 15 Apr 2024 14:00:27 -0400 Subject: Updated FSTrie to use const char * instead of std::string Note: this requires the caller to manage the memory of the strings --- include/framework/interface/Record.h | 17 +++++++++++++++++ include/shard/FSTrie.h | 18 +++++------------- 2 files changed, 22 insertions(+), 13 deletions(-) (limited to 'include') diff --git a/include/framework/interface/Record.h b/include/framework/interface/Record.h index d380f9b..19ccadd 100644 --- a/include/framework/interface/Record.h +++ b/include/framework/interface/Record.h @@ -132,6 +132,23 @@ struct Record { } }; +template +struct Record { + const char* key; + V value; + size_t len; + + inline bool operator<(const Record& other) const { + size_t n = std::min(len, other.len) + 1; + return strncmp(key, other.key, n) < 0; + } + + inline bool operator==(const Record& other) const { + size_t n = std::min(len, other.len) + 1; + return strncmp(key, other.key, n) == 0; + } +}; + template struct WeightedRecord { K key; diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index 95f396f..be678ff 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -30,7 +30,7 @@ private: typedef decltype(R::key) K; typedef decltype(R::value) V; - static_assert(std::is_same_v, "FST requires std::string keys."); + static_assert(std::is_same_v, "FST requires const char* keys."); public: FSTrie(BufferView buffer) @@ -42,7 +42,7 @@ public: m_alloc_size = sizeof(Wrapped) * buffer.get_record_count(); size_t cnt = 0; - std::vector keys; + std::vector keys; keys.reserve(buffer.get_record_count()); /* @@ -68,14 +68,10 @@ public: m_data[cnt] = temp_buffer[i]; m_data[cnt].clear_timestamp(); - keys.push_back(m_data[cnt].rec.key); + keys.push_back(std::string(m_data[cnt].rec.key)); cnt++; } - for (size_t i=0; i 0) { m_fst = new fst::Trie(keys); @@ -96,7 +92,7 @@ public: m_data = new Wrapped[attemp_reccnt](); m_alloc_size = attemp_reccnt * sizeof(Wrapped); - std::vector keys; + std::vector keys; keys.reserve(attemp_reccnt); // FIXME: For smaller cursor arrays, it may be more efficient to skip @@ -128,7 +124,7 @@ public: /* skip over records that have been deleted via tagging */ if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") { m_data[m_reccnt] = *cursor.ptr; - keys.push_back(m_data[m_reccnt].rec.key); + keys.push_back(std::string(m_data[m_reccnt].rec.key)); m_reccnt++; } @@ -138,10 +134,6 @@ public: } } - for (size_t i=0; i 0) { m_fst = new fst::Trie(keys); } -- cgit v1.2.3 From 7c2f43ff039795576bc0014c367b893fbbaceca4 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 19 Apr 2024 14:39:33 -0400 Subject: Benchmark updates --- include/framework/DynamicExtension.h | 4 +++- include/framework/structure/ExtensionStructure.h | 2 ++ include/shard/Alias.h | 2 +- include/shard/AugBTree.h | 2 +- include/shard/FSTrie.h | 8 ++++---- include/shard/ISAMTree.h | 14 +++----------- include/shard/PGM.h | 6 +++--- include/shard/TrieSpline.h | 10 ++++------ include/shard/VPTree.h | 2 +- 9 files changed, 22 insertions(+), 28 deletions(-) (limited to 'include') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 9e1b8fa..9d54777 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -204,6 +204,8 @@ public: auto t = m_buffer->get_memory_usage() + epoch->get_structure()->get_memory_usage(); end_job(epoch); + fprintf(stderr, "total: %ld\n", t); + return t; } @@ -214,7 +216,7 @@ public: */ size_t get_aux_memory_usage() { auto epoch = get_active_epoch(); - auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); + auto t = m_buffer->get_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); end_job(epoch); return t; diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index ffba1c1..6ad7aad 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -161,9 +161,11 @@ public: size_t get_memory_usage() { size_t cnt = 0; for (size_t i=0; iget_memory_usage()); if (m_levels[i]) cnt += m_levels[i]->get_memory_usage(); } + fprintf(stderr, "%ld\n", cnt); return cnt; } diff --git a/include/shard/Alias.h b/include/shard/Alias.h index 9275952..72147d7 100644 --- a/include/shard/Alias.h +++ b/include/shard/Alias.h @@ -148,7 +148,7 @@ public: size_t get_memory_usage() { - return m_alloc_size; + return 0; } size_t get_aux_memory_usage() { diff --git a/include/shard/AugBTree.h b/include/shard/AugBTree.h index 54931bd..c60cbcd 100644 --- a/include/shard/AugBTree.h +++ b/include/shard/AugBTree.h @@ -148,7 +148,7 @@ public: } size_t get_memory_usage() { - return m_alloc_size + m_node_cnt * sizeof(AugBTreeNode>); + return m_node_cnt * sizeof(AugBTreeNode>); } size_t get_aux_memory_usage() { diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index be678ff..3783b38 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -74,7 +74,7 @@ public: m_reccnt = cnt; if (m_reccnt > 0) { - m_fst = new fst::Trie(keys); + m_fst = new fst::Trie(keys, true, 1); } delete[] temp_buffer; @@ -135,7 +135,7 @@ public: } if (m_reccnt > 0) { - m_fst = new fst::Trie(keys); + m_fst = new fst::Trie(keys, true, 1); } } @@ -180,11 +180,11 @@ public: size_t get_memory_usage() { - return m_fst->getMemoryUsage() + m_alloc_size; + return m_fst->getMemoryUsage(); } size_t get_aux_memory_usage() { - return 0; + return m_alloc_size; } size_t get_lower_bound(R &rec) {return 0;} diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index af62c92..1cca506 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -51,7 +51,7 @@ constexpr static size_t LEAF_FANOUT = NODE_SZ / sizeof(R); public: ISAMTree(BufferView buffer) - : m_bf(new BloomFilter(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) + : m_bf(nullptr) , m_isam_nodes(nullptr) , m_root(nullptr) , m_reccnt(0) @@ -59,19 +59,12 @@ public: , m_internal_node_cnt(0) , m_deleted_cnt(0) , m_alloc_size(0) - , m_data(nullptr) { m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped), (byte**) &m_data); - /* - * without this, gcc seems to hoist the building of the array - * _above_ its allocation under -O3, resulting in memfaults. - */ - //asm volatile ("" ::: "memory"); - auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); m_reccnt = res.record_count; m_tombstone_cnt = res.tombstone_count; @@ -90,13 +83,12 @@ public: , m_internal_node_cnt(0) , m_deleted_cnt(0) , m_alloc_size(0) - , m_data(nullptr) { size_t attemp_reccnt = 0; size_t tombstone_count = 0; auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - m_bf = new BloomFilter(BF_FPR, tombstone_count, BF_HASH_FUNCS); + m_bf = nullptr; m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **) &m_data); @@ -149,7 +141,7 @@ public: size_t get_memory_usage() { - return m_alloc_size + m_internal_node_cnt * NODE_SZ; + return m_internal_node_cnt * NODE_SZ; } size_t get_aux_memory_usage() { diff --git a/include/shard/PGM.h b/include/shard/PGM.h index ff9ce2d..a3f9749 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -39,8 +39,7 @@ private: public: PGM(BufferView buffer) - : m_data(nullptr) - , m_bf(new BloomFilter(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) + : m_bf(nullptr) , m_reccnt(0) , m_tombstone_cnt(0) , m_alloc_size(0) { @@ -49,6 +48,7 @@ public: buffer.get_record_count() * sizeof(Wrapped), (byte**) &m_data); + auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); m_reccnt = res.record_count; m_tombstone_cnt = res.tombstone_count; @@ -132,7 +132,7 @@ public: size_t get_memory_usage() { - return m_pgm.size_in_bytes() + m_alloc_size; + return m_pgm.size_in_bytes(); } size_t get_aux_memory_usage() { diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 2a432e8..023390e 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -36,13 +36,12 @@ private: public: TrieSpline(BufferView buffer) - : m_data(nullptr) - , m_reccnt(0) + : m_reccnt(0) , m_tombstone_cnt(0) , m_alloc_size(0) , m_max_key(0) , m_min_key(0) - , m_bf(new BloomFilter(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) + , m_bf(nullptr) { m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * @@ -79,7 +78,6 @@ public: size_t tombstone_count = 0; auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - m_bf = new BloomFilter(BF_FPR, tombstone_count, BF_HASH_FUNCS); m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **) &m_data); @@ -107,7 +105,7 @@ public: } Wrapped *point_lookup(const R &rec, bool filter=false) { - if (filter && !m_bf->lookup(rec)) { + if (filter && m_bf && !m_bf->lookup(rec)) { return nullptr; } @@ -144,7 +142,7 @@ public: size_t get_memory_usage() { - return m_ts.GetSize() + m_alloc_size; + return m_ts.GetSize(); } size_t get_aux_memory_usage() { diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h index 62857ce..d5a2393 100644 --- a/include/shard/VPTree.h +++ b/include/shard/VPTree.h @@ -175,7 +175,7 @@ public: } size_t get_memory_usage() { - return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*) + m_alloc_size; + return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*); } size_t get_aux_memory_usage() { -- cgit v1.2.3 From 34fd8ad935e6359d20a5d6c949e67495d0842f8f Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 19 Apr 2024 14:40:19 -0400 Subject: More trie baseline tests --- include/shard/LoudsPatricia.h | 199 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 199 insertions(+) create mode 100644 include/shard/LoudsPatricia.h (limited to 'include') diff --git a/include/shard/LoudsPatricia.h b/include/shard/LoudsPatricia.h new file mode 100644 index 0000000..3452839 --- /dev/null +++ b/include/shard/LoudsPatricia.h @@ -0,0 +1,199 @@ +/* + * include/shard/LoudsPatricia.h + * + * Copyright (C) 2024 Douglas B. Rumbaugh + * + * Distributed under the Modified BSD License. + * + * A shard shim around the LoudsPatricia learned index. + */ +#pragma once + + +#include + +#include "framework/ShardRequirements.h" +#include "louds-patricia.hpp" +#include "util/SortedMerge.h" + +using psudb::CACHELINE_SIZE; +using psudb::BloomFilter; +using psudb::PriorityQueue; +using psudb::queue_record; +using psudb::byte; + +namespace de { + +template +class LoudsPatricia { +private: + + typedef decltype(R::key) K; + typedef decltype(R::value) V; + static_assert(std::is_same_v, "FST requires const char* keys."); + +public: + LoudsPatricia(BufferView buffer) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + m_data = new Wrapped[buffer.get_record_count()](); + m_alloc_size = sizeof(Wrapped) * buffer.get_record_count(); + + m_louds = new louds::Patricia(); + + size_t cnt = 0; + std::vector keys; + keys.reserve(buffer.get_record_count()); + + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = new Wrapped[buffer.get_record_count()](); + for (size_t i=0; i>()); + + for (size_t i=0; iadd(std::string(m_data[cnt].rec.key)); + cnt++; + } + + m_reccnt = cnt; + if (m_reccnt > 0) { + m_louds->build(); + } + + delete[] temp_buffer; + } + + LoudsPatricia(std::vector &shards) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); + + m_data = new Wrapped[attemp_reccnt](); + m_alloc_size = attemp_reccnt * sizeof(Wrapped); + + m_louds = new louds::Patricia(); + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue> pq(cursors.size()); + for (size_t i=0; i 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") { + m_data[m_reccnt] = *cursor.ptr; + m_louds->add(std::string(m_data[m_reccnt].rec.key)); + m_reccnt++; + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + } + } + + if (m_reccnt > 0) { + m_louds->build(); + } + } + + ~LoudsPatricia() { + delete[] m_data; + delete m_louds; + } + + Wrapped *point_lookup(const R &rec, bool filter=false) { + + auto idx = m_louds->lookup(std::string(rec.key)); + + if (idx == -1) { + return nullptr; + } + + // FIXME: for convenience, I'm treating this Trie as a unique index + // for now, so no need to scan forward and/or check values. This + // also makes the point lookup query class a lot easier to make. + // Ultimately, though, we can support non-unique indexes with some + // extra work. + return m_data + idx; + } + + Wrapped* get_data() const { + return m_data; + } + + size_t get_record_count() const { + return m_reccnt; + } + + size_t get_tombstone_count() const { + return 0; + } + + const Wrapped* get_record_at(size_t idx) const { + if (idx >= m_reccnt) return nullptr; + return m_data + idx; + } + + + size_t get_memory_usage() { + return m_louds->size(); + } + + size_t get_aux_memory_usage() { + return m_alloc_size; + } + + size_t get_lower_bound(R &rec) {return 0;} + size_t get_upper_bound(R &rec) {return 0;} + +private: + + Wrapped* m_data; + size_t m_reccnt; + size_t m_alloc_size; + louds::Patricia *m_louds; +}; +} -- cgit v1.2.3 From b8168b37a6dd91295903f52ee878a57f149b261d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 19 Apr 2024 15:02:44 -0400 Subject: PGM Shard: Fully disabled bloom filter --- include/shard/PGM.h | 1 - 1 file changed, 1 deletion(-) (limited to 'include') diff --git a/include/shard/PGM.h b/include/shard/PGM.h index a3f9749..691385e 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -74,7 +74,6 @@ public: size_t tombstone_count = 0; auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - m_bf = new BloomFilter(BF_FPR, tombstone_count, BF_HASH_FUNCS); m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **) &m_data); -- cgit v1.2.3 From e677947ed6f40d7847e9989d1226e1e8e0b3e03c Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 19 Apr 2024 15:03:45 -0400 Subject: Removed debug print statements --- include/framework/DynamicExtension.h | 2 -- include/framework/structure/ExtensionStructure.h | 2 -- 2 files changed, 4 deletions(-) (limited to 'include') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 9d54777..b154be8 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -204,8 +204,6 @@ public: auto t = m_buffer->get_memory_usage() + epoch->get_structure()->get_memory_usage(); end_job(epoch); - fprintf(stderr, "total: %ld\n", t); - return t; } diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 6ad7aad..ffba1c1 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -161,11 +161,9 @@ public: size_t get_memory_usage() { size_t cnt = 0; for (size_t i=0; iget_memory_usage()); if (m_levels[i]) cnt += m_levels[i]->get_memory_usage(); } - fprintf(stderr, "%ld\n", cnt); return cnt; } -- cgit v1.2.3 From 8479f3ce863dfb6d3b20ff4678fa6fe92ee86b52 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 19 Apr 2024 16:50:18 -0400 Subject: Fixed some benchmarking bugs --- include/query/irs.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include') diff --git a/include/query/irs.h b/include/query/irs.h index e2d9325..51eb4e2 100644 --- a/include/query/irs.h +++ b/include/query/irs.h @@ -103,7 +103,7 @@ public: weights.push_back((bs) ? bs->records.size() : 0); } - size_t total_weight = 0; + size_t total_weight = weights[0]; for (auto &s : shard_states) { auto state = (State *) s; total_weight += state->total_weight; -- cgit v1.2.3 From 438feac7e56fee425d9c6f1a43298ff9dc5b71d1 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 19 Apr 2024 17:38:16 -0400 Subject: Properly implemented support for iteratively decomposable problems --- include/framework/DynamicExtension.h | 40 +++++++++-------- include/framework/interface/Query.h | 6 ++- include/query/irs.h | 85 ++++++++++++++++++++++++------------ include/query/knn.h | 7 ++- include/query/pointlookup.h | 8 +++- include/query/rangecount.h | 8 ++-- include/query/rangequery.h | 7 ++- include/query/wirs.h | 13 ++++-- include/query/wss.h | 13 ++++-- 9 files changed, 123 insertions(+), 64 deletions(-) (limited to 'include') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index b154be8..6fd95c6 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -546,30 +546,34 @@ private: std::vector> shards; std::vector states = vers->get_query_states(shards, parms); + std::vector results; Q::process_query_states(parms, states, buffer_state); - std::vector>> query_results(shards.size() + 1); - for (size_t i=0; i> local_results; - ShardID shid; - - if (i == 0) { /* process the buffer first */ - local_results = Q::buffer_query(buffer_state, parms); - shid = INVALID_SHID; - } else { - local_results = Q::query(shards[i - 1].second, states[i - 1], parms); - shid = shards[i - 1].first; - } + do { + std::vector>> query_results(shards.size() + 1); + for (size_t i=0; i> local_results; + ShardID shid; + + if (i == 0) { /* process the buffer first */ + local_results = Q::buffer_query(buffer_state, parms); + shid = INVALID_SHID; + } else { + local_results = Q::query(shards[i - 1].second, states[i - 1], parms); + shid = shards[i - 1].first; + } - query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer)); + query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer)); - if constexpr (Q::EARLY_ABORT) { - if (query_results[i].size() > 0) break; + if constexpr (Q::EARLY_ABORT) { + if (query_results[i].size() > 0) break; + } } - } + Q::merge(query_results, parms, results); + + } while (Q::repeat(parms, results, states, buffer_state)); - auto result = Q::merge(query_results, parms); - args->result_set.set_value(std::move(result)); + args->result_set.set_value(std::move(results)); ((DynamicExtension *) args->extension)->end_job(epoch); diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h index 3d487f0..577d6cd 100644 --- a/include/framework/interface/Query.h +++ b/include/framework/interface/Query.h @@ -13,17 +13,19 @@ namespace de{ template -concept QueryInterface = requires(void *p, S *sh, std::vector &s, std::vector>> &rv, BufferView *bv) { +concept QueryInterface = requires(void *p, S *sh, std::vector &s, std::vector>> &rv, BufferView *bv, std::vector &resv) { {Q::get_query_state(sh, p)} -> std::convertible_to; {Q::get_buffer_query_state(bv, p)} -> std::convertible_to; {Q::process_query_states(p, s, p)}; {Q::query(sh, p, p)} -> std::convertible_to>>; {Q::buffer_query(p, p)} -> std::convertible_to>>; - {Q::merge(rv, p)} -> std::convertible_to>; + {Q::merge(rv, p, resv)}; {Q::delete_query_state(p)} -> std::same_as; {Q::delete_buffer_query_state(p)} -> std::same_as; + {Q::repeat(p, resv, s, p)} -> std::same_as; + {Q::EARLY_ABORT} -> std::convertible_to; {Q::SKIP_DELETE_FILTER} -> std::convertible_to; }; diff --git a/include/query/irs.h b/include/query/irs.h index 51eb4e2..879d070 100644 --- a/include/query/irs.h +++ b/include/query/irs.h @@ -40,7 +40,12 @@ struct BufferState { size_t sample_size; BufferView *buffer; + psudb::Alias *alias; + BufferState(BufferView *buffer) : buffer(buffer) {} + ~BufferState() { + delete alias; + } }; template S, bool Rejection=true> @@ -72,6 +77,7 @@ public: res->cutoff = res->buffer->get_record_count(); res->sample_size = 0; + res->alias = nullptr; if constexpr (Rejection) { return res; @@ -96,39 +102,51 @@ public: std::vector shard_sample_sizes(shard_states.size()+1, 0); size_t buffer_sz = 0; - std::vector weights; - if constexpr (Rejection) { - weights.push_back((bs) ? bs->cutoff : 0); - } else { - weights.push_back((bs) ? bs->records.size() : 0); + /* for simplicity of static structure testing */ + if (!bs) { + assert(shard_states.size() == 1); + auto state = (State *) shard_states[0]; + state->sample_size = p->sample_size; + return; } - size_t total_weight = weights[0]; - for (auto &s : shard_states) { - auto state = (State *) s; - total_weight += state->total_weight; - weights.push_back(state->total_weight); - } + /* we only need to build the shard alias on the first call */ + if (bs->alias == nullptr) { + std::vector weights; + if constexpr (Rejection) { + weights.push_back((bs) ? bs->cutoff : 0); + } else { + weights.push_back((bs) ? bs->records.size() : 0); + } - // if no valid records fall within the query range, just - // set all of the sample sizes to 0 and bail out. - if (total_weight == 0) { - for (size_t i=0; i *) shard_states[i]; - state->sample_size = 0; + size_t total_weight = weights[0]; + for (auto &s : shard_states) { + auto state = (State *) s; + total_weight += state->total_weight; + weights.push_back(state->total_weight); } - return; - } + // if no valid records fall within the query range, just + // set all of the sample sizes to 0 and bail out. + if (total_weight == 0) { + for (size_t i=0; i *) shard_states[i]; + state->sample_size = 0; + } - std::vector normalized_weights; - for (auto w : weights) { - normalized_weights.push_back((double) w / (double) total_weight); + return; + } + + std::vector normalized_weights; + for (auto w : weights) { + normalized_weights.push_back((double) w / (double) total_weight); + } + + bs->alias = new psudb::Alias(normalized_weights); } - auto shard_alias = psudb::Alias(normalized_weights); for (size_t i=0; isample_size; i++) { - auto idx = shard_alias.get(p->rng); + auto idx = bs->alias->get(p->rng); if (idx == 0) { buffer_sz++; } else { @@ -198,16 +216,12 @@ public: return result; } - static std::vector merge(std::vector>> &results, void *parms) { - std::vector output; - + static void merge(std::vector>> &results, void *parms, std::vector &output) { for (size_t i=0; i *) state; delete s; } + + static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { + auto p = (Parms *) parms; + + if (results.size() < p->sample_size) { + auto q = *p; + q.sample_size -= results.size(); + process_query_states(&q, states, buffer_state); + return true; + } + + return false; + } }; }} diff --git a/include/query/knn.h b/include/query/knn.h index 19dcf5c..c856a74 100644 --- a/include/query/knn.h +++ b/include/query/knn.h @@ -114,7 +114,7 @@ public: return results; } - static std::vector merge(std::vector>> &results, void *parms) { + static std::vector merge(std::vector>> &results, void *parms, std::vector &output) { Parms *p = (Parms *) parms; R rec = p->point; size_t k = p->k; @@ -136,7 +136,6 @@ public: } } - std::vector output; while (pq.size() > 0) { output.emplace_back(*pq.peek().data); pq.pop(); @@ -154,6 +153,10 @@ public: auto s = (BufferState *) state; delete s; } + + static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { + return false; + } }; }} diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h index 35d38e3..94c2bce 100644 --- a/include/query/pointlookup.h +++ b/include/query/pointlookup.h @@ -89,8 +89,7 @@ public: return records; } - static std::vector merge(std::vector>> &results, void *parms) { - std::vector output; + static std::vector merge(std::vector>> &results, void *parms, std::vector &output) { for (auto r : results) { if (r.size() > 0) { if (r[0].is_deleted() || r[0].is_tombstone()) { @@ -114,6 +113,11 @@ public: auto s = (BufferState *) state; delete s; } + + + static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { + return false; + } }; }} diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 6c57809..c20feaa 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -134,12 +134,10 @@ public: return records; } - static std::vector merge(std::vector>> &results, void *parms) { - + static std::vector merge(std::vector>> &results, void *parms, std::vector &output) { R res; res.key = 0; res.value = 0; - std::vector output; output.emplace_back(res); for (size_t i=0; i *) state; delete s; } + + static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { + return false; + } }; }} diff --git a/include/query/rangequery.h b/include/query/rangequery.h index e6ab581..e0690e6 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -109,7 +109,7 @@ public: return records; } - static std::vector merge(std::vector>> &results, void *parms) { + static std::vector merge(std::vector>> &results, void *parms, std::vector &output) { std::vector>> cursors; cursors.reserve(results.size()); @@ -133,7 +133,6 @@ public: return std::vector(); } - std::vector output; output.reserve(total); while (pq.size()) { @@ -169,6 +168,10 @@ public: auto s = (BufferState *) state; delete s; } + + static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { + return false; + } }; }} diff --git a/include/query/wirs.h b/include/query/wirs.h index ae82194..62b43f6 100644 --- a/include/query/wirs.h +++ b/include/query/wirs.h @@ -219,9 +219,7 @@ public: return result; } - static std::vector merge(std::vector>> &results, void *parms) { - std::vector output; - + static std::vector merge(std::vector>> &results, void *parms, std::vector &output) { for (size_t i=0; i *) state; delete s; } + + static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { + auto p = (Parms *) parms; + + if (results.size() < p->sample_size) { + return true; + } + return false; + } }; }} diff --git a/include/query/wss.h b/include/query/wss.h index 8797035..fb0b414 100644 --- a/include/query/wss.h +++ b/include/query/wss.h @@ -183,9 +183,7 @@ public: return result; } - static std::vector merge(std::vector>> &results, void *parms) { - std::vector output; - + static std::vector merge(std::vector>> &results, void *parms, std::vector &output) { for (size_t i=0; i *) state; delete s; } + + static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { + auto p = (Parms *) parms; + + if (results.size() < p->sample_size) { + return true; + } + return false; + } }; }} -- cgit v1.2.3 From 735d397513bc0160ba9ecb17c32c4441ed125f52 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 22 Apr 2024 10:24:41 -0400 Subject: TS+PGM: Inlined manually the sorted array merge for performance reasons --- include/shard/PGM.h | 119 +++++++++++++++++++++++++++++---- include/shard/TrieSpline.h | 163 +++++++++++++++++++++++++++++++++++++++------ 2 files changed, 248 insertions(+), 34 deletions(-) (limited to 'include') diff --git a/include/shard/PGM.h b/include/shard/PGM.h index 691385e..509796b 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -49,16 +49,62 @@ public: sizeof(Wrapped), (byte**) &m_data); - auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; + std::vector keys; + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, + buffer.get_record_count(), + sizeof(Wrapped)); + buffer.copy_to_buffer((byte *) temp_buffer); - if (m_reccnt > 0) { - std::vector keys; - for (size_t i=0; i>()); + + merge_info info = {0, 0}; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) + && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; } + // FIXME: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. It should only need to be &= 1 + base->header &= 3; + keys.emplace_back(base->rec.key); + m_data[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (m_bf){ + m_bf->insert(base->rec); + } + } + + base++; + } + + free(temp_buffer); + + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; + + if (m_reccnt > 0) { m_pgm = pgm::PGMIndex(keys); } } @@ -77,17 +123,62 @@ public: m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **) &m_data); + std::vector keys; - auto res = sorted_array_merge(cursors, m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue> pq(cursors.size()); + for (size_t i=0; i 0) { - std::vector keys; - for (size_t i=0; i 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + keys.emplace_back(cursor.ptr->rec.key); + m_data[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. + */ + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(cursor.ptr->rec); + } + } + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); } + } + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; + + if (m_reccnt > 0) { m_pgm = pgm::PGMIndex(keys); } } diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 023390e..3ae72f8 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -48,26 +48,82 @@ public: sizeof(Wrapped), (byte**) &m_data); - auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, + buffer.get_record_count(), + sizeof(Wrapped)); + buffer.copy_to_buffer((byte *) temp_buffer); + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less>()); + + auto tmp_min_key = temp_buffer[0].rec.key; + auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key; + auto bldr = ts::Builder(tmp_min_key, tmp_max_key, E); + + merge_info info = {0, 0}; + + m_min_key = tmp_max_key; + m_max_key = tmp_min_key; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) + && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; + } - if (m_reccnt > 0) { - m_min_key = m_data[0].rec.key; - m_max_key = m_data[m_reccnt-1].rec.key; + // FIXME: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. It should only need to be &= 1 + base->header &= 3; + bldr.AddKey(base->rec.key); + m_data[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (m_bf){ + m_bf->insert(base->rec); + } + } + + if (base->rec.key < m_min_key) { + m_min_key = base->rec.key; + } - auto bldr = ts::Builder(m_min_key, m_max_key, E); - for (size_t i=0; irec.key > m_max_key) { + m_max_key = base->rec.key; } + base++; + } + + free(temp_buffer); + + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; + + if (m_reccnt > 0) { m_ts = bldr.Finalize(); } } TrieSpline(std::vector &shards) - : m_data(nullptr) - , m_reccnt(0) + : m_reccnt(0) , m_tombstone_cnt(0) , m_alloc_size(0) , m_max_key(0) @@ -82,19 +138,86 @@ public: attemp_reccnt * sizeof(Wrapped), (byte **) &m_data); - auto res = sorted_array_merge(cursors, m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue> pq(cursors.size()); + for (size_t i=0; i 0) { - m_min_key = m_data[0].rec.key; - m_max_key = m_data[m_reccnt-1].rec.key; + auto tmp_max_key = shards[0]->m_max_key; + auto tmp_min_key = shards[0]->m_min_key; + + for (size_t i=0; im_max_key > tmp_max_key) { + tmp_max_key = shards[i]->m_max_key; + } - auto bldr = ts::Builder(m_min_key, m_max_key, E); - for (size_t i=0; im_min_key < tmp_min_key) { + tmp_min_key = shards[i]->m_min_key; } + } + auto bldr = ts::Builder(tmp_min_key, tmp_max_key, E); + + m_max_key = tmp_min_key; + m_min_key = tmp_max_key; + + merge_info info = {0, 0}; + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + bldr.AddKey(cursor.ptr->rec.key); + m_data[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. + */ + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(cursor.ptr->rec); + } + } + + if (cursor.ptr->rec.key < m_min_key) { + m_min_key = cursor.ptr->rec.key; + } + + if (cursor.ptr->rec.key > m_max_key) { + m_max_key = cursor.ptr->rec.key; + } + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + } + } + + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; + + if (m_reccnt > 0) { m_ts = bldr.Finalize(); } } -- cgit v1.2.3 From 5576c5524b48e43e4d6070c28de7c3c66582ed97 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 1 May 2024 16:05:07 -0400 Subject: Query optimizations --- include/query/knn.h | 4 ++-- include/query/rangecount.h | 42 ++++++++++++++++++++---------------------- 2 files changed, 22 insertions(+), 24 deletions(-) (limited to 'include') diff --git a/include/query/knn.h b/include/query/knn.h index c856a74..a227293 100644 --- a/include/query/knn.h +++ b/include/query/knn.h @@ -111,7 +111,7 @@ public: pq.pop(); } - return results; + return std::move(results); } static std::vector merge(std::vector>> &results, void *parms, std::vector &output) { @@ -141,7 +141,7 @@ public: pq.pop(); } - return output; + return std::move(output); } static void delete_query_state(void *state) { diff --git a/include/query/rangecount.h b/include/query/rangecount.h index c20feaa..5a18ed4 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -42,13 +42,7 @@ public: constexpr static bool SKIP_DELETE_FILTER=true; static void *get_query_state(S *shard, void *parms) { - auto res = new State(); - auto p = (Parms *) parms; - - res->start_idx = shard->get_lower_bound(p->lower_bound); - res->stop_idx = shard->get_record_count(); - - return res; + return nullptr; } static void* get_buffer_query_state(BufferView *buffer, void *parms) { @@ -74,37 +68,43 @@ public: res.rec.value = 0; // tombstones records.emplace_back(res); + + auto start_idx = shard->get_lower_bound(p->lower_bound); + auto stop_idx = shard->get_lower_bound(p->upper_bound); + /* * if the returned index is one past the end of the * records for the PGM, then there are not records * in the index falling into the specified range. */ - if (s->start_idx == shard->get_record_count()) { + if (start_idx == shard->get_record_count()) { return records; } - auto ptr = shard->get_record_at(s->start_idx); /* * roll the pointer forward to the first record that is * greater than or equal to the lower bound. */ - while(ptr < shard->get_data() + s->stop_idx && ptr->rec.key < p->lower_bound) { - ptr++; + auto recs = shard->get_data(); + while(start_idx < stop_idx && recs[start_idx].rec.key < p->lower_bound) { + start_idx++; } - while (ptr < shard->get_data() + s->stop_idx && ptr->rec.key <= p->upper_bound) { - if (!ptr->is_deleted()) { - if (ptr->is_tombstone()) { - records[0].rec.value++; - } else { - records[0].rec.key++; - } - } + while (stop_idx < shard->get_record_count() && recs[stop_idx].rec.key <= p->upper_bound) { + stop_idx++; + } + size_t idx = start_idx; + size_t ts_cnt = 0; - ptr++; + while (idx < stop_idx) { + ts_cnt += recs[idx].is_tombstone() * 2 + recs[idx].is_deleted(); + idx++; } + records[0].rec.key = idx - start_idx; + records[0].rec.value = ts_cnt; + return records; } @@ -150,8 +150,6 @@ public: } static void delete_query_state(void *state) { - auto s = (State *) state; - delete s; } static void delete_buffer_query_state(void *state) { -- cgit v1.2.3 From 96faedaeb92776fd9cc2ed8d8b0878ebc9300cbe Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 1 May 2024 18:51:41 -0400 Subject: Added a Bentley-Saxe layout policy --- include/framework/DynamicExtension.h | 21 +++++++-- include/framework/structure/ExtensionStructure.h | 60 ++++++++++++++++++++++-- include/framework/structure/InternalLevel.h | 19 ++++++++ include/framework/util/Configuration.h | 3 +- include/util/types.h | 12 +++-- 5 files changed, 104 insertions(+), 11 deletions(-) (limited to 'include') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 6fd95c6..538ff25 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -54,6 +54,10 @@ public: , m_next_core(0) , m_epoch_cnt(0) { + if constexpr (L == LayoutPolicy::BSM) { + assert(scale_factor == 2); + } + auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0}); m_previous_epoch.store({nullptr, 0}); @@ -487,10 +491,17 @@ private: ((DynamicExtension *) args->extension)->SetThreadAffinity(); Structure *vers = args->epoch->get_structure(); - for (ssize_t i=0; imerges.size(); i++) { - vers->reconstruction(args->merges[i].target, args->merges[i].source); + if constexpr (L == LayoutPolicy::BSM) { + if (args->merges.size() > 0) { + vers->reconstruction(args->merges[0]); + } + } else { + for (ssize_t i=0; imerges.size(); i++) { + vers->reconstruction(args->merges[i].target, args->merges[i].sources[0]); + } } + /* * we'll grab the buffer AFTER doing the internal reconstruction, so we * can flush as many records as possible in one go. The reconstruction @@ -628,7 +639,7 @@ private: static std::vector> filter_deletes(std::vector> &records, ShardID shid, Structure *vers, BufView *bview) { if constexpr (Q::SKIP_DELETE_FILTER) { - return records; + return std::move(records); } std::vector> processed_records; @@ -691,6 +702,10 @@ private: #ifdef _GNU_SOURCE void SetThreadAffinity() { + if constexpr (std::same_as) { + return; + } + int core = m_next_core.fetch_add(1) % m_core_cnt; cpu_set_t mask; CPU_ZERO(&mask); diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index ffba1c1..b83674b 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -343,6 +343,30 @@ public: base_level = grow(scratch_state); } + if constexpr (L == LayoutPolicy::BSM) { + if (base_level == 0) { + return std::move(reconstructions); + } + + ReconstructionTask task; + task.target = base_level; + + size_t base_reccnt = 0; + for (level_index i=base_level; i>source_level; i--) { + auto recon_reccnt = scratch_state[i-1].reccnt; + base_reccnt += recon_reccnt; + scratch_state[i-1].reccnt = 0; + scratch_state[i-1].shardcnt = 0; + task.add_source(i-1, recon_reccnt); + } + + reconstructions.add_reconstruction(task); + scratch_state[base_level].reccnt = base_reccnt; + scratch_state[base_level].shardcnt = 1; + + return std::move(reconstructions); + } + /* * Determine the full set of reconstructions necessary to open up * space in the source level. @@ -384,6 +408,33 @@ public: return std::move(reconstructions); } + inline void reconstruction(ReconstructionTask task) { + static_assert(L == LayoutPolicy::BSM); + std::vector*> levels(task.sources.size()); + for (size_t i=0; i::reconstruction(levels, task.target); + if (task.target >= m_levels.size()) { + m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target), + 1, 1}); + m_levels.emplace_back(new_level); + } else { + m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target), + 1, 1}; + m_levels[task.target] = new_level; + } + + /* remove all of the levels that have been flattened */ + for (size_t i=0; i>(new InternalLevel(task.sources[i], 1)); + m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.target), 0, 1}; + } + + return; + } + /* * Combine incoming_level with base_level and reconstruct the shard, * placing it in base_level. The two levels should be sequential--i.e. no @@ -395,7 +446,6 @@ public: if (base_level >= m_levels.size()) { m_levels.emplace_back(std::shared_ptr>(new InternalLevel(base_level, shard_capacity))); - m_current_state.push_back({0, calc_level_record_capacity(base_level), 0, shard_capacity}); } @@ -418,7 +468,7 @@ public: m_levels[incoming_level] = std::shared_ptr>(new InternalLevel(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); /* - * Update the state vector to match the *real* stage following + * Update the state vector to match the *real* state following * the reconstruction */ m_current_state[base_level] = {m_levels[base_level]->get_record_count(), @@ -576,9 +626,11 @@ private: return false; } - if (L == LayoutPolicy::LEVELING) { + if constexpr (L == LayoutPolicy::LEVELING) { return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap; - } else { + } else if constexpr (L == LayoutPolicy::BSM) { + return state[idx].reccnt == 0; + } else { return state[idx].shardcnt < state[idx].shardcap; } diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index db38946..b962dcc 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -64,6 +64,21 @@ public: return std::shared_ptr(res); } + static std::shared_ptr reconstruction(std::vector levels, size_t level_idx) { + std::vector shards; + for (auto level : levels) { + for (auto shard : level->m_shards) { + if (shard) shards.emplace_back(shard.get()); + } + } + + auto res = new InternalLevel(level_idx, 1); + res->m_shard_cnt = 1; + res->m_shards[0] = std::make_shared(shards); + + return std::shared_ptr(res); + } + /* * Create a new shard combining the records from all of * the shards in level, and append this new shard into @@ -185,6 +200,10 @@ public: } Shard* get_shard(size_t idx) { + if (idx >= m_shard_cnt) { + return nullptr; + } + return m_shards[idx].get(); } diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index 55cc682..4a4524a 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -35,7 +35,8 @@ static thread_local size_t sampling_bailouts = 0; enum class LayoutPolicy { LEVELING, - TEIRING + TEIRING, + BSM }; enum class DeletePolicy { diff --git a/include/util/types.h b/include/util/types.h index bac0246..cf61412 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -73,10 +73,16 @@ const ShardID INVALID_SHID = {-1, -1}; typedef ssize_t level_index; -typedef struct { - level_index source; +typedef struct ReconstructionTask { + std::vector sources; level_index target; size_t reccnt; + + void add_source(level_index source, size_t cnt) { + sources.push_back(source); + reccnt += cnt; + } + } ReconstructionTask; class ReconstructionVector { @@ -91,7 +97,7 @@ public: } void add_reconstruction(level_index source, level_index target, size_t reccnt) { - m_tasks.push_back({source, target, reccnt}); + m_tasks.push_back({{source}, target, reccnt}); total_reccnt += reccnt; } -- cgit v1.2.3 From feeba1de8d9774e32007686627a6a5633dff2559 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 May 2024 09:40:11 -0400 Subject: Missing file from last commit --- include/shard/TrieSpline.h | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) (limited to 'include') diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 3ae72f8..581277e 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -117,7 +117,7 @@ public: m_reccnt = info.record_count; m_tombstone_cnt = info.tombstone_count; - if (m_reccnt > 0) { + if (m_reccnt > 50) { m_ts = bldr.Finalize(); } } @@ -217,7 +217,7 @@ public: m_reccnt = info.record_count; m_tombstone_cnt = info.tombstone_count; - if (m_reccnt > 0) { + if (m_reccnt > 50) { m_ts = bldr.Finalize(); } } @@ -273,6 +273,18 @@ public: } size_t get_lower_bound(const K& key) const { + if (m_reccnt < 50) { + size_t bd = m_reccnt; + for (size_t i=0; i= key) { + bd = i; + break; + } + } + + return bd; + } + auto bound = m_ts.GetSearchBound(key); size_t idx = bound.begin; -- cgit v1.2.3 From a23bc3341923509be9b2f587ece8cd5a650f6386 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 8 May 2024 13:20:44 -0400 Subject: TSParmsweep: enabled forcing a full buffer scan --- include/framework/structure/BufferView.h | 6 +++++- include/query/rangecount.h | 10 +++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) (limited to 'include') diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index 44a2044..11b8337 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -112,6 +112,10 @@ public: size_t get_record_count() { return m_tail - m_head; } + + size_t get_capacity() { + return m_cap; + } /* * NOTE: This function returns an upper bound on the number @@ -123,7 +127,7 @@ public: } Wrapped *get(size_t i) { - assert(i < get_record_count()); + //assert(i < get_record_count()); return m_data + to_idx(i); } diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 5a18ed4..5b95cdd 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -35,7 +35,7 @@ struct BufferState { : buffer(buffer) {} }; -template S> +template S, bool FORCE_SCAN=false> class Query { public: constexpr static bool EARLY_ABORT=false; @@ -119,8 +119,16 @@ public: res.rec.value = 0; // tombstones records.emplace_back(res); + size_t stop_idx; + if constexpr (FORCE_SCAN) { + stop_idx = s->buffer->get_capacity() / 2; + } else { + stop_idx = s->buffer->get_record_count(); + } + for (size_t i=0; ibuffer->get_record_count(); i++) { auto rec = s->buffer->get(i); + if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound && !rec->is_deleted()) { if (rec->is_tombstone()) { -- cgit v1.2.3 From 64444e7ea14368d4e6ed40e89b62ff335b87d65b Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 9 May 2024 15:51:43 -0400 Subject: Fixed arithmetic bug --- include/framework/structure/BufferView.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include') diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index 11b8337..e95a799 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -164,7 +164,7 @@ private: bool m_active; size_t to_idx(size_t i) { - size_t idx = (m_start + i >= m_cap) ? i = (m_cap - m_start) + size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start) : m_start + i; assert(idx < m_cap); return idx; -- cgit v1.2.3 From ab0ab297959fcca370e80670e17f90a780607a80 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 10 May 2024 18:35:30 -0400 Subject: MTree structure size --- include/framework/DynamicExtension.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 538ff25..e2e2784 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -218,7 +218,7 @@ public: */ size_t get_aux_memory_usage() { auto epoch = get_active_epoch(); - auto t = m_buffer->get_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); + auto t = epoch->get_structure()->get_aux_memory_usage(); end_job(epoch); return t; -- cgit v1.2.3