From fd0e99e618319974320ed3fb49535aec501be1fb Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 6 Feb 2025 15:56:33 -0500 Subject: Background compaction stuff --- CMakeLists.txt | 27 +++-- benchmarks/include/standard_benchmarks.h | 12 ++- benchmarks/tail-latency/isam_insert_dist.cpp | 107 +++++++++++++++++++ benchmarks/tail-latency/standard_latency_dist.cpp | 114 +++++++++++++++++++++ include/framework/DynamicExtension.h | 69 +++++++++++-- include/framework/reconstruction/BSMPolicy.h | 19 ++-- .../reconstruction/BackgroundTieringPolicy.h | 88 ++++++++++++++++ .../reconstruction/FixedShardCountPolicy.h | 34 +++--- include/framework/reconstruction/FloodL0Policy.h | 3 +- include/framework/reconstruction/TieringPolicy.h | 15 ++- include/framework/scheduling/Task.h | 1 + include/framework/scheduling/Version.h | 40 ++++++-- include/framework/structure/ExtensionStructure.h | 72 +++++++++++-- include/framework/structure/InternalLevel.h | 76 +++++++++----- include/util/types.h | 8 +- 15 files changed, 587 insertions(+), 98 deletions(-) create mode 100644 benchmarks/tail-latency/isam_insert_dist.cpp create mode 100644 benchmarks/tail-latency/standard_latency_dist.cpp create mode 100644 include/framework/reconstruction/BackgroundTieringPolicy.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 902889f..fafc9bb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,11 +9,11 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(namespace "de") project("Practical Dynamic Extension" VERSION 0.1.0) -set(debug true) +set(debug false) set(tests True) set(bench false) set(vldb_bench false) -set(tail_bench false) +set(tail_bench true) # ALEX doesn't build under C++20 set(build_alex false) @@ -286,15 +286,20 @@ if (vldb_bench) endif() if (tail_bench) - add_executable(config_sweep ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/config_sweep.cpp) - target_link_libraries(config_sweep PUBLIC gsl pthread atomic) - target_include_directories(config_sweep PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) - target_link_options(config_sweep PUBLIC -mcx16) - - add_executable(fixed_shard_count ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/fixed_shard_count.cpp) - target_link_libraries(fixed_shard_count PUBLIC gsl pthread atomic) - target_include_directories(fixed_shard_count PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) - target_link_options(fixed_shard_count PUBLIC -mcx16) + # add_executable(config_sweep ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/config_sweep.cpp) + # target_link_libraries(config_sweep PUBLIC gsl pthread atomic) + # target_include_directories(config_sweep PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) + # target_link_options(config_sweep PUBLIC -mcx16) + + # add_executable(fixed_shard_count ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/fixed_shard_count.cpp) + # target_link_libraries(fixed_shard_count PUBLIC gsl pthread atomic) + # target_include_directories(fixed_shard_count PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) + # target_link_options(fixed_shard_count PUBLIC -mcx16) + + add_executable(standard_latency_dist 
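    # Note: the four commands repeated for every tail-latency target
    # (add_executable / target_link_libraries / target_include_directories /
    # target_link_options) could be factored into a helper. A minimal
    # sketch -- add_tail_bench is a hypothetical name, and it assumes all
    # benchmarks share the same link/include settings; kept in comments so
    # the enclosing add_executable() call parses unchanged:
    #
    #   function(add_tail_bench name)
    #     add_executable(${name}
    #       ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/${name}.cpp)
    #     target_link_libraries(${name} PUBLIC gsl pthread atomic)
    #     target_include_directories(${name} PRIVATE include external
    #       external/m-tree/cpp external/PGM-index/include
    #       external/PLEX/include benchmarks/include
    #       external/psudb-common/cpp/include)
    #     target_link_options(${name} PUBLIC -mcx16)
    #   endfunction()
    #   add_tail_bench(standard_latency_dist)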
${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/standard_latency_dist.cpp) + target_link_libraries(standard_latency_dist PUBLIC gsl pthread atomic) + target_include_directories(standard_latency_dist PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) + target_link_options(standard_latency_dist PUBLIC -mcx16) endif() if (bench) diff --git a/benchmarks/include/standard_benchmarks.h b/benchmarks/include/standard_benchmarks.h index 76423ab..2cbe1a8 100644 --- a/benchmarks/include/standard_benchmarks.h +++ b/benchmarks/include/standard_benchmarks.h @@ -14,6 +14,8 @@ #include "framework/DynamicExtension.h" #include "framework/interface/Query.h" +#include "framework/reconstruction/FixedShardCountPolicy.h" +#include "framework/reconstruction/ReconstructionPolicy.h" #include "query/irs.h" #include "psu-util/progress.h" #include "benchmark_types.h" @@ -23,13 +25,14 @@ #include "framework/reconstruction/TieringPolicy.h" #include "framework/reconstruction/BSMPolicy.h" #include "framework/reconstruction/FloodL0Policy.h" +#include "framework/reconstruction/BackgroundTieringPolicy.h" constexpr double delete_proportion = 0.05; static size_t g_deleted_records = 0; static size_t total = 0; template Q> -de::ReconstructionPolicy *get_policy(size_t scale_factor, size_t buffer_size, int policy=0) { +std::unique_ptr> get_policy(size_t scale_factor, size_t buffer_size, int policy=0, size_t reccnt=0) { de::ReconstructionPolicy *recon = nullptr; @@ -41,9 +44,14 @@ de::ReconstructionPolicy *get_policy(size_t scale_factor, size_t buffer_si recon = new de::BSMPolicy(buffer_size); } else if (policy == 3) { recon = new de::FloodL0Policy(buffer_size); + } else if (policy == 4) { + assert(reccnt > 0); + recon = new de::FixedShardCountPolicy(buffer_size, scale_factor, reccnt); + } else if (policy == 5) { + recon = new de::BackgroundTieringPolicy(scale_factor, buffer_size); } - return recon; + return std::unique_ptr>(recon); } template diff --git a/benchmarks/tail-latency/isam_insert_dist.cpp b/benchmarks/tail-latency/isam_insert_dist.cpp new file mode 100644 index 0000000..88d37c5 --- /dev/null +++ b/benchmarks/tail-latency/isam_insert_dist.cpp @@ -0,0 +1,107 @@ +/* + * + */ + +#include "framework/scheduling/FIFOScheduler.h" +#define ENABLE_TIMER +#define TS_TEST + +#include + +#include "framework/DynamicExtension.h" +#include "shard/ISAMTree.h" +#include "query/rangecount.h" +#include "framework/interface/Record.h" +#include "file_util.h" +#include "standard_benchmarks.h" + +#include + +#include "psu-util/timer.h" + + +typedef de::Record Rec; +typedef de::ISAMTreeShard; +typedef de::rc::Query Q; +typedef de::DynamicExtension Ext; +typedef Q::Parameters QP; + +void usage(char *progname) { + fprintf(stderr, "%s reccnt datafile queryfile\n", progname); +} + +int main(int argc, char **argv) { + + if (argc < 4) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + std::string d_fname = std::string(argv[2]); + std::string q_fname = std::string(argv[3]); + + + auto data = read_sosd_file(d_fname, n); + auto queries = read_range_queries(q_fname, .0001); + + std::vector policies = {3}; + std::vector buffers = {8000, 16000, 32000}; + std::vector sfs = {8}; + + for (size_t l=0; l(sfs[k], buffers[j], policies[l]); + auto extension = new Ext(policy, buffers[j]/4, buffers[j]); + + /* warmup structure w/ 10% of records */ + size_t warmup = .1 * n; + for (size_t i=0; iinsert(data[i])) { + usleep(1); + } + } + + 
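    /*
     * Note: the insert loops in these benchmarks spin with a fixed 1us
     * sleep whenever insert() fails at the buffer high-water mark. A
     * capped exponential backoff is a gentler alternative; a minimal
     * sketch using only the insert() interface shown here
     * (insert_with_backoff is a hypothetical helper, needs <algorithm>
     * for std::min, and is left commented out so the benchmark's timing
     * behavior is unchanged):
     *
     *   auto insert_with_backoff = [&](const Rec &r) {
     *     size_t delay_us = 1;
     *     while (!extension->insert(r)) {
     *       usleep(delay_us);                                // back off while at HWM
     *       delay_us = std::min<size_t>(delay_us * 2, 1024); // cap at ~1ms
     *     }
     *   };
     */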
extension->await_next_epoch(); + + TIMER_INIT(); + + for (size_t i=warmup; iinsert(data[i])) { + usleep(1); + } + TIMER_STOP(); + + //fprintf(stdout, "%ld\t%ld\t%d\t%ld\n", sfs[k], buffers[j], policies[l], TIMER_RESULT()); + } + + extension->await_next_epoch(); + + /* repeat the queries a bunch of times */ + for (size_t l=0; l<10; l++) { + for (size_t i=0; iquery(std::move(q)); + res.get(); + TIMER_STOP(); + + fprintf(stdout, "%ld\t%ld\t%d\t%ld\n", sfs[k], buffers[j], policies[l], TIMER_RESULT()); + } + } + + + QP p = {0, 10000}; + auto res =extension->query(std::move(p)); + + fprintf(stderr, "%ld\n", res.get()); + extension->await_next_epoch(); + delete extension; + }}} + + + fflush(stderr); +} + diff --git a/benchmarks/tail-latency/standard_latency_dist.cpp b/benchmarks/tail-latency/standard_latency_dist.cpp new file mode 100644 index 0000000..2e800fc --- /dev/null +++ b/benchmarks/tail-latency/standard_latency_dist.cpp @@ -0,0 +1,114 @@ +/* + * + */ + +#include "framework/scheduling/SerialScheduler.h" +#include "framework/util/Configuration.h" +#include "util/types.h" +#define ENABLE_TIMER +#define TS_TEST + +#include + +#include "framework/DynamicExtension.h" +#include "framework/scheduling/FIFOScheduler.h" +#include "shard/TrieSpline.h" +#include "query/rangecount.h" +#include "framework/interface/Record.h" +#include "file_util.h" +#include "standard_benchmarks.h" + +#include "framework/reconstruction/FixedShardCountPolicy.h" + +#include + +#include "psu-util/timer.h" + + +typedef de::Record Rec; +typedef de::TrieSpline Shard; +typedef de::rc::Query Q; +typedef de::DynamicExtension Ext; +typedef Q::Parameters QP; +typedef de::DEConfiguration Conf; + +void usage(char *progname) { + fprintf(stderr, "%s reccnt datafile queryfile\n", progname); +} + +int main(int argc, char **argv) { + + if (argc < 4) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + std::string d_fname = std::string(argv[2]); + std::string q_fname = std::string(argv[3]); + + auto data = read_sosd_file(d_fname, n); + auto queries = read_range_queries(q_fname, .0001); + + std::vector sfs = {8}; //, 4, 8, 16, 32, 64, 128, 256, 512, 1024}; + size_t buffer_size = 8000; + std::vector policies = {5,}; + + for (auto pol: policies) { + for (size_t i=0; i(sfs[i], buffer_size, pol, n); + auto config = Conf(std::move(policy)); + config.recon_enable_maint_on_flush = true; + config.recon_maint_disabled = false; + config.buffer_flush_trigger = 4000; + + auto extension = new Ext(std::move(config)); + + /* warmup structure w/ 10% of records */ + size_t warmup = .1 * n; + for (size_t j=0; jinsert(data[j])) { + usleep(1); + } + } + + extension->await_version(); + + TIMER_INIT(); + + TIMER_START(); + for (size_t j=warmup; jinsert(data[j])) { + usleep(1); + fprintf(stderr, "%ld\n", j); + } + } + TIMER_STOP(); + + size_t insert_tput = (double) (n - warmup) / (double) (TIMER_RESULT()) * 1e9; + + extension->await_version(); + + size_t total = 0; + TIMER_START(); + /* repeat the queries a bunch of times */ + for (size_t l=0; l<10; l++) { + for (size_t j=0; jquery(std::move(q)); + total += res.get(); + } + } + TIMER_STOP(); + + size_t query_lat = (double) TIMER_RESULT() / (10*queries.size()); + + fprintf(stdout, "S\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\n", pol, sfs[i], extension->get_height(), extension->get_shard_count(), extension->get_record_count(), total, insert_tput, query_lat); + extension->print_structure(); + delete extension; + } + } + + fflush(stderr); +} + diff --git a/include/framework/DynamicExtension.h 
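/*
 * Review note on isam_insert_dist.cpp above: the query-repetition loop
 * ("for (size_t l=0; l<10; l++)") reuses l while l is also the enclosing
 * policy-index variable, so the per-query output line reads policies[l]
 * with l running 0..9 against a one-element vector -- an out-of-bounds
 * read. The inner loop wants its own counter.
 *
 * standard_latency_dist.cpp above exercises the new configuration
 * surface end to end. Condensed (names taken directly from the
 * benchmark; the template arguments to get_policy are inferred, since
 * they were eaten by the HTML stripping; flag semantics are inferred
 * from their use here):
 *
 *   auto policy = get_policy<Shard, Q>(8, 8000, 5, n); // 5 = BackgroundTieringPolicy
 *   Conf config(std::move(policy));
 *   config.recon_enable_maint_on_flush = true;  // kick maintenance after each flush
 *   config.recon_maint_disabled = false;        // allow background reconstructions
 *   config.buffer_flush_trigger = 4000;         // flush at half the 8000-record buffer
 *   auto extension = new Ext(std::move(config));
 */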
b/include/framework/DynamicExtension.h index ef36de3..a48f390 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -89,6 +89,8 @@ public: std::make_shared(INITIAL_VERSION, std::make_unique(), m_buffer.get(), 0)); m_version_counter = INITIAL_VERSION; + assert(m_config.recon_policy); + m_reconstruction_scheduled.store(false); } /** @@ -374,6 +376,14 @@ public: */ void print_scheduler_statistics() const { m_sched->print_statistics(); } + /** + * Writes a schematic view of the currently active structure to + * stdout. Each level is on its own line, and each shard is represented. + */ + void print_structure() { + get_active_version()->get_structure()->print_structure(); + } + private: ConfType m_config; @@ -390,6 +400,8 @@ private: std::condition_variable m_version_advance_cv; std::mutex m_version_advance_mtx; + std::atomic m_reconstruction_scheduled; + std::atomic m_flush_in_progress = false; alignas(64) std::atomic m_scheduling_reconstruction; @@ -441,6 +453,7 @@ private: * this code will be bypassed in that case. */ if (args->priority == ReconstructionPriority::FLUSH) { + // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id()); assert(extension->m_flush_in_progress.load()); /* we first construct a shard from the buffer */ auto buffview = args->version->get_buffer(); @@ -460,19 +473,21 @@ private: if (extension->m_config.recon_maint_disabled) { assert(args->version->get_mutable_structure()); - args->version->get_mutable_structure()->append_l0(std::move(new_shard)); + args->version->get_mutable_structure()->append_l0(std::move(new_shard), args->version->get_id()); } else { assert(!args->version->get_mutable_structure()); auto structure = std::unique_ptr(extension->get_active_version()->get_structure()->copy()); /* add the newly created shard to the structure copy */ - structure->append_l0(std::move(new_shard)); + structure->append_l0(std::move(new_shard), args->version->get_id()); /* set this version's structure to the newly created one */ args->version->set_structure(std::move(structure)); } args->version->advance_buffer_head(new_head); + } else { + // fprintf(stderr, "[I] Running background reconstruction\n"); } /* perform all of the reconstructions */ @@ -487,13 +502,18 @@ private: */ if (args->version->get_id() == INVALID_VERSION) { args->version->set_id(extension->m_version_counter.fetch_add(1)); + // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id()); } /* advance the index to the newly finished version */ - extension->install_new_version(args->version); + extension->install_new_version(args->version, args->initial_version); if (args->priority == ReconstructionPriority::FLUSH) { extension->m_flush_in_progress.store(false); + // fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id()); + } else { + extension->m_reconstruction_scheduled.store(false); + // fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id()); } /* manually delete the argument object */ @@ -593,17 +613,18 @@ private: return new_version; } - void install_new_version(version_ptr new_version) { + void install_new_version(version_ptr new_version, size_t old_active_version_id) { assert(new_version->get_structure()); assert(new_version->get_id() != INVALID_VERSION); + // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id()); /* wait until our turn to install the new version */ await_version(new_version->get_id() - 1); auto old = get_active_version(); - // FIXME: 
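/*
 * install_new_version() in this hunk serializes installation by version
 * id: await_version(id - 1) blocks until the predecessor is active,
 * merge_changes_from() reconciles anything that landed while this
 * version was being built, and only then is the active pointer swapped.
 * A stripped-down model of the hand-off (hypothetical names, not the
 * framework API; the real code sleeps on a condition variable rather
 * than spinning):
 *
 *   std::atomic<size_t> active_id{0};
 *   void install(size_t my_id) {
 *     while (active_id.load() != my_id - 1)
 *       std::this_thread::yield();      // wait for our turn
 *     merge_changes_since(my_snapshot); // reconcile concurrent changes
 *     active_id.store(my_id);           // publish
 *   }
 */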
implement this interface - // new_version->merge_changes_from(old.load().get()); + new_version->merge_changes_from(old.get(), old_active_version_id); + new_version->update_shard_version(new_version->get_id()); /* * Only one version can have a given number, so we are safe to @@ -620,6 +641,8 @@ private: */ auto lk = std::unique_lock(m_version_advance_mtx); m_version_advance_cv.notify_all(); + + // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id()); } StructureType *create_scratch_structure() { @@ -649,6 +672,8 @@ private: return; } + // fprintf(stderr, "[I] Scheduling flush\n"); + /* * for "legacy" policies, without background reconstruction, we need * a valid structure object as part of the version prior to determining @@ -672,6 +697,7 @@ private: args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get()); args->extension = this; args->priority = ReconstructionPriority::FLUSH; + args->initial_version = INVALID_VERSION; /* * NOTE: args is deleted by the reconstruction job, so shouldn't be @@ -689,7 +715,7 @@ private: void schedule_maint_reconstruction(bool take_reconstruction_lock = true) { - if (m_config.recon_maint_disabled) { + if (m_config.recon_maint_disabled || m_reconstruction_scheduled.load()) { return; } @@ -697,21 +723,38 @@ private: begin_reconstruction_scheduling(); } + if (m_reconstruction_scheduled.load()) { + end_reconstruction_scheduling(); + return; + } + + // fprintf(stderr, "[I] Scheduling maintenance\n"); + + m_reconstruction_scheduled.store(true); + // FIXME: memory management issue here? - auto new_version = create_version_maint(std::unique_ptr(m_active_version.load()->get_structure()->copy())); + auto active_version = m_active_version.load(); + auto new_version = create_version_maint(std::unique_ptr(active_version->get_structure()->copy())); auto *args = new ReconstructionArgs(); args->version = new_version; args->tasks = m_config.recon_policy->get_reconstruction_tasks(args->version.get()); args->extension = this; args->priority = ReconstructionPriority::MAINT; + args->initial_version = active_version->get_id(); /* * NOTE: args is deleted by the reconstruction job, so shouldn't be * freed here */ - m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, - RECONSTRUCTION); + if (args->tasks.size() > 0) { + m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, + RECONSTRUCTION); + } else { + delete args; + m_reconstruction_scheduled.store(false); + // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n"); + } if (take_reconstruction_lock) { end_reconstruction_scheduling(); @@ -739,6 +782,12 @@ private: } } + + if (rand() % 1000 < 5) { + size_t l0_cnt = get_active_version()->get_structure()->get_level_vector()[0]->get_shard_count(); + usleep(l0_cnt); + } + /* this will fail if the HWM is reached and return 0 */ return m_buffer->append(rec, ts); } diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h index eaa374a..9ddd150 100644 --- a/include/framework/reconstruction/BSMPolicy.h +++ b/include/framework/reconstruction/BSMPolicy.h @@ -25,8 +25,7 @@ public: : m_scale_factor(2), m_buffer_size(buffer_size) {} ReconstructionVector - get_reconstruction_tasks(const Version *version, - size_t incoming_reccnt) const override { + get_reconstruction_tasks(const Version *version) const override { ReconstructionVector reconstructions; return reconstructions; } @@ -49,13 +48,21 @@ public: task.target = target_level; task.type = 
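/*
 * Two notes on the DynamicExtension.h hunks above:
 *
 * 1. schedule_maint_reconstruction() now checks
 *    m_reconstruction_scheduled twice -- once before taking the
 *    scheduling lock (cheap early-out) and once after (closing the race
 *    with a concurrent scheduler) -- so at most one background
 *    reconstruction is in flight at a time. A minimal model of the
 *    guard (hypothetical names; the real code uses load/store under the
 *    scheduling lock rather than exchange):
 *
 *      std::atomic<bool> scheduled{false};
 *      void maybe_schedule() {
 *        if (scheduled.load()) return;          // fast path
 *        std::lock_guard<std::mutex> lk(sched_mtx);
 *        if (scheduled.exchange(true)) return;  // lost the race
 *        enqueue_background_job();              // job clears the flag when done
 *      }
 *
 * 2. The "if (rand() % 1000 < 5) ... usleep(l0_cnt)" block added to
 *    internal_append() sleeps one microsecond per L0 shard on roughly
 *    0.5% of inserts. If it is intentional backpressure it deserves a
 *    comment saying so; if it is leftover instrumentation it will skew
 *    the tail latencies this commit sets out to measure.
 */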
ReconstructionType::Merge; + std::vector source_shards; + size_t reccnt = 0; + + source_shards.push_back({0, all_shards_idx}); + for (level_index i = target_level; i > source_level; i--) { if (i < (level_index)levels.size()) { - task.add_shard({i, all_shards_idx}, levels[i]->get_record_count()); + source_shards.push_back({i-1, all_shards_idx}); + reccnt += levels[i-1]->get_record_count(); } } - reconstructions.add_reconstruction(task); + assert(source_shards.size() > 0); + + reconstructions.add_reconstruction(source_shards, target_level, reccnt, ReconstructionType::Merge); return reconstructions; } @@ -63,8 +70,8 @@ private: level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = invalid_level_idx; - for (level_index i = 0; i < (level_index)levels.size(); i++) { - if (levels[i]->get_record_count() + m_buffer_size <= capacity(i)) { + for (level_index i = 1; i < (level_index)levels.size(); i++) { + if (levels[i]->get_record_count() == 0) { target_level = i; break; } diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h new file mode 100644 index 0000000..4c266fd --- /dev/null +++ b/include/framework/reconstruction/BackgroundTieringPolicy.h @@ -0,0 +1,88 @@ +/* + * include/framework/reconstruction/LevelingPolicy.h + * + * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * Dong Xie + * + * Distributed under the Modified BSD License. + * + */ +#pragma once +#include + +#include "framework/reconstruction/ReconstructionPolicy.h" +#include "framework/scheduling/Version.h" +#include "util/types.h" + +namespace de { +template QueryType> +class BackgroundTieringPolicy : public ReconstructionPolicy { + typedef std::vector>> + LevelVector; + +public: + BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size) + : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {} + + ReconstructionVector get_reconstruction_tasks( + const Version *version) const override { + ReconstructionVector reconstructions; + + auto levels = version->get_structure()->get_level_vector(); + + if (levels[0]->get_shard_count() < m_scale_factor) { + return reconstructions; + } + + level_index target_level = find_reconstruction_target(levels); + assert(target_level != -1); + level_index source_level = 0; + + if (target_level == invalid_level_idx) { + /* grow */ + target_level = levels.size(); + } + + for (level_index i = target_level; i > source_level; i--) { + size_t target_reccnt = + (i < (level_index)levels.size()) ? 
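/*
 * Review note: the header comment on this new file still reads
 * "include/framework/reconstruction/LevelingPolicy.h"; it should name
 * BackgroundTieringPolicy.h.
 *
 * Worked example of the cascade built here, assuming scale_factor = 8
 * and an 8000-record buffer: each level holds at most 8 shards, so the
 * policy fires once L0 reaches 8 buffer-sized shards and emits one
 * Compact task per level along the path to the first level with spare
 * capacity (or a brand-new bottom level). Eight 8000-record L0 shards
 * become one 64000-record L1 shard, eight of those become one
 * 512000-record L2 shard, and so on -- each level's shards are 8x the
 * size of the previous level's.
 */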
levels[i]->get_record_count() : 0; + size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt; + + std::vector shards; + for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) { + shards.push_back({i-1, j}); + } + + reconstructions.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact); + } + + return reconstructions; + } + + ReconstructionVector + get_flush_tasks(const Version *version) const override { + ReconstructionVector reconstructions; + + return reconstructions; + } + +private: + level_index find_reconstruction_target(LevelVector &levels) const { + level_index target_level = invalid_level_idx; + + for (level_index i = 0; i < (level_index)levels.size(); i++) { + if (levels[i]->get_shard_count() + 1 <= capacity()) { + target_level = i; + break; + } + } + + return target_level; + } + + inline size_t capacity() const { return m_scale_factor; } + + size_t m_scale_factor; + size_t m_buffer_size; +}; +} // namespace de diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h index 0768daa..a181052 100644 --- a/include/framework/reconstruction/FixedShardCountPolicy.h +++ b/include/framework/reconstruction/FixedShardCountPolicy.h @@ -25,8 +25,7 @@ public: : m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {} ReconstructionVector - get_reconstruction_tasks(const Version *version, - size_t incoming_reccnt) const override { + get_reconstruction_tasks(const Version *version) const override { ReconstructionVector reconstructions; return reconstructions; @@ -36,26 +35,25 @@ public: get_flush_tasks(const Version *version) const override { auto levels = version->get_structure()->get_level_vector(); - ReconstructionVector v; - if (levels.size() == 0) { - v.add_reconstruction(ReconstructionTask{ - {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}); - return v; - } - - ShardID last_shid = {0, (shard_index) (levels[0]->get_shard_count() - 1)}; - - if (levels[0]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) { - v.add_reconstruction(ReconstructionTask{ - {{buffer_shid, last_shid}}, 0, m_buffer_size, ReconstructionType::Merge}); - return v; - } else { - v.add_reconstruction(ReconstructionTask{ - {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}); + /* if this is the very first flush, there won't be an L1 yet */ + if (levels.size() > 1 && levels[1]->get_shard_count() > 0) { + ShardID last_shid = {1, (shard_index) (levels[1]->get_shard_count() - 1)}; + if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) { + auto task = ReconstructionTask { + {{0, 0}, last_shid}, 1, m_buffer_size,ReconstructionType::Merge + }; + v.add_reconstruction(task); return v; + } } + + auto task = ReconstructionTask { + {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append + }; + v.add_reconstruction(task); + return v; } private: diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h index 94bed70..b4f453b 100644 --- a/include/framework/reconstruction/FloodL0Policy.h +++ b/include/framework/reconstruction/FloodL0Policy.h @@ -24,8 +24,7 @@ public: FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {} ReconstructionVector - get_reconstruction_tasks(const Version *version, - size_t incoming_reccnt) const override { + get_reconstruction_tasks(const Version *version) const override { ReconstructionVector 
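/*
 * On the FixedShardCountPolicy change above: flushes now target L1
 * rather than L0. Each flush either merges the freshly flushed buffer
 * shard (L0, shard 0) into the newest L1 shard -- while that shard can
 * absorb m_buffer_size more records without exceeding capacity() -- or
 * appends it as a fresh L1 shard. capacity() is not shown in this hunk,
 * but from its use it bounds per-shard record count, so the structure
 * should converge on roughly max_record_count / shard_count records per
 * shard: e.g., with a 200M-record workload and a 100-shard target,
 * shards would grow to about 2M records before a new one is started
 * (illustrative numbers only).
 */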
reconstructions; return reconstructions; } diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h index dce5c3c..ae215db 100644 --- a/include/framework/reconstruction/TieringPolicy.h +++ b/include/framework/reconstruction/TieringPolicy.h @@ -17,14 +17,15 @@ namespace de { template QueryType> class TieringPolicy : public ReconstructionPolicy { - typedef std::vector>> LevelVector; + typedef std::vector>> + LevelVector; + public: TieringPolicy(size_t scale_factor, size_t buffer_size) : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {} - ReconstructionVector - get_reconstruction_tasks(const Version *version, - size_t incoming_reccnt) const override { + ReconstructionVector get_reconstruction_tasks( + const Version *version) const override { ReconstructionVector reconstructions; return reconstructions; } @@ -59,7 +60,7 @@ private: level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = invalid_level_idx; - for (level_index i = 0; i < (level_index) levels.size(); i++) { + for (level_index i = 0; i < (level_index)levels.size(); i++) { if (levels[i]->get_shard_count() + 1 <= capacity()) { target_level = i; break; @@ -69,9 +70,7 @@ private: return target_level; } - inline size_t capacity() const { - return m_scale_factor; - } + inline size_t capacity() const { return m_scale_factor; } size_t m_scale_factor; size_t m_buffer_size; diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 1591909..ed40d3d 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -36,6 +36,7 @@ struct ReconstructionArgs { ReconstructionVector tasks; void *extension; ReconstructionPriority priority; + size_t initial_version; }; template Q, typename DE> struct QueryArgs { diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h index 3e93202..fa677f2 100644 --- a/include/framework/scheduling/Version.h +++ b/include/framework/scheduling/Version.h @@ -89,16 +89,44 @@ public: return m_buffer->advance_head(new_head); } + void merge_changes_from(Version *old, size_t version_id) { + /* + * for a maint reconstruction, the buffer head may have advanced + * during the reconstruction; we don't need to adjust the buffer + * for maintenance reconstructions, so we can simply "catch" the + * internal head index up to the current version. + */ + if (old->m_buffer_head > m_buffer_head) { + m_buffer_head = old->m_buffer_head; + } + + // FIXME: we should also ensure that we don't clobber anything + // in the event that multiple concurrent reconstructions affect + // the same levels. As it stands, two reconstructions *could* share + // source shards, resulting in some records being lost or duplicated. + // + // For the moment, I'm being careful to avoid this within the + // scheduling policy itself, and only forwarding changes to this + // version. + + /* using INVALID_VERSION disables shard reconcilliation */ + if (version_id == 0) { + return; + } + + /* add any shards newer than version_id to this version */ + auto old_structure = old->get_structure(); + m_structure->merge_structure(old_structure, version_id); + } + + void update_shard_version(size_t version) { + m_structure->update_shard_version(version); + } + private: BufferType *m_buffer; std::unique_ptr m_structure; - /* - * The number of currently active jobs - * (queries/merges) operating on this - * epoch. An epoch can only be retired - * when this number is 0. 
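// Review notes on the Version.h hunk above:
//   - "reconcilliation" in the merge_changes_from() comment should read
//     "reconciliation".
//   - merge_changes_from() reconciles a finished version with changes
//     that landed while it was being built: the buffer head is caught
//     up to the active version's, and (unless reconciliation is
//     disabled by passing INVALID_VERSION) every shard stamped with a
//     version newer than the snapshot this reconstruction started from
//     is carried forward, via ExtensionStructure::merge_structure()
//     below. In pseudocode:
//
//       for each level L:
//         for each (shard, ver) in old.levels[L]:
//           if ver > snapshot_id: this.levels[L].append(shard, ver)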
- */ size_t m_id; size_t m_buffer_head; ssize_t m_pending_buffer_head; diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 60fb6c7..2bf7086 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -27,8 +27,9 @@ class ExtensionStructure { typedef std::vector>> LevelVector; public: - ExtensionStructure() { - m_levels.emplace_back(std::make_shared>(0)); + ExtensionStructure(bool default_level=true) { + if (default_level) + m_levels.emplace_back(std::make_shared>(0)); } ~ExtensionStructure() = default; @@ -49,7 +50,7 @@ public: * need to be forwarded to the appropriate structures manually. */ ExtensionStructure *copy() const { - auto new_struct = new ExtensionStructure(); + auto new_struct = new ExtensionStructure(false); for (size_t i = 0; i < m_levels.size(); i++) { new_struct->m_levels.push_back(m_levels[i]->clone()); } @@ -158,13 +159,17 @@ public: return cnt; } - inline void perform_reconstruction(ReconstructionTask task) { + inline void perform_reconstruction(ReconstructionTask task, size_t version=0) { /* perform the reconstruction itself */ std::vector shards; for (ShardID shid : task.sources) { - assert(shid.level_idx < (level_index) m_levels.size()); + assert(shid.level_idx <= (level_index) m_levels.size()); assert(shid.shard_idx >= -1); + if (shid.level_idx == (level_index) m_levels.size()) { + continue; + } + /* if unspecified, push all shards into the vector */ if (shid.shard_idx == all_shards_idx) { for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); @@ -184,21 +189,27 @@ public: * Remove all of the shards processed by the operation */ for (ShardID shid : task.sources) { - if (shid.shard_idx == all_shards_idx) { + if (shid.level_idx == (level_index) m_levels.size()) { + continue; + } else if (shid.shard_idx == all_shards_idx) { m_levels[shid.level_idx]->truncate(); } else if (shid != buffer_shid) { m_levels[shid.level_idx]->delete_shard(shid.shard_idx); } } + // fprintf(stderr, "Target: %ld\tLevels:%ld\n", task.target, m_levels.size()); + /* * Append the new shard to the target level */ if (task.target < (level_index)m_levels.size()) { - m_levels[task.target]->append(std::shared_ptr(new_shard)); + m_levels[task.target]->append(std::shared_ptr(new_shard), version); + // fprintf(stderr, "append (no growth)\n"); } else { /* grow the structure if needed */ m_levels.push_back(std::make_shared>(task.target)); - m_levels[task.target]->append(std::shared_ptr(new_shard)); + m_levels[task.target]->append(std::shared_ptr(new_shard), version); + // fprintf(stderr, "grow and append\n"); } } @@ -219,8 +230,8 @@ public: return m_levels[0]->get_shard_count(); } - void append_l0(std::shared_ptr shard) { - m_levels[0]->append(shard); + void append_l0(std::shared_ptr shard, size_t version) { + m_levels[0]->append(shard, version); } LevelVector const &get_level_vector() const { return m_levels; } @@ -251,6 +262,47 @@ public: return ts_prop <= (long double) max_delete_prop; } + void print_structure() const { + for (size_t i=0; iget_shard_count(); j++) { + fprintf(stdout, "(%ld: %ld) ", j, m_levels[i]->get_shard(j)->get_record_count()); + } + } else { + fprintf(stdout, "[Empty]"); + } + + fprintf(stdout, "\n"); + } + } + + + void merge_structure(const ExtensionStructure* old_structure, size_t version_id = 0) { + assert(version_id > 0); + + for (size_t i=0; im_levels.size(); i++) { + for (size_t j=0; jm_levels[i]->get_shard_count(); j++) { + if 
(old_structure->m_levels[i]->get_shard_version(j) > version_id) { + m_levels[i]->append(old_structure->m_levels[i]->get_shard_ptr(j)); + } + } + } + } + + void update_shard_version(size_t version) { + assert(version != 0); + + for (size_t i=0; iget_shard_count(); j++) { + if (m_levels[i]->get_shard_version(j) == 0) { + m_levels[i]->set_shard_version(j, version); + } + } + } + } + private: LevelVector m_levels; }; diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index 5bc891b..37b2b40 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -15,6 +15,7 @@ */ #pragma once +#include #include #include @@ -28,6 +29,7 @@ template QueryType> class InternalLevel { typedef typename ShardType::RECORD RecordType; typedef BufferView BuffView; + typedef std::pair, size_t> shard_ptr; public: InternalLevel(ssize_t level_no) : m_level_no(level_no) {} @@ -47,8 +49,8 @@ public: std::vector shards; for (auto shard : m_shards) { - if (shard) - shards.emplace_back(shard.get()); + if (shard.first) + shards.emplace_back(shard.first.get()); } return new ShardType(shards); @@ -59,10 +61,10 @@ public: std::vector &local_queries, typename QueryType::Parameters *query_parms) const { for (size_t i = 0; i < m_shards.size(); i++) { - if (m_shards[i]) { + if (m_shards[i].first) { auto local_query = - QueryType::local_preproc(m_shards[i].get(), query_parms); - shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].get()}); + QueryType::local_preproc(m_shards[i].first.get(), query_parms); + shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].first.get()}); local_queries.emplace_back(local_query); } } @@ -74,7 +76,7 @@ public: for (int i = m_shards.size() - 1; i >= (ssize_t)shard_stop; i--) { if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec, true); + auto res = m_shards[i].first->point_lookup(rec, true); if (res && res->is_tombstone()) { return true; } @@ -88,8 +90,8 @@ public: return false; for (size_t i = 0; i < m_shards.size(); ++i) { - if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec); + if (m_shards[i].first) { + auto res = m_shards[i].first->point_lookup(rec); if (res) { res->set_delete(); return true; @@ -105,7 +107,15 @@ public: return nullptr; } - return m_shards[idx].get(); + return m_shards[idx].first.get(); + } + + const size_t get_shard_version(size_t idx) const { + if (idx >= m_shards.size()) { + return 0; + } + + return m_shards[idx].second; } size_t get_shard_count() const { return m_shards.size(); } @@ -113,8 +123,8 @@ public: size_t get_record_count() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { - if (m_shards[i]) { - cnt += m_shards[i]->get_record_count(); + if (m_shards[i].first) { + cnt += m_shards[i].first->get_record_count(); } } @@ -124,8 +134,8 @@ public: size_t get_tombstone_count() const { size_t res = 0; for (size_t i = 0; i < m_shards.size(); ++i) { - if (m_shards[i]) { - res += m_shards[i]->get_tombstone_count(); + if (m_shards[i].first) { + res += m_shards[i].first->get_tombstone_count(); } } return res; @@ -134,8 +144,8 @@ public: size_t get_aux_memory_usage() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { - if (m_shards[i]) { - cnt += m_shards[i]->get_aux_memory_usage(); + if (m_shards[i].first) { + cnt += m_shards[i].first->get_aux_memory_usage(); } } @@ -146,7 +156,7 @@ public: size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { - cnt += m_shards[i]->get_memory_usage(); + cnt 
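/*
 * Review note: m_shards now holds
 * std::pair<std::shared_ptr<ShardType>, size_t> (shard, version), but
 * several unchanged context lines in this file -- the "if (m_shards[i])"
 * checks in the tombstone lookup, get_memory_usage(), and the
 * tombstone/record tally loop -- still test the pair directly, and
 * std::pair has no boolean conversion. Unless those context lines are
 * stale, they need the same ".first" treatment applied elsewhere in
 * this diff:
 *
 *   if (m_shards[i].first) { ... }
 */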
+= m_shards[i].first->get_memory_usage(); } } @@ -158,8 +168,8 @@ public: size_t reccnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { - tscnt += m_shards[i]->get_tombstone_count(); - reccnt += m_shards[i]->get_record_count(); + tscnt += m_shards[i].first->get_tombstone_count(); + reccnt += m_shards[i].first->get_record_count(); } } @@ -169,7 +179,7 @@ public: size_t get_nonempty_shard_count() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { - if (m_shards[i] && m_shards[i]->get_record_count() > 0) { + if (m_shards[i].first && m_shards[i].first->get_record_count() > 0) { cnt += 1; } } @@ -180,7 +190,7 @@ public: std::shared_ptr clone() const { auto new_level = std::make_shared(m_level_no); for (size_t i = 0; i < m_shards.size(); i++) { - new_level->append(m_shards[i]); + new_level->append(m_shards[i].first, m_shards[i].second); } return new_level; @@ -192,21 +202,39 @@ public: m_shards.erase(m_shards.begin() + shard); } - void append(std::shared_ptr shard) { - m_shards.emplace_back(shard); + void append(std::shared_ptr shard, size_t version=0) { + m_shards.push_back({shard, version}); + } + + void append(shard_ptr shard) { + m_shards.push_back(shard); } const ShardType *get_shard(ShardID shid) const { if (shid < m_shards.size()) { - return m_shards[shid].get(); + return m_shards[shid].first.get(); } return nullptr; } + const shard_ptr get_shard_ptr(size_t shid) const { + if (shid < m_shards.size()) { + return m_shards[shid]; + } + + return {nullptr, 0}; + } + + void set_shard_version(size_t idx, size_t version) { + if (idx < m_shards.size()) { + m_shards[idx].second = version; + } + } + private: ssize_t m_level_no; - std::vector> m_shards; + std::vector m_shards; }; } // namespace de diff --git a/include/util/types.h b/include/util/types.h index e67c486..084bf4b 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -111,7 +111,13 @@ public: void add_reconstruction(level_index source, level_index target, size_t reccnt, ReconstructionType type) { - m_tasks.push_back({{{source, all_shards_idx}}, target, reccnt}); + + if (type == ReconstructionType::Merge) { + m_tasks.push_back({{{source, all_shards_idx}, {target, all_shards_idx}}, target, reccnt}); + } else { + m_tasks.push_back({{{source, all_shards_idx}}, target, reccnt}); + } + total_reccnt += reccnt; } -- cgit v1.2.3
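Post-patch note on the util/types.h hunk: both push_back() calls in
add_reconstruction() build the task as {sources, target, reccnt} and
never store the function's `type` argument, so ReconstructionTask::type
is left default-initialized even on the new Merge path. Assuming the
fourth aggregate member is the type, as the ReconstructionTask
initializations elsewhere in this diff suggest, the intended body is
likely:

    void add_reconstruction(level_index source, level_index target,
                            size_t reccnt, ReconstructionType type) {
      if (type == ReconstructionType::Merge) {
        m_tasks.push_back({{{source, all_shards_idx}, {target, all_shards_idx}},
                           target, reccnt, type});
      } else {
        m_tasks.push_back({{{source, all_shards_idx}}, target, reccnt, type});
      }
      total_reccnt += reccnt;
    }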