Diffstat (limited to 'include/framework')
-rw-r--r--   include/framework/DynamicExtension.h                          69
-rw-r--r--   include/framework/reconstruction/BSMPolicy.h                   19
-rw-r--r--   include/framework/reconstruction/BackgroundTieringPolicy.h    88
-rw-r--r--   include/framework/reconstruction/FixedShardCountPolicy.h      34
-rw-r--r--   include/framework/reconstruction/FloodL0Policy.h                3
-rw-r--r--   include/framework/reconstruction/TieringPolicy.h               15
-rw-r--r--   include/framework/scheduling/Task.h                             1
-rw-r--r--   include/framework/scheduling/Version.h                         40
-rw-r--r--   include/framework/structure/ExtensionStructure.h               72
-rw-r--r--   include/framework/structure/InternalLevel.h                    76
10 files changed, 333 insertions, 84 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index ef36de3..a48f390 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -89,6 +89,8 @@ public:
         std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0));
     m_version_counter = INITIAL_VERSION;
+    assert(m_config.recon_policy);
+    m_reconstruction_scheduled.store(false);
   }
 
   /**
@@ -374,6 +376,14 @@ public:
    */
  void print_scheduler_statistics() const { m_sched->print_statistics(); }
 
+  /**
+   * Writes a schematic view of the currently active structure to
+   * stdout. Each level is on its own line, and each shard is represented
+   * by its index and record count.
+   */
+  void print_structure() {
+    get_active_version()->get_structure()->print_structure();
+  }
+
 private:
   ConfType m_config;
@@ -390,6 +400,8 @@ private:
   std::condition_variable m_version_advance_cv;
   std::mutex m_version_advance_mtx;
 
+  std::atomic<bool> m_reconstruction_scheduled;
+
   std::atomic<bool> m_flush_in_progress = false;
   alignas(64) std::atomic<bool> m_scheduling_reconstruction;
@@ -441,6 +453,7 @@ private:
      * this code will be bypassed in that case.
      */
     if (args->priority == ReconstructionPriority::FLUSH) {
+      // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
       assert(extension->m_flush_in_progress.load());
       /* we first construct a shard from the buffer */
       auto buffview = args->version->get_buffer();
@@ -460,19 +473,21 @@ private:
       if (extension->m_config.recon_maint_disabled) {
         assert(args->version->get_mutable_structure());
-        args->version->get_mutable_structure()->append_l0(std::move(new_shard));
+        args->version->get_mutable_structure()->append_l0(std::move(new_shard), args->version->get_id());
       } else {
         assert(!args->version->get_mutable_structure());
         auto structure = std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy());
 
         /* add the newly created shard to the structure copy */
-        structure->append_l0(std::move(new_shard));
+        structure->append_l0(std::move(new_shard), args->version->get_id());
 
         /* set this version's structure to the newly created one */
         args->version->set_structure(std::move(structure));
       }
 
       args->version->advance_buffer_head(new_head);
+    } else {
+      // fprintf(stderr, "[I] Running background reconstruction\n");
     }
 
     /* perform all of the reconstructions */
@@ -487,13 +502,18 @@ private:
      */
     if (args->version->get_id() == INVALID_VERSION) {
       args->version->set_id(extension->m_version_counter.fetch_add(1));
+      // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
     }
 
     /* advance the index to the newly finished version */
-    extension->install_new_version(args->version);
+    extension->install_new_version(args->version, args->initial_version);
 
     if (args->priority == ReconstructionPriority::FLUSH) {
       extension->m_flush_in_progress.store(false);
+      // fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id());
+    } else {
+      extension->m_reconstruction_scheduled.store(false);
+      // fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id());
     }
 
     /* manually delete the argument object */
@@ -593,17 +613,18 @@ private:
     return new_version;
   }
 
