diff options
| author | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-12-22 15:12:13 -0500 |
|---|---|---|
| committer | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-12-22 15:12:13 -0500 |
| commit | ba65c8976f54d4da2467074235a12f5be0bd5ebc (patch) | |
| tree | 955d5995f211d8a7a24f7b106912773db5e3a5ba | |
| parent | 5617bed5257506d3dfda8537b16f44b3e40f1b42 (diff) | |
| download | dynamic-extension-ba65c8976f54d4da2467074235a12f5be0bd5ebc.tar.gz | |
Continued development
33 files changed, 261 insertions, 377 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 0c1aaa1..e35ae19 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,10 +9,10 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(namespace "de") project("Practical Dynamic Extension" VERSION 0.1.0) -set(debug false) +set(debug true) set(tests True) set(bench false) -set(vldb_bench true) +set(vldb_bench false) # ALEX doesn't build under C++20 set(build_alex false) @@ -51,11 +51,6 @@ if (tests) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/bin/tests") file(MAKE_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/tests/data") - add_executable(internal_level_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/internal_level_tests.cpp) - target_link_libraries(internal_level_tests PUBLIC gsl check subunit pthread atomic) - target_link_options(internal_level_tests PUBLIC -mcx16) - target_include_directories(internal_level_tests PRIVATE include external/psudb-common/cpp/include) - add_executable(mutable_buffer_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/mutable_buffer_tests.cpp) target_link_libraries(mutable_buffer_tests PUBLIC gsl check subunit pthread atomic) target_link_options(mutable_buffer_tests PUBLIC -mcx16) diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 1886234..c35bb93 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -15,7 +15,6 @@ #include "framework/interface/Scheduler.h" #include "framework/reconstruction/ReconstructionPolicy.h" -#include "framework/scheduling/FIFOScheduler.h" #include "framework/scheduling/SerialScheduler.h" #include "framework/structure/ExtensionStructure.h" @@ -28,7 +27,7 @@ namespace de { template <ShardInterface ShardType, QueryInterface<ShardType> QueryType, DeletePolicy D = DeletePolicy::TAGGING, - SchedulerInterface SchedType = de::FIFOScheduler> + SchedulerInterface SchedType = de::SerialScheduler> class DynamicExtension { private: /* convenience typedefs for commonly used types within the class */ @@ -76,10 +75,12 @@ public: * performing compactions and flushes, etc. */ DynamicExtension(ReconPolicyType *recon_policy, size_t buffer_low_watermark, - size_t buffer_high_watermark, size_t memory_budget = 0, + size_t buffer_high_watermark = 0, size_t memory_budget = 0, size_t thread_cnt = 16) : m_max_delete_prop(1), m_sched(memory_budget, thread_cnt), - m_buffer(new Buffer(buffer_low_watermark, buffer_high_watermark)), + m_buffer(new Buffer(buffer_low_watermark, (buffer_high_watermark == 0) + ? buffer_low_watermark + : buffer_high_watermark)), m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0), m_recon_policy(recon_policy) { @@ -165,7 +166,7 @@ public: auto view = m_buffer->get_buffer_view(); auto epoch = get_active_epoch(); - if (epoch->get_structure()->tagged_delete(rec)) { + if (epoch->get_mutable_structure()->tagged_delete(rec)) { end_job(epoch); return 1; } @@ -303,13 +304,14 @@ public: auto epoch = get_active_epoch(); auto vers = epoch->get_structure(); - std::vector<ShardType *> shards; - - if (vers->get_levels().size() > 0) { - for (int i = vers->get_levels().size() - 1; i >= 0; i--) { - if (vers->get_levels()[i] && - vers->get_levels()[i]->get_record_count() > 0) { - shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); + std::vector<const ShardType *> shards; + + if (vers->get_level_vector().size() > 0) { + for (int i = vers->get_level_vector().size() - 1; i >= 0; i--) { + if (vers->get_level_vector()[i] && + vers->get_level_vector()[i]->get_record_count() > 0) { + shards.emplace_back( + vers->get_level_vector()[i]->get_combined_shard()); } } } @@ -358,7 +360,8 @@ public: */ bool validate_tombstone_proportion() { auto epoch = get_active_epoch(); - auto t = epoch->get_structure()->validate_tombstone_proportion(); + auto t = epoch->get_structure()->validate_tombstone_proportion( + m_max_delete_prop); end_job(epoch); return t; } @@ -370,7 +373,6 @@ public: void print_scheduler_statistics() const { m_sched.print_statistics(); } private: - ReconPolicyType const *m_recon_policy; double m_max_delete_prop; SchedType m_sched; @@ -380,6 +382,8 @@ private: std::atomic<int> m_next_core; std::atomic<size_t> m_epoch_cnt; + ReconPolicyType const *m_recon_policy; + alignas(64) std::atomic<bool> m_reconstruction_scheduled; std::atomic<epoch_ptr> m_next_epoch; @@ -547,39 +551,36 @@ private: auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments; ((DynamicExtension *)args->extension)->SetThreadAffinity(); - Structure *vers = args->epoch->get_structure(); + Structure *vers = args->epoch->get_mutable_structure(); - for (size_t i=0; i<args->tasks.size(); i++) { - vers->perform_reconstruction(args->tasks[i]); - } + ReconstructionTask flush_task; + flush_task.type = ReconstructionType::Invalid; - /* - * we'll grab the buffer AFTER doing the internal reconstruction, so we - * can flush as many records as possible in one go. The reconstruction - * was done so as to make room for the full buffer anyway, so there's - * no real benefit to doing this first. - */ - auto buffer_view = args->epoch->get_buffer(); - size_t new_head = buffer_view.get_tail(); + for (size_t i = 0; i < args->tasks.size(); i++) { + if (args->tasks[i].sources.size() > 0 && + args->tasks[i].sources[0] == buffer_shid) { + flush_task = args->tasks[i]; + continue; + } - /* - * if performing a compaction, don't flush the buffer, as - * there is no guarantee that any necessary reconstructions - * will free sufficient space in L0 to support a flush - */ - if (!args->compaction) { - vers->flush_buffer(std::move(buffer_view)); + vers->perform_reconstruction(args->tasks[i]); } - args->result.set_value(true); + if (flush_task.type != ReconstructionType::Invalid) { + /* + * we'll grab the buffer AFTER doing the internal reconstruction, so we + * can flush as many records as possible in one go. The reconstruction + * was done so as to make room for the full buffer anyway, so there's + * no real benefit to doing this first. + */ + auto buffer_view = args->epoch->get_buffer(); + size_t new_head = buffer_view.get_tail(); - /* - * Compactions occur on an epoch _before_ it becomes active, - * and as a result the active epoch should _not_ be advanced as - * part of a compaction - */ - if (!args->compaction) { + vers->perform_flush(flush_task, std::move(buffer_view)); + args->result.set_value(true); ((DynamicExtension *)args->extension)->advance_epoch(new_head); + } else { + args->result.set_value(true); } ((DynamicExtension *)args->extension) @@ -660,10 +661,12 @@ private: args->tasks = m_recon_policy->get_reconstruction_tasks( epoch, m_buffer->get_high_watermark()); args->extension = this; - args->compaction = false; - /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed - * here */ + args->tasks.add_reconstruction(m_recon_policy->get_flush_task(epoch)); + /* + * NOTE: args is deleted by the reconstruction job, so shouldn't be freed + * here + */ m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); } @@ -691,7 +694,8 @@ private: return m_buffer->append(rec, ts); } -#ifdef _GNU_SOURCE +//#ifdef _GNU_SOURCE +#if 0 void SetThreadAffinity() { if constexpr (std::same_as<SchedType, SerialScheduler>) { return; diff --git a/include/framework/interface/Shard.h b/include/framework/interface/Shard.h index bd980c0..fb5ce1a 100644 --- a/include/framework/interface/Shard.h +++ b/include/framework/interface/Shard.h @@ -14,7 +14,7 @@ namespace de { template <typename SHARD> concept ShardInterface = RecordInterface<typename SHARD::RECORD> && - requires(SHARD shard, const std::vector<SHARD *> &shard_vector, bool b, + requires(SHARD shard, const std::vector<const SHARD *> &shard_vector, bool b, BufferView<typename SHARD::RECORD> bv, typename SHARD::RECORD rec) { /* construct a shard from a vector of shards of the same type */ diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h index 9138bd1..4ceca9a 100644 --- a/include/framework/reconstruction/BSMPolicy.h +++ b/include/framework/reconstruction/BSMPolicy.h @@ -16,17 +16,17 @@ namespace de { template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> -class BSMPolicy : ReconstructionPolicy<ShardType, QueryType> { +class BSMPolicy : public ReconstructionPolicy<ShardType, QueryType> { typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> LevelVector; public: - BSMPolicy(size_t scale_factor, size_t buffer_size) - : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {} + BSMPolicy(size_t buffer_size) + : m_scale_factor(2), m_buffer_size(buffer_size) {} ReconstructionVector - get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch, - size_t incoming_reccnt) override { + get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch, + size_t incoming_reccnt) const override { ReconstructionVector reconstructions; auto levels = epoch->get_structure()->get_level_vector(); @@ -44,7 +44,7 @@ public: task.type = ReconstructionType::Merge; for (level_index i = target_level; i > source_level; i--) { - if (i < levels.size()) { + if (i < (level_index)levels.size()) { task.add_shard({i, all_shards_idx}, levels[i]->get_record_count()); } } @@ -54,17 +54,17 @@ public: } ReconstructionTask - get_flush_task(Epoch<ShardType, QueryType> *epoch) override { + get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override { return ReconstructionTask{ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush}; } private: - level_index find_reconstruction_target(LevelVector &levels) { + level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = 0; - for (size_t i = 0; i < (level_index)levels.size(); i++) { - if (levels[i].get_record_count() + 1 <= capacity(i)) { + for (level_index i = 0; i < (level_index)levels.size(); i++) { + if (levels[i]->get_record_count() + 1 <= capacity(i)) { target_level = i; break; } @@ -73,7 +73,7 @@ private: return target_level; } - inline size_t capacity(level_index level) { + inline size_t capacity(level_index level) const { return m_buffer_size * pow(m_scale_factor, level + 1); } diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h index 00f2cff..8bf5645 100644 --- a/include/framework/reconstruction/LevelingPolicy.h +++ b/include/framework/reconstruction/LevelingPolicy.h @@ -16,7 +16,7 @@ namespace de { template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> -class LevelingPolicy : ReconstructionPolicy<ShardType, QueryType> { +class LevelingPolicy : public ReconstructionPolicy<ShardType, QueryType> { typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> LevelVector; @@ -25,8 +25,8 @@ public: : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {} ReconstructionVector - get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch, - size_t incoming_reccnt) override { + get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch, + size_t incoming_reccnt) const override { ReconstructionVector reconstructions; auto levels = epoch->get_structure()->get_level_vector(); @@ -41,7 +41,7 @@ public: for (level_index i = target_level; i > source_level; i--) { size_t target_reccnt = - (i < levels.size()) ? levels[i]->get_record_count() : 0; + (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0; size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt; reconstructions.add_reconstruction(i - 1, i, total_reccnt, @@ -52,29 +52,29 @@ public: } ReconstructionTask - get_flush_task(Epoch<ShardType, QueryType> *epoch) override { + get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override { return ReconstructionTask{ - {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush}; + {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge}; } private: - level_index find_reconstruction_target(LevelVector &levels) { + level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = 0; size_t incoming_records = m_buffer_size; - for (size_t i = 0; i < (level_index)levels.size(); i++) { - if (levels[i].get_record_count() + incoming_records < capacity(i)) { + for (level_index i = 0; i < (level_index)levels.size(); i++) { + if (levels[i]->get_record_count() + incoming_records < capacity(i)) { target_level = i; break; } - incoming_records = levels[i].get_record_count(); + incoming_records = levels[i]->get_record_count(); } return target_level; } - inline size_t capacity(level_index level) { + inline size_t capacity(level_index level) const { return m_buffer_size * pow(m_scale_factor, level + 1); } diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h index 976091e..aa213df 100644 --- a/include/framework/reconstruction/ReconstructionPolicy.h +++ b/include/framework/reconstruction/ReconstructionPolicy.h @@ -23,8 +23,8 @@ class ReconstructionPolicy { public: ReconstructionPolicy() {} - virtual ReconstructionVector get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch, - size_t incoming_reccnt) = 0; - virtual ReconstructionTask get_flush_task(Epoch<ShardType, QueryType> *epoch) = 0; + virtual ReconstructionVector get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch, + size_t incoming_reccnt) const = 0; + virtual ReconstructionTask get_flush_task(const Epoch<ShardType, QueryType> *epoch) const = 0; }; } diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h index 120bcb5..71fe9ec 100644 --- a/include/framework/reconstruction/TieringPolicy.h +++ b/include/framework/reconstruction/TieringPolicy.h @@ -16,15 +16,15 @@ namespace de { template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> -class TieringPolicy : ReconstructionPolicy<ShardType, QueryType> { +class TieringPolicy : public ReconstructionPolicy<ShardType, QueryType> { typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> LevelVector; public: TieringPolicy(size_t scale_factor, size_t buffer_size) : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {} ReconstructionVector - get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch, - size_t incoming_reccnt) override { + get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch, + size_t incoming_reccnt) const override { ReconstructionVector reconstructions; auto levels = epoch->get_structure()->get_level_vector(); @@ -39,7 +39,7 @@ public: for (level_index i = target_level; i > source_level; i--) { size_t target_reccnt = - (i < levels.size()) ? levels[i]->get_record_count() : 0; + (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0; size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt; reconstructions.add_reconstruction(i - 1, i, total_reccnt, @@ -50,17 +50,17 @@ public: } ReconstructionTask - get_flush_task(Epoch<ShardType, QueryType> *epoch) override { + get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override { return ReconstructionTask{ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush}; } private: - level_index find_reconstruction_target(LevelVector &levels) { + level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = 0; - for (size_t i = 0; i < (level_index) levels.size(); i++) { - if (levels[i].get_shard_count() + 1 <= capacity()) { + for (level_index i = 0; i < (level_index) levels.size(); i++) { + if (levels[i]->get_shard_count() + 1 <= capacity()) { target_level = i; break; } @@ -69,7 +69,7 @@ private: return target_level; } - inline size_t capacity() { + inline size_t capacity() const { return m_scale_factor; } diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 303ab2f..95c64ea 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -54,11 +54,13 @@ public: Epoch &operator=(const Epoch &) = delete; Epoch &operator=(Epoch &&) = delete; - size_t get_epoch_number() { return m_epoch_number; } + size_t get_epoch_number() const { return m_epoch_number; } - Structure *get_structure() { return m_structure; } + const Structure *get_structure() const { return m_structure; } - BufView get_buffer() { return m_buffer->get_buffer_view(m_buffer_head); } + Structure *get_mutable_structure() { return m_structure; } + + BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); } /* * Returns a new Epoch object that is a copy of this one. The new object diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 9b7ae87..3bb8a0b 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -147,14 +147,14 @@ public: inline void perform_reconstruction(ReconstructionTask task) { /* perform the reconstruction itself */ - std::vector<ShardType *> shards; + std::vector<const ShardType *> shards; for (ShardID shid : task.sources) { - assert(shid.level_idx < m_levels.size()); + assert(shid.level_idx < (level_index) m_levels.size()); assert(shid.shard_idx >= -1); /* if unspecified, push all shards into the vector */ if (shid.shard_idx == all_shards_idx) { - for (size_t i = 0; i < m_levels[shid.level_idx].get_shard_count(); + for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) { if (m_levels[shid.level_idx]->get_shard(i)) { shards.push_back(m_levels[shid.level_idx]->get_shard(i)); @@ -165,7 +165,7 @@ public: } } - auto new_shard = Shard(shards); + auto new_shard = new ShardType(shards); /* * Remove all of the shards processed by the operation @@ -181,10 +181,11 @@ public: /* * Append the new shard to the target level */ - if (task.target < m_levels.size()) { - m_levels[task.target]->append_shard(new_shard); - } else { - m_levels.push_back(); + if (task.target < (level_index)m_levels.size()) { + m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard)); + } else { /* grow the structure if needed */ + m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(task.target)); + m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard)); } } @@ -197,12 +198,20 @@ public: * the buffer itself. Given that we're unlikely to actually use policies * like that, we'll leave this as low priority. */ - ShardType *buffer_shard = new ShardType(buffer); - if (task.type == ReconstructionType::Append) { - m_levels[0]->append(std::shared_ptr(buffer_shard)); + + /* insert the first level, if needed */ + if (m_levels.size() == 0) { + m_levels.push_back( + std::make_shared<InternalLevel<ShardType, QueryType>>(0)); + } + + ShardType *buffer_shard = new ShardType(std::move(buffer)); + if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) { + m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard)); } else { - std::vector<ShardType *> shards; - for (size_t i = 0; i < m_levels[0].size(); i++) { + std::vector<const ShardType *> shards; + for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count(); + i++) { if (m_levels[0]->get_shard(i)) { shards.push_back(m_levels[0]->get_shard(i)); } @@ -210,7 +219,7 @@ public: shards.push_back(buffer_shard); ShardType *new_shard = new ShardType(shards); m_levels[0]->truncate(); - m_levels[0]->append(std::shared_ptr(new_shard)); + m_levels[0]->append(std::shared_ptr<ShardType>(new_shard)); } } } @@ -243,6 +252,32 @@ public: LevelVector const &get_level_vector() const { return m_levels; } + + /* + * Validate that no level in the structure exceeds its maximum tombstone + * capacity. This is used to trigger preemptive compactions at the end of + * the reconstruction process. + */ + bool validate_tombstone_proportion(double max_delete_prop) const { + long double ts_prop; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + ts_prop = (long double)m_levels[i]->get_tombstone_count() / + (long double)m_levels[i]->get_record_count(); + if (ts_prop > (long double)max_delete_prop) { + return false; + } + } + } + + return true; + } + + bool validate_tombstone_proportion(level_index level, double max_delete_prop) const { + long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) m_levels[level]->get_record_count(); + return ts_prop <= (long double) max_delete_prop; + } + private: std::atomic<size_t> m_refcnt; LevelVector m_levels; diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index 8cfcd49..c9d1749 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -40,12 +40,12 @@ public: * * No changes are made to this level. */ - ShardType *get_combined_shard() { + ShardType *get_combined_shard() const { if (m_shards.size() == 0) { return nullptr; } - std::vector<ShardType *> shards; + std::vector<const ShardType *> shards; for (auto shard : m_shards) { if (shard) shards.emplace_back(shard.get()); @@ -57,7 +57,7 @@ public: void get_local_queries( std::vector<std::pair<ShardID, ShardType *>> &shards, std::vector<typename QueryType::LocalQuery *> &local_queries, - typename QueryType::Parameters *query_parms) { + typename QueryType::Parameters *query_parms) const { for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { auto local_query = @@ -68,7 +68,7 @@ public: } } - bool check_tombstone(size_t shard_stop, const RecordType &rec) { + bool check_tombstone(size_t shard_stop, const RecordType &rec) const { if (m_shards.size() == 0) return false; @@ -100,7 +100,7 @@ public: return false; } - ShardType *get_shard(size_t idx) { + const ShardType *get_shard(size_t idx) const { if (idx >= m_shards.size()) { return nullptr; } @@ -108,9 +108,9 @@ public: return m_shards[idx].get(); } - size_t get_shard_count() { return m_shards.size(); } + size_t get_shard_count() const { return m_shards.size(); } - size_t get_record_count() { + size_t get_record_count() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { @@ -121,7 +121,7 @@ public: return cnt; } - size_t get_tombstone_count() { + size_t get_tombstone_count() const { size_t res = 0; for (size_t i = 0; i < m_shards.size(); ++i) { if (m_shards[i]) { @@ -131,7 +131,7 @@ public: return res; } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { @@ -142,7 +142,7 @@ public: return cnt; } - size_t get_memory_usage() { + size_t get_memory_usage() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { @@ -153,7 +153,7 @@ public: return cnt; } - double get_tombstone_prop() { + double get_tombstone_prop() const { size_t tscnt = 0; size_t reccnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { @@ -166,10 +166,10 @@ public: return (double)tscnt / (double)(tscnt + reccnt); } - std::shared_ptr<InternalLevel> clone() { + std::shared_ptr<InternalLevel> clone() const { auto new_level = std::make_shared<InternalLevel>(m_level_no); for (size_t i = 0; i < m_shards.size(); i++) { - new_level->m_shards[i] = m_shards[i]; + new_level->append(m_shards[i]); } return new_level; @@ -181,7 +181,7 @@ public: m_shards.erase(m_shards.begin() + shard); } - bool append(std::shared_ptr<ShardType> shard) { + void append(std::shared_ptr<ShardType> shard) { m_shards.emplace_back(shard); } diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 625b04b..0eae73d 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -97,15 +97,15 @@ public: return true; } - size_t get_record_count() { return m_tail.load() - m_head.load().head_idx; } + size_t get_record_count() const { return m_tail.load() - m_head.load().head_idx; } - size_t get_capacity() { return m_cap; } + size_t get_capacity() const { return m_cap; } - bool is_full() { return get_record_count() >= m_hwm; } + bool is_full() const { return get_record_count() >= m_hwm; } - bool is_at_low_watermark() { return get_record_count() >= m_lwm; } + bool is_at_low_watermark() const { return get_record_count() >= m_lwm; } - size_t get_tombstone_count() { return m_tscnt.load(); } + size_t get_tombstone_count() const { return m_tscnt.load(); } bool delete_record(const R &rec) { return get_buffer_view().delete_record(rec); @@ -115,9 +115,9 @@ public: return get_buffer_view().check_tombstone(rec); } - size_t get_memory_usage() { return m_cap * sizeof(Wrapped<R>); } + size_t get_memory_usage() const { return m_cap * sizeof(Wrapped<R>); } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { return m_tombstone_filter->get_memory_usage(); } @@ -200,7 +200,7 @@ public: m_lwm = lwm; } - size_t get_low_watermark() { return m_lwm; } + size_t get_low_watermark() const { return m_lwm; } void set_high_watermark(size_t hwm) { assert(hwm > m_lwm); @@ -208,9 +208,9 @@ public: m_hwm = hwm; } - size_t get_high_watermark() { return m_hwm; } + size_t get_high_watermark() const { return m_hwm; } - size_t get_tail() { return m_tail.load(); } + size_t get_tail() const { return m_tail.load(); } /* * Note: this returns the available physical storage capacity, @@ -220,7 +220,7 @@ public: * but a buggy framework implementation may violate the * assumption. */ - size_t get_available_capacity() { + size_t get_available_capacity() const { if (m_old_head.load().refcnt == 0) { return m_cap - (m_tail.load() - m_head.load().head_idx); } diff --git a/include/shard/Alias.h b/include/shard/Alias.h index 8fe70a5..15b0884 100644 --- a/include/shard/Alias.h +++ b/include/shard/Alias.h @@ -70,7 +70,7 @@ public: } } - Alias(std::vector<Alias*> const &shards) + Alias(std::vector<const Alias*> const &shards) : m_data(nullptr) , m_alias(nullptr) , m_total_weight(0) @@ -146,15 +146,15 @@ public: } - size_t get_memory_usage() { + size_t get_memory_usage() const { return 0; } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { return (m_bf) ? m_bf->memory_usage() : 0; } - W get_total_weight() { + W get_total_weight() const { return m_total_weight; } diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index 4e51037..d720aad 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -82,7 +82,7 @@ public: delete[] temp_buffer; } - FSTrie(std::vector<FSTrie*> const &shards) + FSTrie(std::vector<const FSTrie*> const &shards) : m_data(nullptr) , m_reccnt(0) , m_alloc_size(0) @@ -181,11 +181,11 @@ public: } - size_t get_memory_usage() { + size_t get_memory_usage() const { return m_fst->getMemoryUsage(); } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { return m_alloc_size; } diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index 64c0b2b..f6b525f 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -65,7 +65,7 @@ public: } } - ISAMTree(std::vector<ISAMTree *> const &shards) + ISAMTree(std::vector<const ISAMTree *> const &shards) : m_bf(nullptr), m_isam_nodes(nullptr), m_root(nullptr), m_reccnt(0), m_tombstone_cnt(0), m_internal_node_cnt(0), m_deleted_cnt(0), m_alloc_size(0) { @@ -93,7 +93,7 @@ public: delete m_bf; } - Wrapped<R> *point_lookup(const R &rec, bool filter = false) { + Wrapped<R> *point_lookup(const R &rec, bool filter = false) const { if (filter && !m_bf->lookup(rec)) { return nullptr; } diff --git a/include/shard/LoudsPatricia.h b/include/shard/LoudsPatricia.h index 3452839..fe0c30e 100644 --- a/include/shard/LoudsPatricia.h +++ b/include/shard/LoudsPatricia.h @@ -82,7 +82,7 @@ public: delete[] temp_buffer; } - LoudsPatricia(std::vector<LoudsPatricia*> &shards) + LoudsPatricia(std::vector<const LoudsPatricia*> &shards) : m_data(nullptr) , m_reccnt(0) , m_alloc_size(0) @@ -178,11 +178,11 @@ public: } - size_t get_memory_usage() { + size_t get_memory_usage() const { return m_louds->size(); } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { return m_alloc_size; } diff --git a/include/shard/PGM.h b/include/shard/PGM.h index 7d1f492..5b39ab4 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -111,7 +111,7 @@ public: } } - PGM(std::vector<PGM*> const &shards) + PGM(std::vector<const PGM*> const &shards) : m_data(nullptr) , m_bf(nullptr) , m_reccnt(0) @@ -190,7 +190,7 @@ public: delete m_bf; } - Wrapped<R> *point_lookup(const R &rec, bool filter=false) { + Wrapped<R> *point_lookup(const R &rec, bool filter=false) const { size_t idx = get_lower_bound(rec.key); if (idx >= m_reccnt) { return nullptr; @@ -223,11 +223,11 @@ public: } - size_t get_memory_usage() { + size_t get_memory_usage() const { return m_pgm.size_in_bytes(); } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { return (m_bf) ? m_bf->memory_usage() : 0; } diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 9d8c3bb..7f4d4e5 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -124,7 +124,7 @@ public: } } - TrieSpline(std::vector<TrieSpline*> const &shards) + TrieSpline(std::vector<const TrieSpline*> const &shards) : m_reccnt(0) , m_tombstone_cnt(0) , m_alloc_size(0) @@ -229,7 +229,7 @@ public: delete m_bf; } - Wrapped<R> *point_lookup(const R &rec, bool filter=false) { + Wrapped<R> *point_lookup(const R &rec, bool filter=false) const { if (filter && m_bf && !m_bf->lookup(rec)) { return nullptr; } @@ -266,11 +266,11 @@ public: } - size_t get_memory_usage() { + size_t get_memory_usage() const { return m_ts.GetSize(); } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { return (m_bf) ? m_bf->memory_usage() : 0; } diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h index 477db5c..7130efe 100644 --- a/include/shard/VPTree.h +++ b/include/shard/VPTree.h @@ -86,7 +86,7 @@ public: } } - VPTree(std::vector<VPTree*> shards) + VPTree(std::vector<const VPTree*> shards) : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) { size_t attemp_reccnt = 0; @@ -174,11 +174,11 @@ public: return m_data + idx; } - size_t get_memory_usage() { + size_t get_memory_usage() const { return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*); } - size_t get_aux_memory_usage() { + size_t get_aux_memory_usage() const { // FIXME: need to return the size of the unordered_map return 0; } diff --git a/include/util/SortedMerge.h b/include/util/SortedMerge.h index b0a3215..c41a7ae 100644 --- a/include/util/SortedMerge.h +++ b/include/util/SortedMerge.h @@ -51,7 +51,7 @@ struct merge_info { */ template <RecordInterface R, ShardInterface S> static std::vector<Cursor<Wrapped<R>>> -build_cursor_vec(std::vector<S *> const &shards, size_t *reccnt, +build_cursor_vec(std::vector<const S *> const &shards, size_t *reccnt, size_t *tscnt) { std::vector<Cursor<Wrapped<R>>> cursors; cursors.reserve(shards.size()); diff --git a/tests/de_bsm_tag.cpp b/tests/de_bsm_tag.cpp index 4063cfe..cc76f05 100644 --- a/tests/de_bsm_tag.cpp +++ b/tests/de_bsm_tag.cpp @@ -18,6 +18,8 @@ #include "shard/ISAMTree.h" #include "query/rangequery.h" +#include "framework/reconstruction/BSMPolicy.h" + #include <check.h> using namespace de; @@ -25,7 +27,8 @@ typedef Rec R; typedef ISAMTree<R> S; typedef rq::Query<S> Q; -typedef DynamicExtension<S, Q, LayoutPolicy::BSM, DeletePolicy::TAGGING, SerialScheduler> DE; +typedef DynamicExtension<S, Q, DeletePolicy::TAGGING, SerialScheduler> DE; +ReconstructionPolicy<S,Q> *recon = new BSMPolicy<S,Q>(1000); #include "include/dynamic_extension.h" diff --git a/tests/de_bsm_tomb.cpp b/tests/de_bsm_tomb.cpp index 3a24e87..205de4d 100644 --- a/tests/de_bsm_tomb.cpp +++ b/tests/de_bsm_tomb.cpp @@ -18,6 +18,8 @@ #include "shard/ISAMTree.h" #include "query/rangequery.h" +#include "framework/reconstruction/BSMPolicy.h" + #include <check.h> using namespace de; @@ -25,7 +27,8 @@ typedef Rec R; typedef ISAMTree<R> S; typedef rq::Query<S> Q; -typedef DynamicExtension<S, Q, LayoutPolicy::BSM, DeletePolicy::TOMBSTONE, SerialScheduler> DE; +typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE; +ReconstructionPolicy<S, Q> *recon = new BSMPolicy<S, Q>(1000); #include "include/dynamic_extension.h" diff --git a/tests/de_level_concurrent.cpp b/tests/de_level_concurrent.cpp index afd1af2..a76285b 100644 --- a/tests/de_level_concurrent.cpp +++ b/tests/de_level_concurrent.cpp @@ -17,6 +17,7 @@ #include "framework/DynamicExtension.h" #include "shard/ISAMTree.h" #include "query/rangequery.h" +#include "framework/reconstruction/LevelingPolicy.h" #include <check.h> using namespace de; @@ -25,7 +26,9 @@ typedef Rec R; typedef ISAMTree<R> S; typedef rq::Query<S> Q; -typedef DynamicExtension<S, Q, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, SerialScheduler> DE; +typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE; +ReconstructionPolicy<S, Q> *recon = new LevelingPolicy<S, Q>(2, 1000); +ReconstructionPolicy<S, Q> *recon2 = new LevelingPolicy<S, Q>(4, 10000); #include "include/concurrent_extension.h" diff --git a/tests/de_level_tag.cpp b/tests/de_level_tag.cpp index c175357..6fead54 100644 --- a/tests/de_level_tag.cpp +++ b/tests/de_level_tag.cpp @@ -17,6 +17,7 @@ #include "framework/DynamicExtension.h" #include "shard/ISAMTree.h" #include "query/rangequery.h" +#include "framework/reconstruction/LevelingPolicy.h" #include <check.h> using namespace de; @@ -25,7 +26,8 @@ typedef Rec R; typedef ISAMTree<R> S; typedef rq::Query<S> Q; -typedef DynamicExtension<S, Q, LayoutPolicy::LEVELING, DeletePolicy::TAGGING, SerialScheduler> DE; +typedef DynamicExtension<S, Q, DeletePolicy::TAGGING, SerialScheduler> DE; +ReconstructionPolicy<S, Q> *recon = new LevelingPolicy<S, Q>(2, 1000); #include "include/dynamic_extension.h" diff --git a/tests/de_level_tomb.cpp b/tests/de_level_tomb.cpp index e587817..31e87a0 100644 --- a/tests/de_level_tomb.cpp +++ b/tests/de_level_tomb.cpp @@ -18,6 +18,7 @@ #include "shard/ISAMTree.h" #include "query/rangequery.h" #include "shard/TrieSpline.h" +#include "framework/reconstruction/LevelingPolicy.h" #include <check.h> using namespace de; @@ -26,7 +27,8 @@ typedef Rec R; typedef ISAMTree<R> S; typedef rq::Query<S> Q; -typedef DynamicExtension<S, Q, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, SerialScheduler> DE; +typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE; +ReconstructionPolicy<S, Q> *recon = new LevelingPolicy<S, Q>(2, 1000); #include "include/dynamic_extension.h" diff --git a/tests/de_tier_concurrent.cpp b/tests/de_tier_concurrent.cpp index ce41dbc..cba84f1 100644 --- a/tests/de_tier_concurrent.cpp +++ b/tests/de_tier_concurrent.cpp @@ -18,6 +18,7 @@ #include "shard/ISAMTree.h" #include "query/rangequery.h" #include "framework/scheduling//FIFOScheduler.h" +#include "framework/reconstruction/TieringPolicy.h" #include <check.h> using namespace de; @@ -26,7 +27,9 @@ typedef Rec R; typedef ISAMTree<R> S; typedef rq::Query<S> Q; -typedef DynamicExtension<S, Q, LayoutPolicy::TEIRING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE; +typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, FIFOScheduler> DE; +ReconstructionPolicy<S, Q> *recon = new TieringPolicy<S, Q>(2, 1000); +ReconstructionPolicy<S, Q> *recon2 = new TieringPolicy<S, Q>(4, 10000); #include "include/concurrent_extension.h" diff --git a/tests/de_tier_tag.cpp b/tests/de_tier_tag.cpp index 97a5299..d640d10 100644 --- a/tests/de_tier_tag.cpp +++ b/tests/de_tier_tag.cpp @@ -18,6 +18,7 @@ #include "framework/scheduling/SerialScheduler.h" #include "shard/ISAMTree.h" #include "query/rangequery.h" +#include "framework/reconstruction/TieringPolicy.h" #include <check.h> using namespace de; @@ -26,7 +27,8 @@ typedef Rec R; typedef ISAMTree<R> S; typedef rq::Query<S> Q; -typedef DynamicExtension<S, Q, LayoutPolicy::TEIRING, DeletePolicy::TAGGING, SerialScheduler> DE; +typedef DynamicExtension<S, Q, DeletePolicy::TAGGING, SerialScheduler> DE; +ReconstructionPolicy<S, Q> *recon = new TieringPolicy<S, Q>(2, 1000); #include "include/dynamic_extension.h" diff --git a/tests/de_tier_tomb.cpp b/tests/de_tier_tomb.cpp index 930d0d5..42b0625 100644 --- a/tests/de_tier_tomb.cpp +++ b/tests/de_tier_tomb.cpp @@ -18,6 +18,7 @@ #include "shard/ISAMTree.h" #include "shard/TrieSpline.h" #include "query/rangequery.h" +#include "framework/reconstruction/TieringPolicy.h" #include <check.h> using namespace de; @@ -25,7 +26,8 @@ using namespace de; typedef Rec R; typedef ISAMTree<R> S; typedef rq::Query<S> Q; -typedef DynamicExtension<S, Q, LayoutPolicy::TEIRING, DeletePolicy::TOMBSTONE, SerialScheduler> DE; +typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE; +ReconstructionPolicy<S, Q> *recon = new TieringPolicy<S, Q>(2, 1000); #include "include/dynamic_extension.h" diff --git a/tests/include/concurrent_extension.h b/tests/include/concurrent_extension.h index d99cd23..84f816d 100644 --- a/tests/include/concurrent_extension.h +++ b/tests/include/concurrent_extension.h @@ -22,24 +22,28 @@ * should be included in the source file that includes this one, above the * include statement. */ -// #include "testing.h" -// #include "framework/DynamicExtension.h" -// //#include "framework/scheduling/FIFOScheduler.h" -// #include "shard/ISAMTree.h" -// #include "query/rangequery.h" -// #include <check.h> -// #include <set> -// #include <random> +#include "framework/reconstruction/ReconstructionPolicy.h" +#include "framework/reconstruction/TieringPolicy.h" +#include "testing.h" +#include "framework/DynamicExtension.h" +#include "framework/scheduling/FIFOScheduler.h" +#include "shard/ISAMTree.h" +#include "query/rangequery.h" +#include <check.h> +#include <set> +#include <random> // using namespace de; // typedef Rec R; // typedef ISAMTree<R> S; // typedef rq::Query<S> Q; -// typedef DynamicExtension<S, Q, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE> DE; //, FIFOScheduler> DE; +// typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, FIFOScheduler> DE; +// ReconstructionPolicy<S,Q> *recon = new TieringPolicy<S, Q>(2, 1000); +// ReconstructionPolicy<S,Q> *recon2 = new TieringPolicy<S, Q>(4, 10000); START_TEST(t_create) { - auto test_de = new DE(100, 1000, 2); + auto test_de = new DE(recon, 100, 1000); ck_assert_ptr_nonnull(test_de); ck_assert_int_eq(test_de->get_record_count(), 0); @@ -52,7 +56,7 @@ END_TEST START_TEST(t_insert) { - auto test_de = new DE(100, 1000, 2); + auto test_de = new DE(recon, 100, 1000); uint64_t key = 0; uint32_t val = 0; @@ -73,7 +77,7 @@ END_TEST START_TEST(t_debug_insert) { - auto test_de = new DE(100, 1000, 2); + auto test_de = new DE(recon, 100, 1000); uint64_t key = 0; uint32_t val = 0; @@ -92,7 +96,7 @@ END_TEST START_TEST(t_insert_with_mem_merges) { - auto test_de = new DE(100, 1000, 2); + auto test_de = new DE(recon, 100, 1000); uint64_t key = 0; uint32_t val = 0; @@ -135,7 +139,7 @@ END_TEST START_TEST(t_range_query) { - auto test_de = new DE(1000, 10000, 4); + auto test_de = new DE(recon2, 1000, 10000); size_t n = 10000000; std::vector<uint64_t> keys; @@ -189,7 +193,7 @@ END_TEST START_TEST(t_tombstone_merging_01) { size_t reccnt = 100000; - auto test_de = new DE(100, 1000, 2); + auto test_de = new DE(recon, 100, 1000); auto rng = gsl_rng_alloc(gsl_rng_mt19937); @@ -242,54 +246,12 @@ START_TEST(t_tombstone_merging_01) } END_TEST -DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) { - auto rng = gsl_rng_alloc(gsl_rng_mt19937); - - auto test_de = new DE(1000, 10000, 2); - - std::set<R> records; - std::set<R> to_delete; - std::set<R> deleted; - - while (records.size() < reccnt) { - uint64_t key = rand(); - uint32_t val = rand(); - - if (records.find({key, val}) != records.end()) continue; - - records.insert({key, val}); - } - - for (auto rec : records) { - ck_assert_int_eq(test_de->insert(rec), 1); - - if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) { - std::vector<R> del_vec; - std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); - - for (size_t i=0; i<del_vec.size(); i++) { - test_de->erase(del_vec[i]); - to_delete.erase(del_vec[i]); - deleted.insert(del_vec[i]); - } - } - - if (gsl_rng_uniform(rng) < 0.25 && deleted.find(rec) == deleted.end()) { - to_delete.insert(rec); - } - } - - gsl_rng_free(rng); - - return test_de; -} - START_TEST(t_static_structure) { auto rng = gsl_rng_alloc(gsl_rng_mt19937); size_t reccnt = 100000; - auto test_de = new DE(100, 1000, 2); + auto test_de = new DE(recon, 100, 1000); std::set<R> records; std::set<R> to_delete; diff --git a/tests/include/dynamic_extension.h b/tests/include/dynamic_extension.h index 326bb72..a1ab20a 100644 --- a/tests/include/dynamic_extension.h +++ b/tests/include/dynamic_extension.h @@ -23,26 +23,28 @@ * include statement. */ -// #include "testing.h" -// #include "framework/DynamicExtension.h" -// #include "framework/scheduling/SerialScheduler.h" -// #include "shard/ISAMTree.h" -// #include "query/rangequery.h" -// #include <check.h> -// #include <random> -// #include <set> +#include "framework/reconstruction/BSMPolicy.h" +#include "framework/reconstruction/TieringPolicy.h" +#include "testing.h" +#include "framework/DynamicExtension.h" +#include "framework/scheduling/SerialScheduler.h" +#include "framework/reconstruction/ReconstructionPolicy.h" +#include "shard/ISAMTree.h" +#include "query/rangequery.h" +#include <check.h> +#include <random> +#include <set> // using namespace de; // typedef Rec R; // typedef ISAMTree<R> S; // typedef rq::Query<S> Q; -// typedef DynamicExtension<S, Q, LayoutPolicy::TEIRING, DeletePolicy::TAGGING, SerialScheduler> DE; +// typedef DynamicExtension<S, Q, DeletePolicy::TAGGING, SerialScheduler> DE; +// ReconstructionPolicy<S, Q> *recon = new TieringPolicy<S, Q>(1000, 2); - -#include "framework/util/Configuration.h" START_TEST(t_create) { - auto test_de = new DE(100, 1000, 2); + auto test_de = new DE(recon, 100); ck_assert_ptr_nonnull(test_de); ck_assert_int_eq(test_de->get_record_count(), 0); @@ -55,7 +57,7 @@ END_TEST START_TEST(t_insert) { - auto test_de = new DE(100, 1000, 2); + auto test_de = new DE(recon, 100); uint64_t key = 0; uint32_t val = 0; @@ -76,7 +78,7 @@ END_TEST START_TEST(t_debug_insert) { - auto test_de = new DE(100, 1000, 2); + auto test_de = new DE(recon, 100); uint64_t key = 0; uint32_t val = 0; @@ -95,7 +97,7 @@ END_TEST START_TEST(t_insert_with_mem_merges) { - auto test_de = new DE(100, 1000, 2); + auto test_de = new DE(recon, 100); uint64_t key = 0; uint32_t val = 0; @@ -114,7 +116,7 @@ START_TEST(t_insert_with_mem_merges) * BSM grows on every flush, so the height will be different than * normal layout policies */ - if (test_de->Layout == de::LayoutPolicy::BSM) { + if (dynamic_cast<const BSMPolicy<S, Q>*>(recon)) { ck_assert_int_eq(test_de->get_height(), 2); } else { ck_assert_int_eq(test_de->get_height(), 1); @@ -127,7 +129,7 @@ END_TEST START_TEST(t_range_query) { - auto test_de = new DE(100, 1000, 2); + auto test_de = new DE(recon, 100); size_t n = 10000; std::vector<uint64_t> keys; @@ -175,7 +177,7 @@ END_TEST START_TEST(t_tombstone_merging_01) { size_t reccnt = 100000; - auto test_de = new DE(100, 1000, 2); + auto test_de = new DE(recon, 100); auto rng = gsl_rng_alloc(gsl_rng_mt19937); @@ -224,54 +226,13 @@ START_TEST(t_tombstone_merging_01) } END_TEST -[[maybe_unused]] static DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) { - auto rng = gsl_rng_alloc(gsl_rng_mt19937); - - auto test_de = new DE(1000, 10000, 2); - - std::set<Rec> records; - std::set<Rec> to_delete; - std::set<Rec> deleted; - - while (records.size() < reccnt) { - uint64_t key = rand(); - uint32_t val = rand(); - - if (records.find({key, val}) != records.end()) continue; - - records.insert({key, val}); - } - - for (auto rec : records) { - ck_assert_int_eq(test_de->insert(rec), 1); - - if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) { - std::vector<Rec> del_vec; - std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); - - for (size_t i=0; i<del_vec.size(); i++) { - test_de->erase(del_vec[i]); - to_delete.erase(del_vec[i]); - deleted.insert(del_vec[i]); - } - } - - if (gsl_rng_uniform(rng) < 0.25 && deleted.find(rec) == deleted.end()) { - to_delete.insert(rec); - } - } - - gsl_rng_free(rng); - - return test_de; -} START_TEST(t_static_structure) { auto rng = gsl_rng_alloc(gsl_rng_mt19937); size_t reccnt = 100000; - auto test_de = new DE(100, 1000, 2); + auto test_de = new DE(recon, 100); std::set<Rec> records; std::set<Rec> to_delete; diff --git a/tests/include/shard_standard.h b/tests/include/shard_standard.h index de43edc..0b5ab00 100644 --- a/tests/include/shard_standard.h +++ b/tests/include/shard_standard.h @@ -77,7 +77,7 @@ START_TEST(t_shard_init) auto shard2 = new Shard(mbuffer2->get_buffer_view()); auto shard3 = new Shard(mbuffer3->get_buffer_view()); - std::vector<Shard*> shards = {shard1, shard2, shard3}; + std::vector<const Shard*> shards = {shard1, shard2, shard3}; auto shard4 = new Shard(shards); ck_assert_int_eq(shard4->get_record_count(), n * 3); @@ -130,7 +130,7 @@ START_TEST(t_full_cancelation) ck_assert_int_eq(shard_ts->get_record_count(), n); ck_assert_int_eq(shard_ts->get_tombstone_count(), n); - std::vector<Shard *> shards = {shard, shard_ts}; + std::vector<const Shard *> shards = {shard, shard_ts}; Shard* merged = new Shard(shards); diff --git a/tests/include/shard_string.h b/tests/include/shard_string.h index 2ef4cec..7a3d761 100644 --- a/tests/include/shard_string.h +++ b/tests/include/shard_string.h @@ -67,7 +67,7 @@ START_TEST(t_shard_init) auto shard2 = new Shard(mbuffer2->get_buffer_view()); auto shard3 = new Shard(mbuffer3->get_buffer_view()); - std::vector<Shard*> shards = {shard1, shard2, shard3}; + std::vector<const Shard*> shards = {shard1, shard2, shard3}; auto shard4 = new Shard(shards); ck_assert_int_eq(shard4->get_record_count(), n * 3); diff --git a/tests/internal_level_tests.cpp b/tests/internal_level_tests.cpp deleted file mode 100644 index e11b7c7..0000000 --- a/tests/internal_level_tests.cpp +++ /dev/null @@ -1,95 +0,0 @@ -/* - * tests/internal_level_tests.cpp - * - * Unit tests for InternalLevel - * - * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> - * Dong Xie <dongx@psu.edu> - * - * Distributed under the Modified BSD License. - * - */ -#include "shard/ISAMTree.h" -#include "query/rangequery.h" -#include "framework/structure/InternalLevel.h" -#include "framework/interface/Record.h" -#include "framework/interface/Query.h" -#include "framework/interface/Shard.h" - -#include "include/testing.h" - -#include <check.h> - -using namespace de; - -typedef InternalLevel<ISAMTree<Rec>, rq::Query<ISAMTree<Rec>>> ILevel; - -START_TEST(t_memlevel_merge) -{ - auto tbl1 = create_test_mbuffer<Rec>(100); - auto tbl2 = create_test_mbuffer<Rec>(100); - - auto base_level = new ILevel(1, 1); - base_level->append_buffer(tbl1->get_buffer_view()); - ck_assert_int_eq(base_level->get_record_count(), 100); - - auto merging_level = new ILevel(0, 1); - merging_level->append_buffer(tbl2->get_buffer_view()); - ck_assert_int_eq(merging_level->get_record_count(), 100); - - auto new_level = ILevel::reconstruction(base_level, merging_level); - - delete merging_level; - ck_assert_int_eq(new_level->get_record_count(), 200); - - delete base_level; - delete tbl1; - delete tbl2; -} - - -ILevel *create_test_memlevel(size_t reccnt) { - auto tbl1 = create_test_mbuffer<Rec>(reccnt/2); - auto tbl2 = create_test_mbuffer<Rec>(reccnt/2); - - auto base_level = new ILevel(1, 2); - base_level->append_buffer(tbl1->get_buffer_view()); - base_level->append_buffer(tbl2->get_buffer_view()); - - delete tbl1; - delete tbl2; - - return base_level; -} - -Suite *unit_testing() -{ - Suite *unit = suite_create("InternalLevel Unit Testing"); - - TCase *merge = tcase_create("de::InternalLevel::reconstruction Testing"); - tcase_add_test(merge, t_memlevel_merge); - suite_add_tcase(unit, merge); - - return unit; -} - -int run_unit_tests() -{ - int failed = 0; - Suite *unit = unit_testing(); - SRunner *unit_runner = srunner_create(unit); - - srunner_run_all(unit_runner, CK_NORMAL); - failed = srunner_ntests_failed(unit_runner); - srunner_free(unit_runner); - - return failed; -} - - -int main() -{ - int unit_failed = run_unit_tests(); - - return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; -} diff --git a/tests/vptree_tests.cpp b/tests/vptree_tests.cpp index 53bb526..49964e5 100644 --- a/tests/vptree_tests.cpp +++ b/tests/vptree_tests.cpp @@ -52,7 +52,7 @@ START_TEST(t_wss_init) auto shard2 = new Shard(mbuffer2->get_buffer_view()); auto shard3 = new Shard(mbuffer3->get_buffer_view()); - std::vector<Shard *> shards = {shard1, shard2, shard3}; + std::vector<const Shard *> shards = {shard1, shard2, shard3}; auto shard4 = new Shard(shards); ck_assert_int_eq(shard4->get_record_count(), n * 3); |