-rw-r--r--  CMakeLists.txt                                             |  76
-rw-r--r--  include/framework/DynamicExtension.h                       | 229
-rw-r--r--  include/framework/reconstruction/BSMPolicy.h               |  14
-rw-r--r--  include/framework/reconstruction/FixedShardCountPolicy.h   |   2
-rw-r--r--  include/framework/reconstruction/FloodL0Policy.h           |   9
-rw-r--r--  include/framework/reconstruction/LevelingPolicy.h          |  21
-rw-r--r--  include/framework/reconstruction/ReconstructionPolicy.h    |   3
-rw-r--r--  include/framework/reconstruction/TieringPolicy.h           |  17
-rw-r--r--  include/framework/scheduling/Task.h                        |   2
-rw-r--r--  include/framework/scheduling/Version.h                     | 102
-rw-r--r--  include/framework/structure/ExtensionStructure.h           |   8
-rw-r--r--  include/framework/util/Configuration.h                     |   4
-rw-r--r--  tests/de_level_concurrent.cpp                              |   6
-rw-r--r--  tests/include/concurrent_extension.h                       |  55
14 files changed, 273 insertions(+), 275 deletions(-)
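The headline API change in this commit is that DynamicExtension is now constructed from a DEConfiguration object that owns its ReconstructionPolicy via std::unique_ptr (see the Configuration.h and test hunks below), replacing the old three-argument calls of the form new DE(recon, 100, 1000). The following is a minimal usage sketch assembled from the updated test headers; the Rec record type, the include paths, and the (2, 1000) policy parameters are carried over from the test diff and are illustrative rather than prescriptive.

```cpp
#include "framework/DynamicExtension.h"
#include "framework/reconstruction/LevelingPolicy.h"
#include "framework/scheduling/SerialScheduler.h"
#include "framework/util/Configuration.h"
#include "query/rangequery.h"
#include "shard/ISAMTree.h"
#include "testing.h" /* test-tree header assumed to provide the Rec record type */

#include <memory>

using namespace de;
typedef Rec R;
typedef ISAMTree<R> S;
typedef rq::Query<S> Q;
typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
typedef DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> CONF;

int main() {
  /* the configuration object now owns the reconstruction policy */
  auto policy = std::make_unique<LevelingPolicy<S, Q>>(2, 1000);
  CONF config(std::move(policy));

  /* the extension is constructed by moving the configuration in */
  auto extension = std::make_unique<DE>(std::move(config));

  /* ... insertions elided ... */

  /* block until the newest pending version has been installed */
  extension->await_version();
  return 0;
}
```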
diff --git a/CMakeLists.txt b/CMakeLists.txt index 5e62641..d506da2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 3.22) -set(CMAKE_C_COMPILER clang) -set(CMAKE_CXX_COMPILER clang++) +set(CMAKE_C_COMPILER gcc-12) +set(CMAKE_CXX_COMPILER g++-12) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED True) @@ -9,11 +9,11 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(namespace "de") project("Practical Dynamic Extension" VERSION 0.1.0) -set(debug false) +set(debug true) set(tests True) set(bench false) -set(vldb_bench true) -set(tail_bench true) +set(vldb_bench false) +set(tail_bench false) # ALEX doesn't build under C++20 set(build_alex false) @@ -79,45 +79,45 @@ if (tests) target_link_options(vptree_tests PUBLIC -mcx16) target_include_directories(vptree_tests PRIVATE include external/vptree external/psudb-common/cpp/include) - add_executable(de_tier_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tag.cpp) - target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread atomic) - target_link_options(de_tier_tag PUBLIC -mcx16) - target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include external) - - add_executable(de_tier_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tomb.cpp) - target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread atomic) - target_link_options(de_tier_tomb PUBLIC -mcx16) - target_include_directories(de_tier_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external) - - add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp) - target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread atomic) - target_link_options(de_level_tag PUBLIC -mcx16) - target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external) - - add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp) - target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread atomic) - target_link_options(de_level_tomb PUBLIC -mcx16) - target_include_directories(de_level_tomb PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external) - - add_executable(de_bsm_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_bsm_tomb.cpp) - target_link_libraries(de_bsm_tomb PUBLIC gsl check subunit pthread atomic) - target_link_options(de_bsm_tomb PUBLIC -mcx16) - target_include_directories(de_bsm_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external) - - add_executable(de_bsm_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_bsm_tag.cpp) - target_link_libraries(de_bsm_tag PUBLIC gsl check subunit pthread atomic) - target_link_options(de_bsm_tag PUBLIC -mcx16) - target_include_directories(de_bsm_tag PRIVATE include external/PLEX/include external/psudb-common/cpp/include external) + # add_executable(de_tier_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tag.cpp) + # target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread atomic) + # target_link_options(de_tier_tag PUBLIC -mcx16) + # target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include external) + + # add_executable(de_tier_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tomb.cpp) + # target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread atomic) + # target_link_options(de_tier_tomb PUBLIC -mcx16) + # target_include_directories(de_tier_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external) + + # add_executable(de_level_tag 
${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp) + # target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread atomic) + # target_link_options(de_level_tag PUBLIC -mcx16) + # target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external) + + # add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp) + # target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread atomic) + # target_link_options(de_level_tomb PUBLIC -mcx16) + # target_include_directories(de_level_tomb PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external) + + # add_executable(de_bsm_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_bsm_tomb.cpp) + # target_link_libraries(de_bsm_tomb PUBLIC gsl check subunit pthread atomic) + # target_link_options(de_bsm_tomb PUBLIC -mcx16) + # target_include_directories(de_bsm_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external) + + # add_executable(de_bsm_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_bsm_tag.cpp) + # target_link_libraries(de_bsm_tag PUBLIC gsl check subunit pthread atomic) + # target_link_options(de_bsm_tag PUBLIC -mcx16) + # target_include_directories(de_bsm_tag PRIVATE include external/PLEX/include external/psudb-common/cpp/include external) add_executable(de_level_concurrent ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_concurrent.cpp) target_link_libraries(de_level_concurrent PUBLIC gsl check subunit pthread atomic) target_link_options(de_level_concurrent PUBLIC -mcx16) target_include_directories(de_level_concurrent PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external) - add_executable(de_tier_concurrent ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_concurrent.cpp) - target_link_libraries(de_tier_concurrent PUBLIC gsl check subunit pthread atomic) - target_link_options(de_tier_concurrent PUBLIC -mcx16) - target_include_directories(de_tier_concurrent PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external) + # add_executable(de_tier_concurrent ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_concurrent.cpp) + # target_link_libraries(de_tier_concurrent PUBLIC gsl check subunit pthread atomic) + # target_link_options(de_tier_concurrent PUBLIC -mcx16) + # target_include_directories(de_tier_concurrent PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external) add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp) target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread atomic) diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 0331353..da2945a 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -36,10 +36,10 @@ class DynamicExtension { private: /* convenience typedefs for commonly used types within the class */ typedef typename ShardType::RECORD RecordType; - typedef MutableBuffer<RecordType> Buffer; - typedef ExtensionStructure<ShardType, QueryType> Structure; - typedef Version<ShardType, QueryType> Version; - typedef BufferView<RecordType> BufView; + typedef MutableBuffer<RecordType> BufferType; + typedef ExtensionStructure<ShardType, QueryType> StructureType; + typedef Version<ShardType, QueryType> VersionType; + typedef BufferView<RecordType> BufferViewType; typedef ReconstructionPolicy<ShardType, QueryType> ReconPolicyType; typedef DEConfiguration<ShardType, QueryType, D, SchedType> ConfType; @@ 
-52,6 +52,11 @@ private: static constexpr size_t QUERY = 1; static constexpr size_t RECONSTRUCTION = 2; + typedef std::shared_ptr<VersionType> version_ptr; + typedef size_t version_id; + static constexpr size_t INVALID_VERSION = 0; + static constexpr size_t INITIAL_VERSION = 1; + public: /** * Create a new Dynamized version of a data structure, supporting @@ -74,12 +79,14 @@ public: * framework's scheduler for use in answering queries and * performing compactions and flushes, etc. */ - DynamicExtension(ConfType &&config) : m_config(std::move(config)) { - m_buffer = std::make_unique(m_config.buffer_flush_trigger, - m_config.buffer_size); - - m_sched = SchedType(m_config.maximum_memory_usage, m_config.maximum_threads); - m_active_version.load(std::make_shared(INITIAL_VERSION, new Structure(), m_buffer.get(), 0)); + DynamicExtension(ConfType &&config) : m_config(std::move(config)) { + m_buffer = + std::make_unique<BufferType>(m_config.buffer_flush_trigger, m_config.buffer_size); + + m_sched = std::make_unique<SchedType>(m_config.maximum_memory_usage, + m_config.maximum_threads); + m_active_version.store( + std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0)); } /** @@ -90,10 +97,10 @@ public: */ ~DynamicExtension() { /* let any in-flight version transitions finish */ - await_newest_version(); + await_version(); /* shutdown the scheduler */ - m_sched.shutdown(); + m_sched->shutdown(); } /** @@ -274,10 +281,11 @@ public: * the index. Ownership of this object is transfered to the * caller. */ + // FIXME: switch this over to std::unique_ptr ShardType * create_static_structure(bool await_reconstruction_completion = false) { if (await_reconstruction_completion) { - await_newest_version(); + await_version(); } auto version = get_active_version(); @@ -317,19 +325,27 @@ public: } /* - * Determines the newest pending version at the time of call, and - * blocks until that version becomes active. + * Blocks until the specified version id becomes active. If no version + * id is provided, wait for the newest pending version to be installed. + * + * NOTE: this method will return once the specified version has been + * installed, but does not guarantee that the specified version + * is the currently active one when it returns. It is possible + * that the active version upon the return of this method is newer + * than the one requested. */ - void await_newest_version() { - /* + void await_version(version_id vid = INVALID_VERSION) { + /* * versions are assigned by fetch and add on the counter, so the * newest assigned version number will be one less than the value * of the counter */ - auto newest_pending_version = m_version_counter.load() - 1; + if (vid == INVALID_VERSION) { + vid = m_version_counter.load() - 1; + } /* versions signal on m_version_advance_cv when they activate */ - while (m_active_version.load() < newest_pending_version) { + while (m_active_version.load()->get_id() < vid) { std::unique_lock lk(m_version_advance_mtx); m_version_advance_cv.wait(lk); } @@ -354,14 +370,13 @@ public: * Calls SchedType::print_statistics, which should write a report of * scheduler performance statistics to stdout. 
*/ - void print_scheduler_statistics() const { m_sched.print_statistics(); } + void print_scheduler_statistics() const { m_sched->print_statistics(); } private: ConfType m_config; - - SchedType m_sched; - std::unique_ptr<Buffer> m_buffer; + std::unique_ptr<SchedType> m_sched; + std::unique_ptr<BufferType> m_buffer; size_t m_core_cnt; std::atomic<int> m_next_core; @@ -370,20 +385,14 @@ private: /* versioning + concurrency variables */ std::atomic<size_t> m_version_counter; - typedef std::atomic<std::shared_ptr<Version>> version_ptr; - - version_ptr m_active_version; - - typedef size_t version_id; - const size_t INVALID_VERSION = 0; - const size_t INITIAL_VERSION = 1; + std::atomic<std::shared_ptr<VersionType>> m_active_version; std::condition_variable m_version_advance_cv; std::mutex m_version_advance_mtx; alignas(64) std::atomic<bool> m_scheduling_reconstruction; - void enforce_delete_invariant(Version *version) { + void enforce_delete_invariant(VersionType *version) { auto structure = version->get_structure(); auto compactions = structure->get_compaction_tasks(); @@ -401,7 +410,7 @@ private: auto wait = args->result.get_future(); - m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); + m_sched->schedule_job(reconstruction, 0, args, RECONSTRUCTION); /* wait for compaction completion */ wait.get(); @@ -413,60 +422,61 @@ private: static void reconstruction(void *arguments) { auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments; - auto extension = (DynamicExtension *) args->extension; + auto extension = (DynamicExtension *)args->extension; extension->SetThreadAffinity(); + /* + * For "normal" flushes, the task vector should be empty, so this is + * all that will happen. Piggybacking internal reconstructions off + * the flush WILL bottleneck the system, but is left in place to + * allow the traditional policies (LEVELING, TIERING, etc.) to be + * emulated within the new system. + * + * Background reconstructions will not have a priority level of + * FLUSH, and will already have a structure present. As a result, + * this code will be bypassed in that case. + */ if (args->priority == ReconstructionPriority::FLUSH) { /* we first construct a shard from the buffer */ auto buffview = args->version->get_buffer(); - auto new_head = buffview.get_tail(); - auto new_shard = Shard(std::move(buffview)); + args->version->set_next_buffer_head(buffview.get_tail()); + auto new_shard = std::make_shared<ShardType>(std::move(buffview)); - /* copy the currently active version's structure */ - auto structure = extension->get_active_version()->get_structure()->clone(); - - - } + /* + * Flushes already know their version id. To avoid needing to + * do any update reconciliation between structures, they wait + * until the version directly preceeding them has been installed, + * and only then take a copy of the structure. 
+ */ + extension->await_version(args->version->get_id() - 1); + StructureType *structure = + extension->get_active_version()->get_structure()->copy(); + /* add the newly created shard to the structure copy */ + structure->append_l0(new_shard); - else { - + /* set this version's structure to the newly created one */ + args->version->set_structure(std::unique_ptr<StructureType>(structure)); } - - Structure *vers = args->version->get_mutable_structure(); - - ReconstructionTask flush_task; - flush_task.type = ReconstructionType::Invalid; - + /* perform all of the reconstructions */ + StructureType *structure = args->version->get_mutable_structure(); for (size_t i = 0; i < args->tasks.size(); i++) { - if (args->tasks[i].sources.size() > 0 && - args->tasks[i].sources[0] == buffer_shid) { - flush_task = args->tasks[i]; - continue; - } - - vers->perform_reconstruction(args->tasks[i]); + structure->perform_reconstruction(args->tasks[i]); } - if (flush_task.type != ReconstructionType::Invalid) { - /* - * we'll grab the buffer AFTER doing the internal reconstruction, so we - * can flush as many records as possible in one go. The reconstruction - * was done so as to make room for the full buffer anyway, so there's - * no real benefit to doing this first. - */ - auto buffer_view = args->version->get_buffer(); - size_t new_head = buffer_view.get_tail(); - - vers->perform_flush(flush_task, std::move(buffer_view)); - args->result.set_value(true); - ((DynamicExtension *)args->extension)->advance_version(new_head); - } else { - args->result.set_value(true); + /* + * if there isn't already a version id on the new version (i.e., the + * reconstruction isn't a flush), generate one. + */ + if (args->version->get_id() == INVALID_VERSION) { + args->version->set_id(extension->m_version_counter.fetch_add(1)); } + /* advance the index to the newly finished version */ + extension->install_new_version(args->version); + /* manually delete the argument object */ delete args; } @@ -532,7 +542,7 @@ private: version_ptr get_active_version() { return m_active_version.load(); } - /* + /* * Create a new version with an assigned version number, but without * an assigned copy of the structure. Intended for use in flushing, * where the structure will be copied from the currently active version @@ -542,7 +552,8 @@ private: version_ptr create_version() { size_t version_id = m_version_counter.fetch_add(1); auto active_version = get_active_version(); - version_ptr new_version = std::make_shared<Version>(version_id, nullptr, &m_buffer, 0); + std::shared_ptr<VersionType> new_version = + std::make_shared<VersionType>(version_id, nullptr, m_buffer.get(), active_version->get_buffer().get_head()); return new_version; } @@ -551,51 +562,57 @@ private: * Create a new version without an assigned version number, but with * a copy of the extension structure. This is for use with background * reconstructions, where the underlying structure is manipulated, but - * no version number is claimed until the version is activated, to + * no version number is claimed until the version is activated, to * prevent blocking buffer flushes. 
*/ - version_ptr create_version(Structure *structure) { - version_ptr new_version = std::make_shared<Version>(INVALID_VERSION, structure, &m_buffer, 0); + version_ptr create_version(std::unique_ptr<StructureType> structure) { + auto active_version = get_active_version(); + version_ptr new_version = + std::make_shared<VersionType>(INVALID_VERSION, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head()); return new_version; } void install_new_version(version_ptr new_version) { assert(new_version->get_structure()); - assert(new_version->get_version_number() != INVALID_VERSION); - - auto old = get_active_version(); - assert(new_version->get_version_number() > old->get_version_number()); + assert(new_version->get_id() != INVALID_VERSION); /* wait until our turn to install the new version */ - auto lk = std::unique_lock(m_version_advance_mtx); - while (new_version.load()->get_verison_number() != old.load()->get_version_number() + 1) { - m_version_advance_cv.wait(lk); - old = get_active_version(); - } + await_version(new_version->get_id() - 1); + + auto old = get_active_version(); // FIXME: implement this interface // new_version->merge_changes_from(old.load().get()); - /* + /* * Only one version can have a given number, so we are safe to * directly assign here--nobody else is going to change it out from - * under us. We're also protected under the mutex. + * under us. + */ + m_active_version.store(new_version); + + /* + * My understanding is that you don't *really* need this mutex for + * safety in modern C++ when sending the signal. But I'll grab it + * anyway out of an abundance of caution. I doubt this will be a + * major bottleneck. */ - m_active_version.store(new_version); - m_version_advance_cv.notify_all(); + auto lk = std::unique_lock(m_version_advance_mtx); + m_version_advance_cv.notify_all(); } - Structure *create_scratch_structure() { + StructureType *create_scratch_structure() { return get_active_version()->get_structure()->copy(); } - void begin_reconstruction_scheduling() { bool cur_val; do { cur_val = m_scheduling_reconstruction.load(); - } while(cur_val == true && !m_scheduling_reconstruction.compare_exchange_strong(cur_val, true)); + } while ( + cur_val == true && + !m_scheduling_reconstruction.compare_exchange_strong(cur_val, true)); } void end_reconstruction_scheduling() { @@ -608,16 +625,17 @@ private: auto new_version = create_version(); auto *args = new ReconstructionArgs<ShardType, QueryType>(); - args->version.load(new_version); + args->version = new_version; args->tasks = m_recon_policy->get_flush_tasks(args->version.get()); args->extension = this; args->priority = ReconstructionPriority::FLUSH; - /* - * NOTE: args is deleted by the reconstruction job, so shouldn't be - * freed here + /* + * NOTE: args is deleted by the reconstruction job, so shouldn't be + * freed here */ - m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION); + m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, + RECONSTRUCTION); if (m_config.recon_enable_maint_on_flush) { schedule_maint_reconstruction(false); @@ -626,28 +644,27 @@ private: end_reconstruction_scheduling(); } - - void schedule_maint_reconstruction(bool take_reconstruction_lock=true) { + void schedule_maint_reconstruction(bool take_reconstruction_lock = true) { if (take_reconstruction_lock) { begin_reconstruction_scheduling(); } // FIXME: memory management issue here? 
- auto new_version = create_version(m_active_version.load()->get_structure()); + auto new_version = create_version(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy())); auto *args = new ReconstructionArgs<ShardType, QueryType>(); - args->version.load(new_version); + args->version = new_version; args->tasks = m_recon_policy->get_reconstruction_tasks(args->version.get()); args->extension = this; args->priority = ReconstructionPriority::MAINT; - /* - * NOTE: args is deleted by the reconstruction job, so shouldn't be - * freed here + /* + * NOTE: args is deleted by the reconstruction job, so shouldn't be + * freed here */ - m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION); - + m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, + RECONSTRUCTION); if (take_reconstruction_lock) { end_reconstruction_scheduling(); @@ -662,7 +679,7 @@ private: args->query_parms = std::move(query_parms); auto result = args->result_set.get_future(); - m_sched.schedule_job(async_query, 0, (void *)args, QUERY); + m_sched->schedule_job(async_query, 0, (void *)args, QUERY); return result; } diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h index c42b928..eaa374a 100644 --- a/include/framework/reconstruction/BSMPolicy.h +++ b/include/framework/reconstruction/BSMPolicy.h @@ -11,7 +11,7 @@ #include <cmath> #include "framework/reconstruction/ReconstructionPolicy.h" -#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/Version.h" #include "util/types.h" namespace de { @@ -28,6 +28,12 @@ public: get_reconstruction_tasks(const Version<ShardType, QueryType> *version, size_t incoming_reccnt) const override { ReconstructionVector reconstructions; + return reconstructions; + } + + ReconstructionVector + get_flush_tasks(const Version<ShardType, QueryType> *version) const override { + ReconstructionVector reconstructions; auto levels = version->get_structure()->get_level_vector(); level_index target_level = find_reconstruction_target(levels); @@ -53,12 +59,6 @@ public: return reconstructions; } - ReconstructionVector - get_flush_tasks(const Version<ShardType, QueryType> *version) const override { - ReconstructionVector v; - v.add_reconstruction(ReconstructionTask {{buffer_shid}, 0, m_buffer_size, ReconstructionType::Merge}); - } - private: level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = invalid_level_idx; diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h index 2a3c977..0768daa 100644 --- a/include/framework/reconstruction/FixedShardCountPolicy.h +++ b/include/framework/reconstruction/FixedShardCountPolicy.h @@ -11,7 +11,7 @@ #include <cmath> #include "framework/reconstruction/ReconstructionPolicy.h" -#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/Version.h" #include "util/types.h" namespace de { diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h index 8304d8a..94bed70 100644 --- a/include/framework/reconstruction/FloodL0Policy.h +++ b/include/framework/reconstruction/FloodL0Policy.h @@ -11,7 +11,7 @@ #include <cmath> #include "framework/reconstruction/ReconstructionPolicy.h" -#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/Version.h" #include "util/types.h" namespace de { @@ -26,17 +26,14 @@ public: ReconstructionVector 
get_reconstruction_tasks(const Version<ShardType, QueryType> *version, size_t incoming_reccnt) const override { - ReconstructionVector reconstructions; return reconstructions; - } ReconstructionVector get_flush_tasks(const Version<ShardType, QueryType> *version) const override { - ReconstructionVector v; - v.add_reconstruction(ReconstructionTask{ - {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}); + ReconstructionVector reconstructions; + return reconstructions; } private: diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h index 176492e..1523e74 100644 --- a/include/framework/reconstruction/LevelingPolicy.h +++ b/include/framework/reconstruction/LevelingPolicy.h @@ -11,7 +11,7 @@ #include <cmath> #include "framework/reconstruction/ReconstructionPolicy.h" -#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/Version.h" #include "util/types.h" namespace de { @@ -25,14 +25,19 @@ public: : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {} ReconstructionVector - get_reconstruction_tasks(const Version<ShardType, QueryType> *version, - size_t incoming_reccnt) const override { + get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override { + ReconstructionVector reconstructions; + return reconstructions; + } + + ReconstructionVector + get_flush_tasks(const Version<ShardType, QueryType> *version) const override { ReconstructionVector reconstructions; auto levels = version->get_structure()->get_level_vector(); level_index target_level = find_reconstruction_target(levels); assert(target_level != -1); - level_index source_level = 0; + level_index source_level = 1; if (target_level == invalid_level_idx) { /* grow */ @@ -51,14 +56,6 @@ public: return reconstructions; } - ReconstructionVector - get_flush_tasks(const Version<ShardType, QueryType> *version) const override { - ReconstructionVector v; - v.add_reconstruction(ReconstructionTask{ - {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge}); - return v; - } - private: level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = invalid_level_idx; diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h index 48bddcf..6f99b32 100644 --- a/include/framework/reconstruction/ReconstructionPolicy.h +++ b/include/framework/reconstruction/ReconstructionPolicy.h @@ -23,8 +23,7 @@ class ReconstructionPolicy { public: ReconstructionPolicy() {} - virtual ReconstructionVector get_reconstruction_tasks(const Version<ShardType, QueryType> *version, - size_t incoming_reccnt) const = 0; + virtual ReconstructionVector get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const = 0; virtual ReconstructionVector get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0; }; } diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h index 63be5fe..dce5c3c 100644 --- a/include/framework/reconstruction/TieringPolicy.h +++ b/include/framework/reconstruction/TieringPolicy.h @@ -11,7 +11,7 @@ #include <cmath> #include "framework/reconstruction/ReconstructionPolicy.h" -#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/Version.h" #include "util/types.h" namespace de { @@ -26,6 +26,12 @@ public: get_reconstruction_tasks(const Version<ShardType, QueryType> *version, size_t incoming_reccnt) const override { ReconstructionVector 
reconstructions; + return reconstructions; + } + + ReconstructionVector + get_flush_tasks(const Version<ShardType, QueryType> *version) const override { + ReconstructionVector reconstructions; auto levels = version->get_structure()->get_level_vector(); level_index target_level = find_reconstruction_target(levels); @@ -49,15 +55,6 @@ public: return reconstructions; } - ReconstructionVector - get_flush_tasks(const Version<ShardType, QueryType> *version) const override { - ReconstructionVector v; - - v.add_reconstruction(ReconstructionTask{ - {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}); - return v; - } - private: level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = invalid_level_idx; diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 197e8bf..1591909 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -32,7 +32,7 @@ enum class ReconstructionPriority { template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> struct ReconstructionArgs { typedef typename ShardType::RECORD RecordType; - std::atomic<std::shared_ptr<Version<ShardType, QueryType>>> version; + std::shared_ptr<Version<ShardType, QueryType>> version; ReconstructionVector tasks; void *extension; ReconstructionPriority priority; diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h index 2b2b5ba..8d3d038 100644 --- a/include/framework/scheduling/Version.h +++ b/include/framework/scheduling/Version.h @@ -20,30 +20,21 @@ template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> class Version { private: typedef typename ShardType::RECORD RecordType; - typedef MutableBuffer<RecordType> Buffer; - typedef ExtensionStructure<ShardType, QueryType> Structure; - typedef BufferView<RecordType> BufView; + typedef MutableBuffer<RecordType> BufferType; + typedef ExtensionStructure<ShardType, QueryType> StructureType; + typedef BufferView<RecordType> BufferViewType; public: - Version(size_t number = 0) - : m_buffer(nullptr), m_structure(nullptr), m_active_merge(false), - m_epoch_number(number), m_buffer_head(0) {} - - Version(size_t number, Structure *structure, Buffer *buff, size_t head) - : m_buffer(buff), m_structure(structure), m_active_merge(false), - m_epoch_number(number), m_buffer_head(head) { - structure->take_reference(); - } + Version(size_t vid = 0) + : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0), + m_pending_buffer_head(-1) {} - ~Version() { - if (m_structure) { - m_structure->release_reference(); - } + Version(size_t number, std::unique_ptr<StructureType> structure, BufferType *buff, + size_t head) + : m_buffer(buff), m_structure(std::move(structure)), m_id(number), + m_buffer_head(head), m_pending_buffer_head(-1) {} - if (m_structure->get_reference_count() == 0) { - delete m_structure; - } - } + ~Version() = default; /* * Versions are *not* copyable or movable. 
Only one can exist, and all users @@ -54,13 +45,26 @@ public: Version &operator=(const Version &) = delete; Version &operator=(Version &&) = delete; - size_t get_epoch_number() const { return m_epoch_number; } + size_t get_id() const { return m_id; } + + void set_id(size_t id) { m_id = id;} - const Structure *get_structure() const { return m_structure.get(); } + const StructureType *get_structure() const { return m_structure.get(); } - Structure *get_mutable_structure() { return m_structure.get(); } + StructureType *get_mutable_structure() { return m_structure.get(); } - BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); } + bool set_structure(std::unique_ptr<StructureType> new_struct) { + if (m_structure) { + return false; + } + + m_structure = std::move(new_struct); + return true; + } + + BufferViewType get_buffer() const { + return m_buffer->get_buffer_view(m_buffer_head); + } /* * Returns a new Version object that is a copy of this one. The new object @@ -69,54 +73,29 @@ public: * provided argument. */ Version *clone(size_t number) { - std::unique_lock<std::mutex> m_buffer_lock; - auto epoch = new Version(number); - epoch->m_buffer = m_buffer; - epoch->m_buffer_head = m_buffer_head; + auto version = new Version(number); + version->m_buffer = m_buffer; + version->m_buffer_head = m_buffer_head; if (m_structure) { - epoch->m_structure = m_structure->copy(); - /* the copy routine returns a structure with 0 references */ - epoch->m_structure->take_reference(); + version->m_structure = std::unique_ptr(m_structure->copy()); } - return epoch; + return version; } - /* - * Check if a merge can be started from this Version. At present, without - * concurrent merging, this simply checks if there is currently a scheduled - * merge based on this Version. If there is, returns false. If there isn't, - * return true and set a flag indicating that there is an active merge. - */ - bool prepare_reconstruction() { - auto old = m_active_merge.load(); - if (old) { - return false; - } - - // FIXME: this needs cleaned up - while (!m_active_merge.compare_exchange_strong(old, true)) { - old = m_active_merge.load(); - if (old) { - return false; - } - } - - return true; + void set_next_buffer_head(size_t new_head) { + m_pending_buffer_head = new_head; } - bool advance_buffer_head(size_t head) { - m_buffer_head = head; + bool advance_buffer_head() { + m_buffer_head = m_pending_buffer_head; return m_buffer->advance_head(m_buffer_head); } private: - Buffer *m_buffer; - std::unique_ptr<Structure> m_structure; - - std::mutex m_buffer_lock; - std::atomic<bool> m_active_merge; + BufferType *m_buffer; + std::unique_ptr<StructureType> m_structure; /* * The number of currently active jobs @@ -124,7 +103,8 @@ private: * epoch. An epoch can only be retired * when this number is 0. 
*/ - size_t m_epoch_number; + size_t m_id; size_t m_buffer_head; + ssize_t m_pending_buffer_head; }; } // namespace de diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index c304f1c..62c27f5 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -51,7 +51,6 @@ public: new_struct->m_levels.push_back(m_levels[i]->clone()); } - new_struct->m_refcnt = 0; return new_struct; } @@ -156,8 +155,7 @@ public: return cnt; } - inline void perform_reconstruction(ReconstructionTask task, - BuffView *bv=nullptr) { + inline void perform_reconstruction(ReconstructionTask task) { /* perform the reconstruction itself */ std::vector<const ShardType *> shards; for (ShardID shid : task.sources) { @@ -172,10 +170,6 @@ public: shards.push_back(m_levels[shid.level_idx]->get_shard(i)); } } - } else if (shid == buffer_shid) { - assert(bv); - ShardType *buffer_shard = new ShardType(std::move(bv)); - shards.push_back(buffer_shard); } else { shards.push_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx)); } diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index 9c1e624..a751a29 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -18,10 +18,10 @@ namespace de { template <ShardInterface ShardType, QueryInterface<ShardType> QueryType, DeletePolicy D, SchedulerInterface SchedType> class DEConfiguration { + public: DEConfiguration(std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy) - : m_recon_policy(recon_policy) {} + : m_recon_policy(std::move(recon_policy)) {} - public: std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> m_recon_policy; /* buffer parameters */ diff --git a/tests/de_level_concurrent.cpp b/tests/de_level_concurrent.cpp index a76285b..a43006e 100644 --- a/tests/de_level_concurrent.cpp +++ b/tests/de_level_concurrent.cpp @@ -13,11 +13,13 @@ #include <random> #include <algorithm> +#include "framework/scheduling/SerialScheduler.h" #include "include/testing.h" #include "framework/DynamicExtension.h" #include "shard/ISAMTree.h" #include "query/rangequery.h" #include "framework/reconstruction/LevelingPolicy.h" +#include "framework/util/Configuration.h" #include <check.h> using namespace de; @@ -27,12 +29,10 @@ typedef ISAMTree<R> S; typedef rq::Query<S> Q; typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE; -ReconstructionPolicy<S, Q> *recon = new LevelingPolicy<S, Q>(2, 1000); -ReconstructionPolicy<S, Q> *recon2 = new LevelingPolicy<S, Q>(4, 10000); +typedef de::DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> CONF; #include "include/concurrent_extension.h" - Suite *unit_testing() { Suite *unit = suite_create("DynamicExtension: Concurrent Leveling Testing"); diff --git a/tests/include/concurrent_extension.h b/tests/include/concurrent_extension.h index 84f816d..fa669d2 100644 --- a/tests/include/concurrent_extension.h +++ b/tests/include/concurrent_extension.h @@ -26,24 +26,41 @@ #include "framework/reconstruction/TieringPolicy.h" #include "testing.h" #include "framework/DynamicExtension.h" -#include "framework/scheduling/FIFOScheduler.h" +#include "framework/scheduling/SerialScheduler.h" +#include "framework/reconstruction/LevelingPolicy.h" #include "shard/ISAMTree.h" #include "query/rangequery.h" #include <check.h> +#include <memory> #include <set> #include <random> -// using namespace de; -// typedef Rec R; -// typedef 
ISAMTree<R> S; -// typedef rq::Query<S> Q; -// typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, FIFOScheduler> DE; -// ReconstructionPolicy<S,Q> *recon = new TieringPolicy<S, Q>(2, 1000); -// ReconstructionPolicy<S,Q> *recon2 = new TieringPolicy<S, Q>(4, 10000); + + +using namespace de; +typedef Rec R; +typedef ISAMTree<R> S; +typedef rq::Query<S> Q; +typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE; +typedef de::DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> CONF; + +static CONF create_config(size_t type=1) { + if (type == 1) { + auto recon = std::make_unique<LevelingPolicy<S, Q>>(2, 1000); + + return CONF(std::move(recon)); + } else { + auto recon2 = std::make_unique<LevelingPolicy<S, Q>>(4, 10000); + CONF configuration2 = CONF(std::move(recon2)); + return CONF(std::move(recon2)); + } +} + + START_TEST(t_create) { - auto test_de = new DE(recon, 100, 1000); + auto test_de = new DE(create_config()); ck_assert_ptr_nonnull(test_de); ck_assert_int_eq(test_de->get_record_count(), 0); @@ -56,7 +73,7 @@ END_TEST START_TEST(t_insert) { - auto test_de = new DE(recon, 100, 1000); + auto test_de = new DE(create_config()); uint64_t key = 0; uint32_t val = 0; @@ -77,7 +94,7 @@ END_TEST START_TEST(t_debug_insert) { - auto test_de = new DE(recon, 100, 1000); + auto test_de = new DE(create_config()); uint64_t key = 0; uint32_t val = 0; @@ -96,7 +113,7 @@ END_TEST START_TEST(t_insert_with_mem_merges) { - auto test_de = new DE(recon, 100, 1000); + auto test_de = new DE(create_config()); uint64_t key = 0; uint32_t val = 0; @@ -109,7 +126,7 @@ START_TEST(t_insert_with_mem_merges) ck_assert_int_eq(test_de->get_record_count(), 1000); - test_de->await_next_epoch(); + test_de->await_version(); ck_assert_int_eq(test_de->get_record_count(), 1000); @@ -128,7 +145,7 @@ START_TEST(t_insert_with_mem_merges) } } while (cnt < 100000); - test_de->await_next_epoch(); + test_de->await_version(); ck_assert_int_eq(test_de->get_record_count(), 101000); @@ -139,7 +156,7 @@ END_TEST START_TEST(t_range_query) { - auto test_de = new DE(recon2, 1000, 10000); + auto test_de = new DE(create_config(2)); size_t n = 10000000; std::vector<uint64_t> keys; @@ -162,7 +179,7 @@ START_TEST(t_range_query) } - test_de->await_next_epoch(); + test_de->await_version(); std::sort(keys.begin(), keys.end()); @@ -193,7 +210,7 @@ END_TEST START_TEST(t_tombstone_merging_01) { size_t reccnt = 100000; - auto test_de = new DE(recon, 100, 1000); + auto test_de = new DE(create_config()); auto rng = gsl_rng_alloc(gsl_rng_mt19937); @@ -237,7 +254,7 @@ START_TEST(t_tombstone_merging_01) } } - test_de->await_next_epoch(); + test_de->await_version(); ck_assert(test_de->validate_tombstone_proportion()); @@ -251,7 +268,7 @@ START_TEST(t_static_structure) auto rng = gsl_rng_alloc(gsl_rng_mt19937); size_t reccnt = 100000; - auto test_de = new DE(recon, 100, 1000); + auto test_de = new DE(create_config()); std::set<R> records; std::set<R> to_delete; |
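The versioning changes in include/framework/DynamicExtension.h above replace the epoch machinery with an id-ordered version chain: await_version(vid) blocks until a version with at least that id is active, and install_new_version() waits for its immediate predecessor before publishing itself and signalling waiters. Below is a minimal, self-contained sketch of that handshake; VersionChain, install(), and the toy Version struct are illustrative names, not the framework's API. Note that std::atomic<std::shared_ptr<...>> needs C++20 library support, which may be why CMakeLists.txt switches from clang to g++-12.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

/* toy stand-in for the framework's Version; only the id matters here */
struct Version {
  size_t id;
};

class VersionChain {
public:
  VersionChain() : m_active(std::make_shared<Version>(Version{1})) {}

  /* block until a version with id >= vid has been installed */
  void await_version(size_t vid) {
    std::unique_lock lk(m_mtx);
    m_cv.wait(lk, [&] { return m_active.load()->id >= vid; });
  }

  /* install v only once its immediate predecessor is active */
  void install(std::shared_ptr<Version> v) {
    await_version(v->id - 1);
    m_active.store(v);

    /* notify under the mutex so a waiter cannot miss the signal */
    std::lock_guard lk(m_mtx);
    m_cv.notify_all();
  }

private:
  std::atomic<std::shared_ptr<Version>> m_active; /* C++20 atomic<shared_ptr> */
  std::mutex m_mtx;
  std::condition_variable m_cv;
};

int main() {
  VersionChain chain;
  std::vector<std::thread> workers;

  /* versions 2..5 arrive out of order; the handshake serializes installation */
  for (size_t id : {4, 2, 5, 3}) {
    workers.emplace_back([&chain, id] {
      chain.install(std::make_shared<Version>(Version{id}));
      std::printf("version %zu installed\n", id);
    });
  }

  for (auto &t : workers) {
    t.join();
  }

  chain.await_version(5);
  return 0;
}
```

As in install_new_version() in the diff, the store of the new active version is not itself protected by the mutex; taking the lock only around notify_all() is what keeps a waiter from checking the predicate and then missing the wake-up.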