From ba65c8976f54d4da2467074235a12f5be0bd5ebc Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Sun, 22 Dec 2024 15:12:13 -0500 Subject: Continued development --- include/framework/DynamicExtension.h | 92 +++++++++++----------- include/framework/interface/Shard.h | 2 +- include/framework/reconstruction/BSMPolicy.h | 22 +++--- include/framework/reconstruction/LevelingPolicy.h | 22 +++--- .../reconstruction/ReconstructionPolicy.h | 6 +- include/framework/reconstruction/TieringPolicy.h | 18 ++--- include/framework/scheduling/Epoch.h | 8 +- include/framework/structure/ExtensionStructure.h | 63 +++++++++++---- include/framework/structure/InternalLevel.h | 28 +++---- include/framework/structure/MutableBuffer.h | 22 +++--- include/shard/Alias.h | 8 +- include/shard/FSTrie.h | 6 +- include/shard/ISAMTree.h | 4 +- include/shard/LoudsPatricia.h | 6 +- include/shard/PGM.h | 8 +- include/shard/TrieSpline.h | 8 +- include/shard/VPTree.h | 6 +- include/util/SortedMerge.h | 2 +- 18 files changed, 186 insertions(+), 145 deletions(-) (limited to 'include') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 1886234..c35bb93 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -15,7 +15,6 @@ #include "framework/interface/Scheduler.h" #include "framework/reconstruction/ReconstructionPolicy.h" -#include "framework/scheduling/FIFOScheduler.h" #include "framework/scheduling/SerialScheduler.h" #include "framework/structure/ExtensionStructure.h" @@ -28,7 +27,7 @@ namespace de { template QueryType, DeletePolicy D = DeletePolicy::TAGGING, - SchedulerInterface SchedType = de::FIFOScheduler> + SchedulerInterface SchedType = de::SerialScheduler> class DynamicExtension { private: /* convenience typedefs for commonly used types within the class */ @@ -76,10 +75,12 @@ public: * performing compactions and flushes, etc. */ DynamicExtension(ReconPolicyType *recon_policy, size_t buffer_low_watermark, - size_t buffer_high_watermark, size_t memory_budget = 0, + size_t buffer_high_watermark = 0, size_t memory_budget = 0, size_t thread_cnt = 16) : m_max_delete_prop(1), m_sched(memory_budget, thread_cnt), - m_buffer(new Buffer(buffer_low_watermark, buffer_high_watermark)), + m_buffer(new Buffer(buffer_low_watermark, (buffer_high_watermark == 0) + ? buffer_low_watermark + : buffer_high_watermark)), m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0), m_recon_policy(recon_policy) { @@ -165,7 +166,7 @@ public: auto view = m_buffer->get_buffer_view(); auto epoch = get_active_epoch(); - if (epoch->get_structure()->tagged_delete(rec)) { + if (epoch->get_mutable_structure()->tagged_delete(rec)) { end_job(epoch); return 1; } @@ -303,13 +304,14 @@ public: auto epoch = get_active_epoch(); auto vers = epoch->get_structure(); - std::vector shards; - - if (vers->get_levels().size() > 0) { - for (int i = vers->get_levels().size() - 1; i >= 0; i--) { - if (vers->get_levels()[i] && - vers->get_levels()[i]->get_record_count() > 0) { - shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); + std::vector shards; + + if (vers->get_level_vector().size() > 0) { + for (int i = vers->get_level_vector().size() - 1; i >= 0; i--) { + if (vers->get_level_vector()[i] && + vers->get_level_vector()[i]->get_record_count() > 0) { + shards.emplace_back( + vers->get_level_vector()[i]->get_combined_shard()); } } } @@ -358,7 +360,8 @@ public: */ bool validate_tombstone_proportion() { auto epoch = get_active_epoch(); - auto t = epoch->get_structure()->validate_tombstone_proportion(); + auto t = epoch->get_structure()->validate_tombstone_proportion( + m_max_delete_prop); end_job(epoch); return t; } @@ -370,7 +373,6 @@ public: void print_scheduler_statistics() const { m_sched.print_statistics(); } private: - ReconPolicyType const *m_recon_policy; double m_max_delete_prop; SchedType m_sched; @@ -380,6 +382,8 @@ private: std::atomic m_next_core; std::atomic m_epoch_cnt; + ReconPolicyType const *m_recon_policy; + alignas(64) std::atomic m_reconstruction_scheduled; std::atomic m_next_epoch; @@ -547,39 +551,36 @@ private: auto args = (ReconstructionArgs *)arguments; ((DynamicExtension *)args->extension)->SetThreadAffinity(); - Structure *vers = args->epoch->get_structure(); + Structure *vers = args->epoch->get_mutable_structure(); - for (size_t i=0; itasks.size(); i++) { - vers->perform_reconstruction(args->tasks[i]); - } + ReconstructionTask flush_task; + flush_task.type = ReconstructionType::Invalid; - /* - * we'll grab the buffer AFTER doing the internal reconstruction, so we - * can flush as many records as possible in one go. The reconstruction - * was done so as to make room for the full buffer anyway, so there's - * no real benefit to doing this first. - */ - auto buffer_view = args->epoch->get_buffer(); - size_t new_head = buffer_view.get_tail(); + for (size_t i = 0; i < args->tasks.size(); i++) { + if (args->tasks[i].sources.size() > 0 && + args->tasks[i].sources[0] == buffer_shid) { + flush_task = args->tasks[i]; + continue; + } - /* - * if performing a compaction, don't flush the buffer, as - * there is no guarantee that any necessary reconstructions - * will free sufficient space in L0 to support a flush - */ - if (!args->compaction) { - vers->flush_buffer(std::move(buffer_view)); + vers->perform_reconstruction(args->tasks[i]); } - args->result.set_value(true); + if (flush_task.type != ReconstructionType::Invalid) { + /* + * we'll grab the buffer AFTER doing the internal reconstruction, so we + * can flush as many records as possible in one go. The reconstruction + * was done so as to make room for the full buffer anyway, so there's + * no real benefit to doing this first. + */ + auto buffer_view = args->epoch->get_buffer(); + size_t new_head = buffer_view.get_tail(); - /* - * Compactions occur on an epoch _before_ it becomes active, - * and as a result the active epoch should _not_ be advanced as - * part of a compaction - */ - if (!args->compaction) { + vers->perform_flush(flush_task, std::move(buffer_view)); + args->result.set_value(true); ((DynamicExtension *)args->extension)->advance_epoch(new_head); + } else { + args->result.set_value(true); } ((DynamicExtension *)args->extension) @@ -660,10 +661,12 @@ private: args->tasks = m_recon_policy->get_reconstruction_tasks( epoch, m_buffer->get_high_watermark()); args->extension = this; - args->compaction = false; - /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed - * here */ + args->tasks.add_reconstruction(m_recon_policy->get_flush_task(epoch)); + /* + * NOTE: args is deleted by the reconstruction job, so shouldn't be freed + * here + */ m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); } @@ -691,7 +694,8 @@ private: return m_buffer->append(rec, ts); } -#ifdef _GNU_SOURCE +//#ifdef _GNU_SOURCE +#if 0 void SetThreadAffinity() { if constexpr (std::same_as) { return; diff --git a/include/framework/interface/Shard.h b/include/framework/interface/Shard.h index bd980c0..fb5ce1a 100644 --- a/include/framework/interface/Shard.h +++ b/include/framework/interface/Shard.h @@ -14,7 +14,7 @@ namespace de { template concept ShardInterface = RecordInterface && - requires(SHARD shard, const std::vector &shard_vector, bool b, + requires(SHARD shard, const std::vector &shard_vector, bool b, BufferView bv, typename SHARD::RECORD rec) { /* construct a shard from a vector of shards of the same type */ diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h index 9138bd1..4ceca9a 100644 --- a/include/framework/reconstruction/BSMPolicy.h +++ b/include/framework/reconstruction/BSMPolicy.h @@ -16,17 +16,17 @@ namespace de { template QueryType> -class BSMPolicy : ReconstructionPolicy { +class BSMPolicy : public ReconstructionPolicy { typedef std::vector>> LevelVector; public: - BSMPolicy(size_t scale_factor, size_t buffer_size) - : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {} + BSMPolicy(size_t buffer_size) + : m_scale_factor(2), m_buffer_size(buffer_size) {} ReconstructionVector - get_reconstruction_tasks(Epoch *epoch, - size_t incoming_reccnt) override { + get_reconstruction_tasks(const Epoch *epoch, + size_t incoming_reccnt) const override { ReconstructionVector reconstructions; auto levels = epoch->get_structure()->get_level_vector(); @@ -44,7 +44,7 @@ public: task.type = ReconstructionType::Merge; for (level_index i = target_level; i > source_level; i--) { - if (i < levels.size()) { + if (i < (level_index)levels.size()) { task.add_shard({i, all_shards_idx}, levels[i]->get_record_count()); } } @@ -54,17 +54,17 @@ public: } ReconstructionTask - get_flush_task(Epoch *epoch) override { + get_flush_task(const Epoch *epoch) const override { return ReconstructionTask{ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush}; } private: - level_index find_reconstruction_target(LevelVector &levels) { + level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = 0; - for (size_t i = 0; i < (level_index)levels.size(); i++) { - if (levels[i].get_record_count() + 1 <= capacity(i)) { + for (level_index i = 0; i < (level_index)levels.size(); i++) { + if (levels[i]->get_record_count() + 1 <= capacity(i)) { target_level = i; break; } @@ -73,7 +73,7 @@ private: return target_level; } - inline size_t capacity(level_index level) { + inline size_t capacity(level_index level) const { return m_buffer_size * pow(m_scale_factor, level + 1); } diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h index 00f2cff..8bf5645 100644 --- a/include/framework/reconstruction/LevelingPolicy.h +++ b/include/framework/reconstruction/LevelingPolicy.h @@ -16,7 +16,7 @@ namespace de { template QueryType> -class LevelingPolicy : ReconstructionPolicy { +class LevelingPolicy : public ReconstructionPolicy { typedef std::vector>> LevelVector; @@ -25,8 +25,8 @@ public: : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {} ReconstructionVector - get_reconstruction_tasks(Epoch *epoch, - size_t incoming_reccnt) override { + get_reconstruction_tasks(const Epoch *epoch, + size_t incoming_reccnt) const override { ReconstructionVector reconstructions; auto levels = epoch->get_structure()->get_level_vector(); @@ -41,7 +41,7 @@ public: for (level_index i = target_level; i > source_level; i--) { size_t target_reccnt = - (i < levels.size()) ? levels[i]->get_record_count() : 0; + (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0; size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt; reconstructions.add_reconstruction(i - 1, i, total_reccnt, @@ -52,29 +52,29 @@ public: } ReconstructionTask - get_flush_task(Epoch *epoch) override { + get_flush_task(const Epoch *epoch) const override { return ReconstructionTask{ - {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush}; + {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge}; } private: - level_index find_reconstruction_target(LevelVector &levels) { + level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = 0; size_t incoming_records = m_buffer_size; - for (size_t i = 0; i < (level_index)levels.size(); i++) { - if (levels[i].get_record_count() + incoming_records < capacity(i)) { + for (level_index i = 0; i < (level_index)levels.size(); i++) { + if (levels[i]->get_record_count() + incoming_records < capacity(i)) { target_level = i; break; } - incoming_records = levels[i].get_record_count(); + incoming_records = levels[i]->get_record_count(); } return target_level; } - inline size_t capacity(level_index level) { + inline size_t capacity(level_index level) const { return m_buffer_size * pow(m_scale_factor, level + 1); } diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h index 976091e..aa213df 100644 --- a/include/framework/reconstruction/ReconstructionPolicy.h +++ b/include/framework/reconstruction/ReconstructionPolicy.h @@ -23,8 +23,8 @@ class ReconstructionPolicy { public: ReconstructionPolicy() {} - virtual ReconstructionVector get_reconstruction_tasks(Epoch *epoch, - size_t incoming_reccnt) = 0; - virtual ReconstructionTask get_flush_task(Epoch *epoch) = 0; + virtual ReconstructionVector get_reconstruction_tasks(const Epoch *epoch, + size_t incoming_reccnt) const = 0; + virtual ReconstructionTask get_flush_task(const Epoch *epoch) const = 0; }; } diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h index 120bcb5..71fe9ec 100644 --- a/include/framework/reconstruction/TieringPolicy.h +++ b/include/framework/reconstruction/TieringPolicy.h @@ -16,15 +16,15 @@ namespace de { template QueryType> -class TieringPolicy : ReconstructionPolicy { +class TieringPolicy : public ReconstructionPolicy { typedef std::vector>> LevelVector; public: TieringPolicy(size_t scale_factor, size_t buffer_size) : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {} ReconstructionVector - get_reconstruction_tasks(Epoch *epoch, - size_t incoming_reccnt) override { + get_reconstruction_tasks(const Epoch *epoch, + size_t incoming_reccnt) const override { ReconstructionVector reconstructions; auto levels = epoch->get_structure()->get_level_vector(); @@ -39,7 +39,7 @@ public: for (level_index i = target_level; i > source_level; i--) { size_t target_reccnt = - (i < levels.size()) ? levels[i]->get_record_count() : 0; + (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0; size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt; reconstructions.add_reconstruction(i - 1, i, total_reccnt, @@ -50,17 +50,17 @@ public: } ReconstructionTask - get_flush_task(Epoch *epoch) override { + get_flush_task(const Epoch *epoch) const override { return ReconstructionTask{ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush}; } private: - level_index find_reconstruction_target(LevelVector &levels) { + level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = 0; - for (size_t i = 0; i < (level_index) levels.size(); i++) { - if (levels[i].get_shard_count() + 1 <= capacity()) { + for (level_index i = 0; i < (level_index) levels.size(); i++) { + if (levels[i]->get_shard_count() + 1 <= capacity()) { target_level = i; break; } @@ -69,7 +69,7 @@ private: return target_level; } - inline size_t capacity() { + inline size_t capacity() const { return m_scale_factor; } diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 303ab2f..95c64ea 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -54,11 +54,13 @@ public: Epoch &operator=(const Epoch &) = delete; Epoch &operator=(Epoch &&) = delete; - size_t get_epoch_number() { return m_epoch_number; } + size_t get_epoch_number() const { return m_epoch_number; } - Structure *get_structure() { return m_structure; } + const Structure *get_structure() const { return m_structure; } - BufView get_buffer() { return m_buffer->get_buffer_view(m_buffer_head); } + Structure *get_mutable_structure() { return m_structure; } + + BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); } /* * Returns a new Epoch object that is a copy of this one. The new object diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 9b7ae87..3bb8a0b 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -147,14 +147,14 @@ public: inline void perform_reconstruction(ReconstructionTask task) { /* perform the reconstruction itself */ - std::vector shards; + std::vector shards; for (ShardID shid : task.sources) { - assert(shid.level_idx < m_levels.size()); + assert(shid.level_idx < (level_index) m_levels.size()); assert(shid.shard_idx >= -1); /* if unspecified, push all shards into the vector */ if (shid.shard_idx == all_shards_idx) { - for (size_t i = 0; i < m_levels[shid.level_idx].get_shard_count(); + for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) { if (m_levels[shid.level_idx]->get_shard(i)) { shards.push_back(m_levels[shid.level_idx]->get_shard(i)); @@ -165,7 +165,7 @@ public: } } - auto new_shard = Shard(shards); + auto new_shard = new ShardType(shards); /* * Remove all of the shards processed by the operation @@ -181,10 +181,11 @@ public: /* * Append the new shard to the target level */ - if (task.target < m_levels.size()) { - m_levels[task.target]->append_shard(new_shard); - } else { - m_levels.push_back(); + if (task.target < (level_index)m_levels.size()) { + m_levels[task.target]->append(std::shared_ptr(new_shard)); + } else { /* grow the structure if needed */ + m_levels.push_back(std::make_shared>(task.target)); + m_levels[task.target]->append(std::shared_ptr(new_shard)); } } @@ -197,12 +198,20 @@ public: * the buffer itself. Given that we're unlikely to actually use policies * like that, we'll leave this as low priority. */ - ShardType *buffer_shard = new ShardType(buffer); - if (task.type == ReconstructionType::Append) { - m_levels[0]->append(std::shared_ptr(buffer_shard)); + + /* insert the first level, if needed */ + if (m_levels.size() == 0) { + m_levels.push_back( + std::make_shared>(0)); + } + + ShardType *buffer_shard = new ShardType(std::move(buffer)); + if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) { + m_levels[0]->append(std::shared_ptr(buffer_shard)); } else { - std::vector shards; - for (size_t i = 0; i < m_levels[0].size(); i++) { + std::vector shards; + for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count(); + i++) { if (m_levels[0]->get_shard(i)) { shards.push_back(m_levels[0]->get_shard(i)); } @@ -210,7 +219,7 @@ public: shards.push_back(buffer_shard); ShardType *new_shard = new ShardType(shards); m_levels[0]->truncate(); - m_levels[0]->append(std::shared_ptr(new_shard)); + m_levels[0]->append(std::shared_ptr(new_shard)); } } } @@ -243,6 +252,32 @@ public: LevelVector const &get_level_vector() const { return m_levels; } + + /* + * Validate that no level in the structure exceeds its maximum tombstone + * capacity. This is used to trigger preemptive compactions at the end of + * the reconstruction process. + */ + bool validate_tombstone_proportion(double max_delete_prop) const { + long double ts_prop; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + ts_prop = (long double)m_levels[i]->get_tombstone_count() / + (long double)m_levels[i]->get_record_count(); + if (ts_prop > (long double)max_delete_prop) { + return false; + } + } + } + + return true; + } + + bool validate_tombstone_proportion(level_index level, double max_delete_prop) const { + long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) m_levels[level]->get_record_count(); + return ts_prop <= (long double) max_delete_prop; + } + private: std::atomic m_refcnt; LevelVector m_levels; diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index 8cfcd49..c9d1749 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -40,12 +40,12 @@ public: * * No changes are made to this level. */ - ShardType *get_combined_shard() { + ShardType *get_combined_shard() const { if (m_shards.size() == 0) { return nullptr; } - std::vector shards; + std::vector shards; for (auto shard : m_shards) { if (shard) shards.emplace_back(shard.get()); @@ -57,7 +57,7 @@ public: void get_local_queries( std::vector> &shards, std::vector &local_queries, - typename QueryType::Parameters *query_parms) { + typename QueryType::Parameters *query_parms) const { for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { auto local_query = @@ -68,7 +68,7 @@ public: } } - bool check_tombstone(size_t shard_stop, const RecordType &rec) { + bool check_tombstone(size_t shard_stop, const RecordType &rec) const { if (m_shards.size() == 0) return false; @@ -100,7 +100,7 @@ public: return false; } - ShardType *get_shard(size_t idx) { + const ShardType *get_shard(size_t idx) const { if (idx >= m_shards.size()) { return nullptr; } @@ -108,9 +108,9 @@ public: return m_shards[idx].get(); } - size_t get_shard_count() { return m_shards.size(); } + size_t get_shard_count() const { return m_shards.size(); } - size_t get_record_count() { + size_t get_record_count() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { @@ -121,7 +121,7 @@ public: return cnt; } - size_t get_tombstone_count() { + size_t get_tombstone_count() const { size_t res = 0; for (size_t i = 0; i < m_shards.size(); ++i) { if (m_shards[i]) { @@ -131,7 +131,7 @@ public: return res; } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { @@ -142,7 +142,7 @@ public: return cnt; } - size_t get_memory_usage() { + size_t get_memory_usage() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { @@ -153,7 +153,7 @@ public: return cnt; } - double get_tombstone_prop() { + double get_tombstone_prop() const { size_t tscnt = 0; size_t reccnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { @@ -166,10 +166,10 @@ public: return (double)tscnt / (double)(tscnt + reccnt); } - std::shared_ptr clone() { + std::shared_ptr clone() const { auto new_level = std::make_shared(m_level_no); for (size_t i = 0; i < m_shards.size(); i++) { - new_level->m_shards[i] = m_shards[i]; + new_level->append(m_shards[i]); } return new_level; @@ -181,7 +181,7 @@ public: m_shards.erase(m_shards.begin() + shard); } - bool append(std::shared_ptr shard) { + void append(std::shared_ptr shard) { m_shards.emplace_back(shard); } diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 625b04b..0eae73d 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -97,15 +97,15 @@ public: return true; } - size_t get_record_count() { return m_tail.load() - m_head.load().head_idx; } + size_t get_record_count() const { return m_tail.load() - m_head.load().head_idx; } - size_t get_capacity() { return m_cap; } + size_t get_capacity() const { return m_cap; } - bool is_full() { return get_record_count() >= m_hwm; } + bool is_full() const { return get_record_count() >= m_hwm; } - bool is_at_low_watermark() { return get_record_count() >= m_lwm; } + bool is_at_low_watermark() const { return get_record_count() >= m_lwm; } - size_t get_tombstone_count() { return m_tscnt.load(); } + size_t get_tombstone_count() const { return m_tscnt.load(); } bool delete_record(const R &rec) { return get_buffer_view().delete_record(rec); @@ -115,9 +115,9 @@ public: return get_buffer_view().check_tombstone(rec); } - size_t get_memory_usage() { return m_cap * sizeof(Wrapped); } + size_t get_memory_usage() const { return m_cap * sizeof(Wrapped); } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { return m_tombstone_filter->get_memory_usage(); } @@ -200,7 +200,7 @@ public: m_lwm = lwm; } - size_t get_low_watermark() { return m_lwm; } + size_t get_low_watermark() const { return m_lwm; } void set_high_watermark(size_t hwm) { assert(hwm > m_lwm); @@ -208,9 +208,9 @@ public: m_hwm = hwm; } - size_t get_high_watermark() { return m_hwm; } + size_t get_high_watermark() const { return m_hwm; } - size_t get_tail() { return m_tail.load(); } + size_t get_tail() const { return m_tail.load(); } /* * Note: this returns the available physical storage capacity, @@ -220,7 +220,7 @@ public: * but a buggy framework implementation may violate the * assumption. */ - size_t get_available_capacity() { + size_t get_available_capacity() const { if (m_old_head.load().refcnt == 0) { return m_cap - (m_tail.load() - m_head.load().head_idx); } diff --git a/include/shard/Alias.h b/include/shard/Alias.h index 8fe70a5..15b0884 100644 --- a/include/shard/Alias.h +++ b/include/shard/Alias.h @@ -70,7 +70,7 @@ public: } } - Alias(std::vector const &shards) + Alias(std::vector const &shards) : m_data(nullptr) , m_alias(nullptr) , m_total_weight(0) @@ -146,15 +146,15 @@ public: } - size_t get_memory_usage() { + size_t get_memory_usage() const { return 0; } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { return (m_bf) ? m_bf->memory_usage() : 0; } - W get_total_weight() { + W get_total_weight() const { return m_total_weight; } diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index 4e51037..d720aad 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -82,7 +82,7 @@ public: delete[] temp_buffer; } - FSTrie(std::vector const &shards) + FSTrie(std::vector const &shards) : m_data(nullptr) , m_reccnt(0) , m_alloc_size(0) @@ -181,11 +181,11 @@ public: } - size_t get_memory_usage() { + size_t get_memory_usage() const { return m_fst->getMemoryUsage(); } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { return m_alloc_size; } diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index 64c0b2b..f6b525f 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -65,7 +65,7 @@ public: } } - ISAMTree(std::vector const &shards) + ISAMTree(std::vector const &shards) : m_bf(nullptr), m_isam_nodes(nullptr), m_root(nullptr), m_reccnt(0), m_tombstone_cnt(0), m_internal_node_cnt(0), m_deleted_cnt(0), m_alloc_size(0) { @@ -93,7 +93,7 @@ public: delete m_bf; } - Wrapped *point_lookup(const R &rec, bool filter = false) { + Wrapped *point_lookup(const R &rec, bool filter = false) const { if (filter && !m_bf->lookup(rec)) { return nullptr; } diff --git a/include/shard/LoudsPatricia.h b/include/shard/LoudsPatricia.h index 3452839..fe0c30e 100644 --- a/include/shard/LoudsPatricia.h +++ b/include/shard/LoudsPatricia.h @@ -82,7 +82,7 @@ public: delete[] temp_buffer; } - LoudsPatricia(std::vector &shards) + LoudsPatricia(std::vector &shards) : m_data(nullptr) , m_reccnt(0) , m_alloc_size(0) @@ -178,11 +178,11 @@ public: } - size_t get_memory_usage() { + size_t get_memory_usage() const { return m_louds->size(); } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { return m_alloc_size; } diff --git a/include/shard/PGM.h b/include/shard/PGM.h index 7d1f492..5b39ab4 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -111,7 +111,7 @@ public: } } - PGM(std::vector const &shards) + PGM(std::vector const &shards) : m_data(nullptr) , m_bf(nullptr) , m_reccnt(0) @@ -190,7 +190,7 @@ public: delete m_bf; } - Wrapped *point_lookup(const R &rec, bool filter=false) { + Wrapped *point_lookup(const R &rec, bool filter=false) const { size_t idx = get_lower_bound(rec.key); if (idx >= m_reccnt) { return nullptr; @@ -223,11 +223,11 @@ public: } - size_t get_memory_usage() { + size_t get_memory_usage() const { return m_pgm.size_in_bytes(); } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { return (m_bf) ? m_bf->memory_usage() : 0; } diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 9d8c3bb..7f4d4e5 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -124,7 +124,7 @@ public: } } - TrieSpline(std::vector const &shards) + TrieSpline(std::vector const &shards) : m_reccnt(0) , m_tombstone_cnt(0) , m_alloc_size(0) @@ -229,7 +229,7 @@ public: delete m_bf; } - Wrapped *point_lookup(const R &rec, bool filter=false) { + Wrapped *point_lookup(const R &rec, bool filter=false) const { if (filter && m_bf && !m_bf->lookup(rec)) { return nullptr; } @@ -266,11 +266,11 @@ public: } - size_t get_memory_usage() { + size_t get_memory_usage() const { return m_ts.GetSize(); } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { return (m_bf) ? m_bf->memory_usage() : 0; } diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h index 477db5c..7130efe 100644 --- a/include/shard/VPTree.h +++ b/include/shard/VPTree.h @@ -86,7 +86,7 @@ public: } } - VPTree(std::vector shards) + VPTree(std::vector shards) : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) { size_t attemp_reccnt = 0; @@ -174,11 +174,11 @@ public: return m_data + idx; } - size_t get_memory_usage() { + size_t get_memory_usage() const { return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*); } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { // FIXME: need to return the size of the unordered_map return 0; } diff --git a/include/util/SortedMerge.h b/include/util/SortedMerge.h index b0a3215..c41a7ae 100644 --- a/include/util/SortedMerge.h +++ b/include/util/SortedMerge.h @@ -51,7 +51,7 @@ struct merge_info { */ template static std::vector>> -build_cursor_vec(std::vector const &shards, size_t *reccnt, +build_cursor_vec(std::vector const &shards, size_t *reccnt, size_t *tscnt) { std::vector>> cursors; cursors.reserve(shards.size()); -- cgit v1.2.3