| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-01-31 16:16:28 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-01-31 16:16:28 -0500 |
| commit | 3061bfbf1c4b5092fa4234de3105a615fcef18ea (patch) | |
| tree | 4c41cdf3b8e3af6e38ba1c8f283d2e35086cc2af | |
| parent | 30da48151f58803968ca3ef5d42e66a9223d80a4 (diff) | |
| download | dynamic-extension-3061bfbf1c4b5092fa4234de3105a615fcef18ea.tar.gz | |
More updates
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | CMakeLists.txt | 8 |
| -rw-r--r-- | include/framework/DynamicExtension.h | 80 |
| -rw-r--r-- | include/framework/reconstruction/LevelingPolicy.h | 11 |
| -rw-r--r-- | include/framework/scheduling/FIFOScheduler.h | 2 |
| -rw-r--r-- | include/framework/scheduling/Version.h | 9 |
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 6 |
| -rw-r--r-- | include/framework/util/Configuration.h | 7 |
| -rw-r--r-- | tests/de_level_concurrent.cpp | 5 |
| -rw-r--r-- | tests/de_level_tag.cpp | 7 |
| -rw-r--r-- | tests/include/concurrent_extension.h | 14 |
| -rw-r--r-- | tests/include/dynamic_extension.h | 37 |
11 files changed, 124 insertions, 62 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d506da2..902889f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -89,10 +89,10 @@ if (tests)
 # target_link_options(de_tier_tomb PUBLIC -mcx16)
 # target_include_directories(de_tier_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external)
-# add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp)
-# target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread atomic)
-# target_link_options(de_level_tag PUBLIC -mcx16)
-# target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external)
+add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp)
+target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread atomic)
+target_link_options(de_level_tag PUBLIC -mcx16)
+target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external)
 # add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp)
 # target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread atomic)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index da2945a..91bed98 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -87,6 +87,8 @@ public:
                          m_config.maximum_threads);
     m_active_version.store(
         std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0));
+
+    m_version_counter = INITIAL_VERSION;
   }
 
   /**
@@ -226,7 +228,7 @@ public:
    * @return The number of levels within the index
    */
   size_t get_height() {
-    return get_active_version()->get_structure()->get_height();
+    return get_active_version()->get_structure()->get_height() - 1;
   }
 
   /**
@@ -381,8 +383,6 @@ private:
   size_t m_core_cnt;
   std::atomic<int> m_next_core;
 
-  ReconPolicyType const *m_recon_policy;
-
   /* versioning + concurrency variables */
   std::atomic<size_t> m_version_counter;
   std::atomic<std::shared_ptr<VersionType>> m_active_version;
@@ -390,6 +390,8 @@ private:
   std::condition_variable m_version_advance_cv;
   std::mutex m_version_advance_mtx;
 
+  std::atomic<bool> m_flush_in_progress = false;
+
   alignas(64) std::atomic<bool> m_scheduling_reconstruction;
 
   void enforce_delete_invariant(VersionType *version) {
@@ -420,6 +422,8 @@ private:
     }
   }
 
+  size_t m_flush_cnt = 0;
+
   static void reconstruction(void *arguments) {
     auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
     auto extension = (DynamicExtension *)args->extension;
@@ -437,11 +441,14 @@ private:
      * this code will be bypassed in that case.
      */
     if (args->priority == ReconstructionPriority::FLUSH) {
+      assert(extension->m_flush_in_progress.load());
       /* we first construct a shard from the buffer */
      auto buffview = args->version->get_buffer();
-      args->version->set_next_buffer_head(buffview.get_tail());
+      assert(buffview.get_tail() != buffview.get_head());
+
       auto new_shard = std::make_shared<ShardType>(std::move(buffview));
+
       /*
        * Flushes already know their version id. To avoid needing to
        * do any update reconciliation between structures, they wait
        * for the completion of the previous version's flush (if there is one)
        * and only then take a copy of the structure.
        */
       extension->await_version(args->version->get_id() - 1);
-      StructureType *structure =
-          extension->get_active_version()->get_structure()->copy();
-      /* add the newly created shard to the structure copy */
-      structure->append_l0(new_shard);
+      if (extension->m_config.recon_maint_disabled) {
+        assert(args->version->get_mutable_structure());
+        args->version->get_mutable_structure()->append_l0(std::move(new_shard));
+      } else {
+        assert(!args->version->get_mutable_structure());
+        auto structure = std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy());
-      /* set this version's structure to the newly created one */
-      args->version->set_structure(std::unique_ptr<StructureType>(structure));
+        /* add the newly created shard to the structure copy */
+        structure->append_l0(std::move(new_shard));
+
+        /* set this version's structure to the newly created one */
+        args->version->set_structure(std::move(structure));
+      }
+
+      args->version->advance_buffer_head();
     }
 
     /* perform all of the reconstructions */
@@ -476,6 +491,10 @@ private:
     /* advance the index to the newly finished version */
     extension->install_new_version(args->version);
 
+    if (args->priority == ReconstructionPriority::FLUSH) {
+      extension->m_flush_in_progress.store(false);
+    }
+
     /* manually delete the argument object */
     delete args;
   }
@@ -549,11 +568,11 @@ private:
    * at the time it is activated, but the version number must be claimed
    * early to minimize activation blocking.
    */
-  version_ptr create_version() {
+  version_ptr create_version_flush(std::unique_ptr<StructureType> structure) {
     size_t version_id = m_version_counter.fetch_add(1);
     auto active_version = get_active_version();
     std::shared_ptr<VersionType> new_version =
-        std::make_shared<VersionType>(version_id, nullptr, m_buffer.get(), active_version->get_buffer().get_head());
+        std::make_shared<VersionType>(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
 
     return new_version;
   }
@@ -565,7 +584,7 @@ private:
    * no version number is claimed until the version is activated, to
    * prevent blocking buffer flushes.
    */
-  version_ptr create_version(std::unique_ptr<StructureType> structure) {
+  version_ptr create_version_maint(std::unique_ptr<StructureType> structure) {
     auto active_version = get_active_version();
     version_ptr new_version =
         std::make_shared<VersionType>(INVALID_VERSION, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
@@ -622,11 +641,34 @@ private:
   void schedule_flush() {
     begin_reconstruction_scheduling();
-    auto new_version = create_version();
+
+    bool old = m_flush_in_progress.load();
+    if (old || !m_flush_in_progress.compare_exchange_strong(old, true)) {
+      end_reconstruction_scheduling();
+      return;
+    }
+
+    /*
+     * for "legacy" policies, without background reconstruction, we need
+     * a valid structure object as part of the version prior to determining
+     * the flush operations. Otherwise, the flush operation should only ever
+     * do the flush itself, and so no structure is needed at this point
+     *
+     * delaying obtaining the structure until later, when maintenance
+     * reconstructions are enabled, removes the need for flushes to reconcile
+     * differences in structure between when they are scheduled and when they
+     * finish
+     */
+    std::unique_ptr<StructureType> structure = nullptr;
+    if (m_config.recon_maint_disabled) {
+      structure = std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy());
+    }
+
+    auto new_version = create_version_flush(std::move(structure));
 
     auto *args = new ReconstructionArgs<ShardType, QueryType>();
     args->version = new_version;
-    args->tasks = m_recon_policy->get_flush_tasks(args->version.get());
+    args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get());
     args->extension = this;
     args->priority = ReconstructionPriority::FLUSH;
@@ -646,16 +688,20 @@ private:
   void schedule_maint_reconstruction(bool take_reconstruction_lock = true) {
 
+    if (m_config.recon_maint_disabled) {
+      return;
+    }
+
     if (take_reconstruction_lock) {
       begin_reconstruction_scheduling();
     }
 
     // FIXME: memory management issue here?
-    auto new_version = create_version(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy()));
+    auto new_version = create_version_maint(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy()));
 
     auto *args = new ReconstructionArgs<ShardType, QueryType>();
     args->version = new_version;
-    args->tasks = m_recon_policy->get_reconstruction_tasks(args->version.get());
+    args->tasks = m_config.recon_policy->get_reconstruction_tasks(args->version.get());
     args->extension = this;
     args->priority = ReconstructionPriority::MAINT;
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
index 1523e74..3a0b73e 100644
--- a/include/framework/reconstruction/LevelingPolicy.h
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -36,8 +36,7 @@ public:
     auto levels = version->get_structure()->get_level_vector();
     level_index target_level = find_reconstruction_target(levels);
-    assert(target_level != -1);
-    level_index source_level = 1;
+    level_index source_level = 0;
 
     if (target_level == invalid_level_idx) {
       /* grow */
@@ -47,7 +46,9 @@ public:
     for (level_index i = target_level; i > source_level; i--) {
       size_t target_reccnt =
           (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0;
-      size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt;
+      size_t total_reccnt =
+          (i == 0) ? m_buffer_size + target_reccnt
+                   : levels[i - 1]->get_record_count() + target_reccnt;
 
       reconstructions.add_reconstruction(i - 1, i, total_reccnt,
                                          ReconstructionType::Merge);
@@ -61,7 +62,7 @@ private:
     level_index target_level = invalid_level_idx;
 
     size_t incoming_records = m_buffer_size;
-    for (level_index i = 0; i < (level_index)levels.size(); i++) {
+    for (level_index i = 1; i < (level_index)levels.size(); i++) {
       if (levels[i]->get_record_count() + incoming_records < capacity(i)) {
         target_level = i;
         break;
@@ -74,7 +75,7 @@ private:
   }
 
   inline size_t capacity(level_index level) const {
-    return m_buffer_size * pow(m_scale_factor, level + 1);
+    return m_buffer_size * pow(m_scale_factor, level);
   }
 
   size_t m_scale_factor;
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 7cb6d20..4c1db8d 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -82,6 +82,7 @@ private:
   std::atomic<size_t> m_counter;
   std::mutex m_cv_lock;
   std::condition_variable m_cv;
+  std::mutex m_queue_lock;
 
   std::thread m_sched_thrd;
   std::thread m_sched_wakeup_thrd;
@@ -102,6 +103,7 @@ private:
   }
 
   void schedule_next() {
+    auto lk = std::unique_lock<std::mutex>(m_queue_lock);
     assert(m_task_queue.size() > 0);
     auto t = m_task_queue.pop();
     m_stats.job_scheduled(t.m_timestamp);
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index 8d3d038..9c62ea1 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -84,13 +84,10 @@ public:
     return version;
   }
 
-  void set_next_buffer_head(size_t new_head) {
-    m_pending_buffer_head = new_head;
-  }
-
   bool advance_buffer_head() {
-    m_buffer_head = m_pending_buffer_head;
-    return m_buffer->advance_head(m_buffer_head);
+    auto new_head = m_buffer->get_buffer_view().get_tail();
+    m_buffer_head = new_head;
+    return m_buffer->advance_head(new_head);
   }
 
 private:
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 62c27f5..60fb6c7 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,7 +27,10 @@ class ExtensionStructure {
   typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> LevelVector;
 
 public:
-  ExtensionStructure() = default;
+  ExtensionStructure() {
+    m_levels.emplace_back(std::make_shared<InternalLevel<ShardType, QueryType>>(0));
+  }
+
   ~ExtensionStructure() = default;
 
   /*
@@ -217,7 +220,6 @@ public:
   }
 
   void append_l0(std::shared_ptr<ShardType> shard) {
-    // FIXME: ensure that there's always a level 0 in the version
     m_levels[0]->append(shard);
   }
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index a751a29..81698d2 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -20,9 +20,9 @@ DeletePolicy D, SchedulerInterface SchedType>
 class DEConfiguration {
 public:
   DEConfiguration(std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy)
-      : m_recon_policy(std::move(recon_policy)) {}
+      : recon_policy(std::move(recon_policy)) {}
 
-  std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> m_recon_policy;
+  std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy;
 
   /* buffer parameters */
   size_t buffer_count = 1;
@@ -31,8 +31,9 @@ class DEConfiguration {
   /* reconstruction triggers */
   bool recon_enable_seek_trigger = false;
-  bool recon_enable_maint_on_flush = true;
+  bool recon_enable_maint_on_flush = false;
   bool recon_enable_delete_cmpct = false;
+  bool recon_maint_disabled = true;
   size_t recon_l0_capacity = 0; /* 0 for unbounded */
 
   double maximum_delete_proportion = 1;
diff --git a/tests/de_level_concurrent.cpp b/tests/de_level_concurrent.cpp
index a43006e..5415744 100644
--- a/tests/de_level_concurrent.cpp
+++ b/tests/de_level_concurrent.cpp
@@ -16,6 +16,7 @@
 #include "framework/scheduling/SerialScheduler.h"
 #include "include/testing.h"
 #include "framework/DynamicExtension.h"
+#include "framework/scheduling/FIFOScheduler.h"
 #include "shard/ISAMTree.h"
 #include "query/rangequery.h"
 #include "framework/reconstruction/LevelingPolicy.h"
@@ -28,8 +29,8 @@ typedef Rec R;
 typedef ISAMTree<R> S;
 typedef rq::Query<S> Q;
 
-typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
-typedef de::DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> CONF;
+typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
+typedef de::DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, FIFOScheduler> CONF;
 
 #include "include/concurrent_extension.h"
diff --git a/tests/de_level_tag.cpp b/tests/de_level_tag.cpp
index 6fead54..da7cf7c 100644
--- a/tests/de_level_tag.cpp
+++ b/tests/de_level_tag.cpp
@@ -20,14 +20,15 @@
 #include "framework/reconstruction/LevelingPolicy.h"
 #include <check.h>
 
-using namespace de;
+
+using namespace de;
 
 typedef Rec R;
 typedef ISAMTree<R> S;
 typedef rq::Query<S> Q;
-
+typedef LevelingPolicy<S, Q> Policy;
 typedef DynamicExtension<S, Q, DeletePolicy::TAGGING, SerialScheduler> DE;
-ReconstructionPolicy<S, Q> *recon = new LevelingPolicy<S, Q>(2, 1000);
+typedef de::DEConfiguration<S, Q, DeletePolicy::TAGGING, SerialScheduler> CONF;
 
 #include "include/dynamic_extension.h"
diff --git a/tests/include/concurrent_extension.h b/tests/include/concurrent_extension.h
index fa669d2..f6d01fd 100644
--- a/tests/include/concurrent_extension.h
+++ b/tests/include/concurrent_extension.h
@@ -37,21 +37,19 @@
 
-using namespace de;
-typedef Rec R;
-typedef ISAMTree<R> S;
-typedef rq::Query<S> Q;
-typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
-typedef de::DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> CONF;
+// using namespace de;
+// typedef Rec R;
+// typedef ISAMTree<R> S;
+// typedef rq::Query<S> Q;
+// typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+// typedef de::DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> CONF;
 
 static CONF create_config(size_t type=1) {
   if (type == 1) {
     auto recon = std::make_unique<LevelingPolicy<S, Q>>(2, 1000);
-
     return CONF(std::move(recon));
   } else {
     auto recon2 = std::make_unique<LevelingPolicy<S, Q>>(4, 10000);
-    CONF configuration2 = CONF(std::move(recon2));
     return CONF(std::move(recon2));
   }
 }
diff --git a/tests/include/dynamic_extension.h b/tests/include/dynamic_extension.h
index a1ab20a..c378ea9 100644
--- a/tests/include/dynamic_extension.h
+++ b/tests/include/dynamic_extension.h
@@ -29,6 +29,7 @@
 #include "framework/DynamicExtension.h"
 #include "framework/scheduling/SerialScheduler.h"
 #include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/reconstruction/LevelingPolicy.h"
 #include "shard/ISAMTree.h"
 #include "query/rangequery.h"
 #include <check.h>
@@ -39,12 +40,22 @@
 // typedef Rec R;
 // typedef ISAMTree<R> S;
 // typedef rq::Query<S> Q;
-// typedef DynamicExtension<S, Q, DeletePolicy::TAGGING, SerialScheduler> DE;
-// ReconstructionPolicy<S, Q> *recon = new TieringPolicy<S, Q>(1000, 2);
+// typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+// typedef de::DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> CONF;
+
+static CONF create_config(size_t type=1) {
+  if (type == 1) {
+    auto recon = std::make_unique<Policy>(2, 100);
+    return CONF(std::move(recon));
+  } else {
+    auto recon2 = std::make_unique<Policy>(4, 1000);
+    return CONF(std::move(recon2));
+  }
+}
 
 START_TEST(t_create)
 {
-  auto test_de = new DE(recon, 100);
+  auto test_de = new DE(create_config());
 
   ck_assert_ptr_nonnull(test_de);
   ck_assert_int_eq(test_de->get_record_count(), 0);
@@ -57,7 +68,7 @@ END_TEST
 
 START_TEST(t_insert)
 {
-  auto test_de = new DE(recon, 100);
+  auto test_de = new DE(create_config());
 
   uint64_t key = 0;
   uint32_t val = 0;
@@ -78,7 +89,7 @@ END_TEST
 
 START_TEST(t_debug_insert)
 {
-  auto test_de = new DE(recon, 100);
+  auto test_de = new DE(create_config());
 
   uint64_t key = 0;
   uint32_t val = 0;
@@ -97,7 +108,7 @@ END_TEST
 
 START_TEST(t_insert_with_mem_merges)
 {
-  auto test_de = new DE(recon, 100);
+  auto test_de = new DE(create_config());
 
   uint64_t key = 0;
   uint32_t val = 0;
@@ -108,7 +119,7 @@ START_TEST(t_insert_with_mem_merges)
     val++;
   }
 
-  test_de->await_next_epoch();
+  test_de->await_version();
 
   ck_assert_int_eq(test_de->get_record_count(), 300);
@@ -116,11 +127,13 @@ START_TEST(t_insert_with_mem_merges)
    * BSM grows on every flush, so the height will be different than
    * normal layout policies
    */
+  /*
   if (dynamic_cast<const BSMPolicy<S, Q>*>(recon)) {
     ck_assert_int_eq(test_de->get_height(), 2);
   } else {
     ck_assert_int_eq(test_de->get_height(), 1);
   }
+  */
 
   delete test_de;
 }
@@ -129,7 +142,7 @@ END_TEST
 
 START_TEST(t_range_query)
 {
-  auto test_de = new DE(recon, 100);
+  auto test_de = new DE(create_config());
 
   size_t n = 10000;
   std::vector<uint64_t> keys;
@@ -146,7 +159,7 @@ START_TEST(t_range_query)
     ck_assert_int_eq(test_de->insert(r), 1);
   }
 
-  test_de->await_next_epoch();
+  test_de->await_version();
 
   std::sort(keys.begin(), keys.end());
@@ -177,7 +190,7 @@ END_TEST
 
 START_TEST(t_tombstone_merging_01)
 {
   size_t reccnt = 100000;
-  auto test_de = new DE(recon, 100);
+  auto test_de = new DE(create_config());
 
   auto rng = gsl_rng_alloc(gsl_rng_mt19937);
@@ -217,7 +230,7 @@ START_TEST(t_tombstone_merging_01)
     }
   }
 
-  test_de->await_next_epoch();
+  test_de->await_version();
 
   ck_assert(test_de->validate_tombstone_proportion());
@@ -232,7 +245,7 @@ START_TEST(t_static_structure)
   auto rng = gsl_rng_alloc(gsl_rng_mt19937);
   size_t reccnt = 100000;
-  auto test_de = new DE(recon, 100);
+  auto test_de = new DE(create_config());
 
   std::set<Rec> records;
   std::set<Rec> to_delete;
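For reference, a minimal usage sketch of the configuration-based construction path that the tests above migrate to. The DEConfiguration, LevelingPolicy, DynamicExtension, and DeletePolicy names are taken from the diff; the Rec record type and header paths follow the test files, and the surrounding main() scaffolding (and the exact LevelingPolicy argument values) are assumptions for illustration only.

    // Sketch, not part of the commit: a DEConfiguration owns the
    // reconstruction policy and is passed to DynamicExtension, mirroring
    // create_config() / new DE(create_config()) in the updated tests.
    #include <memory>
    #include <utility>

    #include "framework/DynamicExtension.h"
    #include "framework/scheduling/SerialScheduler.h"
    #include "framework/reconstruction/LevelingPolicy.h"
    #include "shard/ISAMTree.h"
    #include "query/rangequery.h"
    #include "include/testing.h" /* assumed source of the Rec record type */

    using namespace de;

    typedef Rec R;
    typedef ISAMTree<R> S;
    typedef rq::Query<S> Q;
    typedef DynamicExtension<S, Q, DeletePolicy::TAGGING, SerialScheduler> DE;
    typedef de::DEConfiguration<S, Q, DeletePolicy::TAGGING, SerialScheduler> CONF;

    int main() {
      /* the reconstruction policy is now owned by the configuration object */
      auto policy = std::make_unique<LevelingPolicy<S, Q>>(2, 1000);
      CONF config(std::move(policy));

      /* the extension is built from a configuration rather than a raw
         policy pointer and buffer size, as in new DE(create_config()) above */
      auto extension = std::make_unique<DE>(std::move(config));
      return 0;
    }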