-  void install_new_version(version_ptr new_version) {
+  void install_new_version(version_ptr new_version, size_t old_active_version_id) {
     assert(new_version->get_structure());
     assert(new_version->get_id() != INVALID_VERSION);
 
+    // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
     /* wait until our turn to install the new version */
     await_version(new_version->get_id() - 1);
 
     auto old = get_active_version();
-    // FIXME: implement this interface
-    // new_version->merge_changes_from(old.load().get());
+    new_version->merge_changes_from(old.get(), old_active_version_id);
+    new_version->update_shard_version(new_version->get_id());
 
     /*
      * Only one version can have a given number, so we are safe to
@@ -620,6 +641,8 @@ private:
      */
     auto lk = std::unique_lock(m_version_advance_mtx);
     m_version_advance_cv.notify_all();
+
+    // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
   }
 
   StructureType *create_scratch_structure() {
@@ -649,6 +672,8 @@ private:
       return;
     }
 
+    // fprintf(stderr, "[I] Scheduling flush\n");
+
    /*
     * for "legacy" policies, without background reconstruction, we need
     * a valid structure object as part of the version prior to determining
@@ -672,6 +697,7 @@ private:
     args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get());
     args->extension = this;
     args->priority = ReconstructionPriority::FLUSH;
+    args->initial_version = INVALID_VERSION;
 
     /*
      * NOTE: args is deleted by the reconstruction job, so shouldn't be
@@ -689,7 +715,7 @@ private:
 
   void schedule_maint_reconstruction(bool take_reconstruction_lock = true) {
 
-    if (m_config.recon_maint_disabled) {
+    if (m_config.recon_maint_disabled || m_reconstruction_scheduled.load()) {
       return;
     }
 
@@ -697,21 +723,38 @@ private:
       begin_reconstruction_scheduling();
     }
 
+    if (m_reconstruction_scheduled.load()) {
+      end_reconstruction_scheduling();
+      return;
+    }
+
+    // fprintf(stderr, "[I] Scheduling maintenance\n");
+
+    m_reconstruction_scheduled.store(true);
+
     // FIXME: memory management issue here?
-    auto new_version = create_version_maint(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy()));
+    auto active_version = m_active_version.load();
+    auto new_version = create_version_maint(std::unique_ptr<StructureType>(active_version->get_structure()->copy()));
 
     auto *args = new ReconstructionArgs<ShardType, QueryType>();
     args->version = new_version;
     args->tasks = m_config.recon_policy->get_reconstruction_tasks(args->version.get());
     args->extension = this;
     args->priority = ReconstructionPriority::MAINT;
+    args->initial_version = active_version->get_id();
 
     /*
      * NOTE: args is deleted by the reconstruction job, so shouldn't be
      * freed here
      */
-    m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
-                          RECONSTRUCTION);
+    if (args->tasks.size() > 0) {
+      m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
+                            RECONSTRUCTION);
+    } else {
+      delete args;
+      m_reconstruction_scheduled.store(false);
+      // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
+    }
 
     if (take_reconstruction_lock) {
       end_reconstruction_scheduling();
@@ -739,6 +782,12 @@ private:
       }
     }
 
+
+    if (rand() % 1000 < 5) {
+      size_t l0_cnt = get_active_version()->get_structure()->get_level_vector()[0]->get_shard_count();
+      usleep(l0_cnt);
+    }
+
     /* this will fail if the HWM is reached and return 0 */
     return m_buffer->append(rec, ts);
   }
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
index eaa374a..9ddd150 100644
--- a/include/framework/reconstruction/BSMPolicy.h
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -25,8 +25,7 @@ public:
       : m_scale_factor(2), m_buffer_size(buffer_size) {}
 
   ReconstructionVector
-  get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
-                           size_t incoming_reccnt) const override {
+  get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override {
     ReconstructionVector reconstructions;
     return reconstructions;
   }
@@ -49,13 +48,21 @@ public:
     task.target = target_level;
     task.type = ReconstructionType::Merge;
 
+    std::vector<ShardID> source_shards;
+    size_t reccnt = 0;
+
+    source_shards.push_back({0, all_shards_idx});
+
     for (level_index i = target_level; i > source_level; i--) {
       if (i < (level_index)levels.size()) {
-        task.add_shard({i, all_shards_idx}, levels[i]->get_record_count());
+        source_shards.push_back({i-1, all_shards_idx});
+        reccnt += levels[i-1]->get_record_count();
       }
     }
 
-    reconstructions.add_reconstruction(task);
+    assert(source_shards.size() > 0);
+
+    reconstructions.add_reconstruction(source_shards, target_level, reccnt, ReconstructionType::Merge);
     return reconstructions;
   }
 
