diff options
| -rw-r--r-- | CMakeLists.txt | 5 | ||||
| -rw-r--r-- | include/framework/DynamicExtension.h | 21 | ||||
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 60 | ||||
| -rw-r--r-- | include/framework/structure/InternalLevel.h | 19 | ||||
| -rw-r--r-- | include/framework/util/Configuration.h | 3 | ||||
| -rw-r--r-- | include/util/types.h | 12 | ||||
| -rw-r--r-- | tests/de_bsm_tomb.cpp | 58 |
7 files changed, 167 insertions, 11 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index d88db24..cdc114f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -96,6 +96,11 @@ if (tests) target_link_options(de_level_tomb PUBLIC -mcx16) target_include_directories(de_level_tomb PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external) + add_executable(de_bsm_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_bsm_tomb.cpp) + target_link_libraries(de_bsm_tomb PUBLIC gsl check subunit pthread atomic) + target_link_options(de_bsm_tomb PUBLIC -mcx16) + target_include_directories(de_bsm_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external) + add_executable(de_level_concurrent ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_concurrent.cpp) target_link_libraries(de_level_concurrent PUBLIC gsl check subunit pthread atomic) target_link_options(de_level_concurrent PUBLIC -mcx16) diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 6fd95c6..538ff25 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -54,6 +54,10 @@ public: , m_next_core(0) , m_epoch_cnt(0) { + if constexpr (L == LayoutPolicy::BSM) { + assert(scale_factor == 2); + } + auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0}); m_previous_epoch.store({nullptr, 0}); @@ -487,10 +491,17 @@ private: ((DynamicExtension *) args->extension)->SetThreadAffinity(); Structure *vers = args->epoch->get_structure(); - for (ssize_t i=0; i<args->merges.size(); i++) { - vers->reconstruction(args->merges[i].target, args->merges[i].source); + if constexpr (L == LayoutPolicy::BSM) { + if (args->merges.size() > 0) { + vers->reconstruction(args->merges[0]); + } + } else { + for (ssize_t i=0; i<args->merges.size(); i++) { + vers->reconstruction(args->merges[i].target, args->merges[i].sources[0]); + } } + /* * we'll grab the buffer AFTER doing the internal reconstruction, so we * can flush as many records as possible in one go. The reconstruction @@ -628,7 +639,7 @@ private: static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) { if constexpr (Q::SKIP_DELETE_FILTER) { - return records; + return std::move(records); } std::vector<Wrapped<R>> processed_records; @@ -691,6 +702,10 @@ private: #ifdef _GNU_SOURCE void SetThreadAffinity() { + if constexpr (std::same_as<SCHED, SerialScheduler>) { + return; + } + int core = m_next_core.fetch_add(1) % m_core_cnt; cpu_set_t mask; CPU_ZERO(&mask); diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index ffba1c1..b83674b 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -343,6 +343,30 @@ public: base_level = grow(scratch_state); } + if constexpr (L == LayoutPolicy::BSM) { + if (base_level == 0) { + return std::move(reconstructions); + } + + ReconstructionTask task; + task.target = base_level; + + size_t base_reccnt = 0; + for (level_index i=base_level; i>source_level; i--) { + auto recon_reccnt = scratch_state[i-1].reccnt; + base_reccnt += recon_reccnt; + scratch_state[i-1].reccnt = 0; + scratch_state[i-1].shardcnt = 0; + task.add_source(i-1, recon_reccnt); + } + + reconstructions.add_reconstruction(task); + scratch_state[base_level].reccnt = base_reccnt; + scratch_state[base_level].shardcnt = 1; + + return std::move(reconstructions); + } + /* * Determine the full set of reconstructions necessary to open up * space in the source level. @@ -384,6 +408,33 @@ public: return std::move(reconstructions); } + inline void reconstruction(ReconstructionTask task) { + static_assert(L == LayoutPolicy::BSM); + std::vector<InternalLevel<R, Shard, Q>*> levels(task.sources.size()); + for (size_t i=0; i<task.sources.size(); i++) { + levels[i] = m_levels[task.sources[i]].get(); + } + + auto new_level = InternalLevel<R, Shard, Q>::reconstruction(levels, task.target); + if (task.target >= m_levels.size()) { + m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target), + 1, 1}); + m_levels.emplace_back(new_level); + } else { + m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target), + 1, 1}; + m_levels[task.target] = new_level; + } + + /* remove all of the levels that have been flattened */ + for (size_t i=0; i<task.sources.size(); i++) { + m_levels[task.sources[i]] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(task.sources[i], 1)); + m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.target), 0, 1}; + } + + return; + } + /* * Combine incoming_level with base_level and reconstruct the shard, * placing it in base_level. The two levels should be sequential--i.e. no @@ -395,7 +446,6 @@ public: if (base_level >= m_levels.size()) { m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(base_level, shard_capacity))); - m_current_state.push_back({0, calc_level_record_capacity(base_level), 0, shard_capacity}); } @@ -418,7 +468,7 @@ public: m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); /* - * Update the state vector to match the *real* stage following + * Update the state vector to match the *real* state following * the reconstruction */ m_current_state[base_level] = {m_levels[base_level]->get_record_count(), @@ -576,9 +626,11 @@ private: return false; } - if (L == LayoutPolicy::LEVELING) { + if constexpr (L == LayoutPolicy::LEVELING) { return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap; - } else { + } else if constexpr (L == LayoutPolicy::BSM) { + return state[idx].reccnt == 0; + } else { return state[idx].shardcnt < state[idx].shardcap; } diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index db38946..b962dcc 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -64,6 +64,21 @@ public: return std::shared_ptr<InternalLevel>(res); } + static std::shared_ptr<InternalLevel> reconstruction(std::vector<InternalLevel*> levels, size_t level_idx) { + std::vector<Shard *> shards; + for (auto level : levels) { + for (auto shard : level->m_shards) { + if (shard) shards.emplace_back(shard.get()); + } + } + + auto res = new InternalLevel(level_idx, 1); + res->m_shard_cnt = 1; + res->m_shards[0] = std::make_shared<S>(shards); + + return std::shared_ptr<InternalLevel>(res); + } + /* * Create a new shard combining the records from all of * the shards in level, and append this new shard into @@ -185,6 +200,10 @@ public: } Shard* get_shard(size_t idx) { + if (idx >= m_shard_cnt) { + return nullptr; + } + return m_shards[idx].get(); } diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index 55cc682..4a4524a 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -35,7 +35,8 @@ static thread_local size_t sampling_bailouts = 0; enum class LayoutPolicy { LEVELING, - TEIRING + TEIRING, + BSM }; enum class DeletePolicy { diff --git a/include/util/types.h b/include/util/types.h index bac0246..cf61412 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -73,10 +73,16 @@ const ShardID INVALID_SHID = {-1, -1}; typedef ssize_t level_index; -typedef struct { - level_index source; +typedef struct ReconstructionTask { + std::vector<level_index> sources; level_index target; size_t reccnt; + + void add_source(level_index source, size_t cnt) { + sources.push_back(source); + reccnt += cnt; + } + } ReconstructionTask; class ReconstructionVector { @@ -91,7 +97,7 @@ public: } void add_reconstruction(level_index source, level_index target, size_t reccnt) { - m_tasks.push_back({source, target, reccnt}); + m_tasks.push_back({{source}, target, reccnt}); total_reccnt += reccnt; } diff --git a/tests/de_bsm_tomb.cpp b/tests/de_bsm_tomb.cpp new file mode 100644 index 0000000..493440e --- /dev/null +++ b/tests/de_bsm_tomb.cpp @@ -0,0 +1,58 @@ +/* + * tests/de_level_tomb.cpp + * + * Unit tests for Dynamic Extension Framework + * + * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * Distributed under the Modified BSD License. + * + */ +#include <set> +#include <random> +#include <algorithm> + +#include "include/testing.h" +#include "framework/DynamicExtension.h" +#include "shard/ISAMTree.h" +#include "query/rangequery.h" + +#include <check.h> +using namespace de; + +typedef Rec R; +typedef DynamicExtension<R, ISAMTree<R>, rq::Query<R, ISAMTree<R>>, LayoutPolicy::BSM, DeletePolicy::TOMBSTONE, SerialScheduler> DE; + +#include "include/dynamic_extension.h" + + +Suite *unit_testing() +{ + Suite *unit = suite_create("DynamicExtension: Tombstone BSM Testing"); + inject_dynamic_extension_tests(unit); + + return unit; +} + + +int shard_unit_tests() +{ + int failed = 0; + Suite *unit = unit_testing(); + SRunner *unit_shardner = srunner_create(unit); + + srunner_run_all(unit_shardner, CK_NORMAL); + failed = srunner_ntests_failed(unit_shardner); + srunner_free(unit_shardner); + + return failed; +} + + +int main() +{ + int unit_failed = shard_unit_tests(); + + return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; +} |