@@ -63,8 +70,8 @@ private:
   level_index find_reconstruction_target(LevelVector &levels) const {
     level_index target_level = invalid_level_idx;
 
-    for (level_index i = 0; i < (level_index)levels.size(); i++) {
-      if (levels[i]->get_record_count() + m_buffer_size <= capacity(i)) {
+    for (level_index i = 1; i < (level_index)levels.size(); i++) {
+      if (levels[i]->get_record_count() == 0) {
         target_level = i;
         break;
       }
diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h
new file mode 100644
index 0000000..4c266fd
--- /dev/null
+++ b/include/framework/reconstruction/BackgroundTieringPolicy.h
@@ -0,0 +1,88 @@
+/*
+ * include/framework/reconstruction/BackgroundTieringPolicy.h
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *                         Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+#include <cmath>
+
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/Version.h"
+#include "util/types.h"
+
+namespace de {
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class BackgroundTieringPolicy : public ReconstructionPolicy<ShardType, QueryType> {
+  typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+      LevelVector;
+
+public:
+  BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size)
+      : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
+
+  ReconstructionVector get_reconstruction_tasks(
+      const Version<ShardType, QueryType> *version) const override {
+    ReconstructionVector reconstructions;
+
+    auto levels = version->get_structure()->get_level_vector();
+
+    if (levels[0]->get_shard_count() < m_scale_factor) {
+      return reconstructions;
+    }
+
+    level_index target_level = find_reconstruction_target(levels);
+    assert(target_level != -1);
+    level_index source_level = 0;
+
+    if (target_level == invalid_level_idx) {
+      /* grow */
+      target_level = levels.size();
+    }
+
+    for (level_index i = target_level; i > source_level; i--) {
+      size_t target_reccnt =
+          (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0;
+      size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt;
+
+      std::vector<ShardID> shards;
+      for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) {
+        shards.push_back({i-1, j});
+      }
+
+      reconstructions.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact);
+    }
+
+    return reconstructions;
+  }
+
+  ReconstructionVector
+  get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
+    ReconstructionVector reconstructions;
+
+    return reconstructions;
+  }
+
+private:
+  level_index find_reconstruction_target(LevelVector &levels) const {
+    level_index target_level = invalid_level_idx;
+
+    for (level_index i = 0; i < (level_index)levels.size(); i++) {
+      if (levels[i]->get_shard_count() + 1 <= capacity()) {
+        target_level = i;
+        break;
+      }
+    }
+
+    return target_level;
+  }
+
+  inline size_t capacity() const { return m_scale_factor; }
+
+  size_t m_scale_factor;
+  size_t m_buffer_size;
+};
+} // namespace de
diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h
index 0768daa..a181052 100644
--- a/include/framework/reconstruction/FixedShardCountPolicy.h
+++ b/include/framework/reconstruction/FixedShardCountPolicy.h
@@ -25,8 +25,7 @@ public:
       : m_buffer_size(buffer_size), m_shard_count(shard_count),
         m_max_reccnt(max_record_count) {}
 
   ReconstructionVector
-  get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
-                           size_t incoming_reccnt) const override {
+  get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override {
     ReconstructionVector reconstructions;
 
     return reconstructions;
@@ -36,26 +35,25 @@ public:
   get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
     auto levels = version->get_structure()->get_level_vector();
-
     ReconstructionVector v;
-    if (levels.size() == 0) {
-      v.add_reconstruction(ReconstructionTask{
-          {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
-      return v;
-    }
-
-    ShardID last_shid = {0, (shard_index) (levels[0]->get_shard_count() - 1)};
-
-    if (levels[0]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) {
-      v.add_reconstruction(ReconstructionTask{
-          {{buffer_shid, last_shid}}, 0, m_buffer_size, ReconstructionType::Merge});
-      return v;
-    } else {
-      v.add_reconstruction(ReconstructionTask{
-          {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
+    /* if this is the very first flush, there won't be an L1 yet */
+    if (levels.size() > 1 && levels[1]->get_shard_count() > 0) {
+      ShardID last_shid = {1, (shard_index) (levels[1]->get_shard_count() - 1)};
+      if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) {
+        auto task = ReconstructionTask {
+          {{0, 0}, last_shid}, 1, m_buffer_size, ReconstructionType::Merge
+        };
+        v.add_reconstruction(task);
         return v;
+      }
     }
+
+    auto task = ReconstructionTask {
+      {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append
+    };
+    v.add_reconstruction(task);
+    return v;
   }
 
 private:
diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h
index 94bed70..b4f453b 100644
--- a/include/framework/reconstruction/FloodL0Policy.h
+++ b/include/framework/reconstruction/FloodL0Policy.h
@@ -24,8 +24,7 @@ public:
   FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {}
 
   ReconstructionVector
-  get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
-                           size_t incoming_reccnt) const override {
+  get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override {
     ReconstructionVector reconstructions;
     return reconstructions;
   }
diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h
index dce5c3c..ae215db 100644
--- a/include/framework/reconstruction/TieringPolicy.h
+++ b/include/framework/reconstruction/TieringPolicy.h
@@ -17,14 +17,15 @@ namespace de {
 
 template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
 class TieringPolicy : public ReconstructionPolicy<ShardType, QueryType> {
-  typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> LevelVector;
+  typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+      LevelVector;
+
 public:
   TieringPolicy(size_t scale_factor, size_t buffer_size)
       : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
 
-  ReconstructionVector
-  get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
-                           size_t incoming_reccnt) const override {
+  ReconstructionVector get_reconstruction_tasks(
+      const Version<ShardType, QueryType> *version) const override {
     ReconstructionVector reconstructions;
     return reconstructions;
   }
@@ -59,7 +60,7 @@ private:
   level_index find_reconstruction_target(LevelVector &levels) const {
     level_index target_level = invalid_level_idx;
 
-    for (level_index i = 0; i < (level_index) levels.size(); i++) {
+    for (level_index i = 0; i < (level_index)levels.size(); i++) {
       if (levels[i]->get_shard_count() + 1 <= capacity()) {
         target_level = i;
         break;
@@ -69,9 +70,7 @@ private:
     return target_level;
   }
 
-  inline size_t capacity() const {
-    return m_scale_factor;
-  }
+  inline size_t capacity() const { return m_scale_factor; }
 
   size_t m_scale_factor;
   size_t m_buffer_size;
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 1591909..ed40d3d 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -36,6 +36,7 @@ struct ReconstructionArgs {
   ReconstructionVector tasks;
   void *extension;
   ReconstructionPriority priority;
+  size_t initial_version;
 };
 
 template <ShardInterface S, QueryInterface<S> Q, typename DE> struct QueryArgs {
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index 3e93202..fa677f2 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -89,16 +89,44 @@ public:
     return m_buffer->advance_head(new_head);
   }
 
+  void merge_changes_from(Version *old, size_t version_id) {
+    /*
+     * for a maint reconstruction, the buffer head may have advanced
+     * during the reconstruction; we don't need to adjust the buffer
+     * for maintenance reconstructions, so we can simply "catch" the
+     * internal head index up to the current version.
+     */
+    if (old->m_buffer_head > m_buffer_head) {
+      m_buffer_head = old->m_buffer_head;
+    }
+
+    // FIXME: we should also ensure that we don't clobber anything
+    // in the event that multiple concurrent reconstructions affect
+    // the same levels. As it stands, two reconstructions *could* share
+    // source shards, resulting in some records being lost or duplicated.
+    //
+    // For the moment, I'm being careful to avoid this within the
+    // scheduling policy itself, and only forwarding changes to this
+    // version.
+
+    /* using INVALID_VERSION disables shard reconciliation */
+    if (version_id == 0) {
+      return;
+    }
+
+    /* add any shards newer than version_id to this version */
+    auto old_structure = old->get_structure();
+    m_structure->merge_structure(old_structure, version_id);
+  }
+
+  void update_shard_version(size_t version) {
+    m_structure->update_shard_version(version);
+  }
+
 private:
   BufferType *m_buffer;
   std::unique_ptr<StructureType> m_structure;
 
-  /*
-   * The number of currently active jobs
-   * (queries/merges) operating on this
-   * epoch. An epoch can only be retired
-   * when this number is 0.
-   */
   size_t m_id;
   size_t m_buffer_head;
   ssize_t m_pending_buffer_head;
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 60fb6c7..2bf7086 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,8 +27,9 @@ class ExtensionStructure {
   typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
      LevelVector;
 
public:
-  ExtensionStructure() {
-    m_levels.emplace_back(std::make_shared<InternalLevel<ShardType, QueryType>>(0));
+  ExtensionStructure(bool default_level=true) {
+    if (default_level)
+      m_levels.emplace_back(std::make_shared<InternalLevel<ShardType, QueryType>>(0));
   }
 
   ~ExtensionStructure() = default;
@@ -49,7 +50,7 @@ public:
    * need to be forwarded to the appropriate structures manually.
    */
   ExtensionStructure<ShardType, QueryType> *copy() const {
-    auto new_struct = new ExtensionStructure<ShardType, QueryType>();
+    auto new_struct = new ExtensionStructure<ShardType, QueryType>(false);
     for (size_t i = 0; i < m_levels.size(); i++) {
       new_struct->m_levels.push_back(m_levels[i]->clone());
     }
@@ -158,13 +159,17 @@ public:
     return cnt;
   }
 
-  inline void perform_reconstruction(ReconstructionTask task) {
+  inline void perform_reconstruction(ReconstructionTask task, size_t version=0) {
     /* perform the reconstruction itself */
     std::vector<const ShardType *> shards;
     for (ShardID shid : task.sources) {
-      assert(shid.level_idx < (level_index) m_levels.size());
+      assert(shid.level_idx <= (level_index) m_levels.size());
       assert(shid.shard_idx >= -1);
 
+      if (shid.level_idx == (level_index) m_levels.size()) {
+        continue;
+      }
+
       /* if unspecified, push all shards into the vector */
       if (shid.shard_idx == all_shards_idx) {
         for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count();
@@ -184,21 +189,27 @@ public:
      * Remove all of the shards processed by the operation
      */
     for (ShardID shid : task.sources) {
-      if (shid.shard_idx == all_shards_idx) {
+      if (shid.level_idx == (level_index) m_levels.size()) {
+        continue;
+      } else if (shid.shard_idx == all_shards_idx) {
         m_levels[shid.level_idx]->truncate();
       } else if (shid != buffer_shid) {
         m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
       }
     }
 
+    // fprintf(stderr, "Target: %ld\tLevels:%ld\n", task.target, m_levels.size());
+
     /*
      * Append the new shard to the target level
      */
     if (task.target < (level_index)m_levels.size()) {
-      m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard));
+      m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard), version);
+      // fprintf(stderr, "append (no growth)\n");
     } else {
       /* grow the structure if needed */
       m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(task.target));
-      m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard));
+      m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard), version);
+      // fprintf(stderr, "grow and append\n");
     }
   }
 
@@ -219,8 +230,8 @@ public:
     return m_levels[0]->get_shard_count();
   }
 
-  void append_l0(std::shared_ptr<ShardType> shard) {
-    m_levels[0]->append(shard);
+  void append_l0(std::shared_ptr<ShardType> shard, size_t version) {
+    m_levels[0]->append(shard, version);
   }
 
   LevelVector const &get_level_vector() const { return m_levels; }
@@ -251,6 +262,47 @@ public:
     return ts_prop <= (long double) max_delete_prop;
   }
 
+  void print_structure() const {
+    for (size_t i=0; i<m_levels.size(); i++) {
+      fprintf(stdout, "[%ld]:\t", i);
+
+      if (m_levels[i]) {
+        for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
+          fprintf(stdout, "(%ld: %ld) ", j, m_levels[i]->get_shard(j)->get_record_count());
+        }
+      } else {
+        fprintf(stdout, "[Empty]");
+      }
+
+      fprintf(stdout, "\n");
+    }
+  }
+
+
+  void merge_structure(const ExtensionStructure* old_structure, size_t version_id = 0) {
+    assert(version_id > 0);
+
+    for (size_t i=0; i<old_structure->m_levels.size(); i++) {
+      for (size_t j=0; j<old_structure->m_levels[i]->get_shard_count(); j++) {
+        if (old_structure->m_levels[i]->get_shard_version(j) > version_id) {
+          m_levels[i]->append(old_structure->m_levels[i]->get_shard_ptr(j));
+        }
+      }
+    }
+  }
+
+  void update_shard_version(size_t version) {
+    assert(version != 0);
+
+    for (size_t i=0; i<m_levels.size(); i++) {
+      for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
+        if (m_levels[i]->get_shard_version(j) == 0) {
+          m_levels[i]->set_shard_version(j, version);
+        }
+      }
+    }
+  }
+
 private:
   LevelVector m_levels;
 };
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 5bc891b..37b2b40 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -15,6 +15,7 @@
  */
 #pragma once
 
+#include <future>
 #include <memory>
 #include <vector>
 
@@ -28,6 +29,7 @@ template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
 class InternalLevel {
   typedef typename ShardType::RECORD RecordType;
   typedef BufferView<RecordType> BuffView;
+  typedef std::pair<std::shared_ptr<ShardType>, size_t> shard_ptr;
 
 public:
   InternalLevel(ssize_t level_no) : m_level_no(level_no) {}
@@ -47,8 +49,8 @@ public:
     std::vector<const ShardType *> shards;
 
     for (auto shard : m_shards) {
-      if (shard)
-        shards.emplace_back(shard.get());
+      if (shard.first)
+        shards.emplace_back(shard.first.get());
     }
 
     return new ShardType(shards);
@@ -59,10 +61,10 @@ public:
       std::vector<typename QueryType::LocalQuery *> &local_queries,
       typename QueryType::Parameters *query_parms) const {
     for (size_t i = 0; i < m_shards.size(); i++) {
-      if (m_shards[i]) {
+      if (m_shards[i].first) {
         auto local_query =
-            QueryType::local_preproc(m_shards[i].get(), query_parms);
-        shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].get()});
+            QueryType::local_preproc(m_shards[i].first.get(), query_parms);
+        shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].first.get()});
         local_queries.emplace_back(local_query);
       }
     }
@@ -74,7 +76,7 @@ public:
 
     for (int i = m_shards.size() - 1; i >= (ssize_t)shard_stop; i--) {
-      if (m_shards[i]) {
-        auto res = m_shards[i]->point_lookup(rec, true);
+      if (m_shards[i].first) {
+        auto res = m_shards[i].first->point_lookup(rec, true);
         if (res && res->is_tombstone()) {
           return true;
         }
@@ -88,8 +90,8 @@ public:
       return false;
 
     for (size_t i = 0; i < m_shards.size(); ++i) {
-      if (m_shards[i]) {
-        auto res = m_shards[i]->point_lookup(rec);
+      if (m_shards[i].first) {
+        auto res = m_shards[i].first->point_lookup(rec);
         if (res) {
           res->set_delete();
           return true;
@@ -105,7 +107,15 @@ public:
       return nullptr;
     }
 
-    return m_shards[idx].get();
+    return m_shards[idx].first.get();
+  }
+
+  const size_t get_shard_version(size_t idx) const {
+    if (idx >= m_shards.size()) {
+      return 0;
+    }
+
+    return m_shards[idx].second;
   }
 
   size_t get_shard_count() const { return m_shards.size(); }
@@ -113,8 +123,8 @@ public:
   size_t get_record_count() const {
     size_t cnt = 0;
     for (size_t i = 0; i < m_shards.size(); i++) {
-      if (m_shards[i]) {
-        cnt += m_shards[i]->get_record_count();
+      if (m_shards[i].first) {
+        cnt += m_shards[i].first->get_record_count();
       }
     }
 
@@ -124,8 +134,8 @@ public:
   size_t get_tombstone_count() const {
     size_t res = 0;
     for (size_t i = 0; i < m_shards.size(); ++i) {
-      if (m_shards[i]) {
-        res += m_shards[i]->get_tombstone_count();
+      if (m_shards[i].first) {
+        res += m_shards[i].first->get_tombstone_count();
       }
     }
     return res;
@@ -134,8 +144,8 @@ public:
   size_t get_aux_memory_usage() const {
     size_t cnt = 0;
     for (size_t i = 0; i < m_shards.size(); i++) {
-      if (m_shards[i]) {
-        cnt += m_shards[i]->get_aux_memory_usage();
+      if (m_shards[i].first) {
+        cnt += m_shards[i].first->get_aux_memory_usage();
       }
     }
 
@@ -146,7 +156,7 @@ public:
     size_t cnt = 0;
     for (size_t i = 0; i < m_shards.size(); i++) {
-      if (m_shards[i]) {
-        cnt += m_shards[i]->get_memory_usage();
+      if (m_shards[i].first) {
+        cnt += m_shards[i].first->get_memory_usage();
       }
     }
 
@@ -158,8 +168,8 @@ public:
     size_t reccnt = 0;
     for (size_t i = 0; i < m_shards.size(); i++) {
-      if (m_shards[i]) {
-        tscnt += m_shards[i]->get_tombstone_count();
-        reccnt += m_shards[i]->get_record_count();
+      if (m_shards[i].first) {
+        tscnt += m_shards[i].first->get_tombstone_count();
+        reccnt += m_shards[i].first->get_record_count();
       }
     }
 
@@ -169,7 +179,7 @@ public:
   size_t get_nonempty_shard_count() const {
     size_t cnt = 0;
     for (size_t i = 0; i < m_shards.size(); i++) {
-      if (m_shards[i] && m_shards[i]->get_record_count() > 0) {
+      if (m_shards[i].first && m_shards[i].first->get_record_count() > 0) {
         cnt += 1;
       }
     }
@@ -180,7 +190,7 @@ public:
   std::shared_ptr<InternalLevel> clone() const {
     auto new_level = std::make_shared<InternalLevel>(m_level_no);
     for (size_t i = 0; i < m_shards.size(); i++) {
-      new_level->append(m_shards[i]);
+      new_level->append(m_shards[i].first, m_shards[i].second);
     }
 
     return new_level;
@@ -192,21 +202,39 @@ public:
     m_shards.erase(m_shards.begin() + shard);
   }
 
-  void append(std::shared_ptr<ShardType> shard) {
-    m_shards.emplace_back(shard);
+  void append(std::shared_ptr<ShardType> shard, size_t version=0) {
+    m_shards.push_back({shard, version});
+  }
+
+  void append(shard_ptr shard) {
+    m_shards.push_back(shard);
   }
 
   const ShardType *get_shard(ShardID shid) const {
     if (shid < m_shards.size()) {
-      return m_shards[shid].get();
+      return m_shards[shid].first.get();
     }
 
     return nullptr;
   }
 
+  const shard_ptr get_shard_ptr(size_t shid) const {
+    if (shid < m_shards.size()) {
+      return m_shards[shid];
+    }
+
+    return {nullptr, 0};
+  }
+
+  void set_shard_version(size_t idx, size_t version) {
+    if (idx < m_shards.size()) {
+      m_shards[idx].second = version;
+    }
+  }
+
 private:
   ssize_t m_level_no;
-  std::vector<std::shared_ptr<ShardType>> m_shards;
+  std::vector<shard_ptr> m_shards;
 };
 } // namespace de