diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-02-11 17:32:10 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-02-11 17:32:10 -0500 |
| commit | 85afe4ef04f327862460570fb0aa4c30afcf7cc7 (patch) | |
| tree | aba55db313b752df4ac073db117900e0128c22fb | |
| parent | c04efb2640421be7a24f851c08e290c89b7b46f2 (diff) | |
| download | dynamic-extension-85afe4ef04f327862460570fb0aa4c30afcf7cc7.tar.gz | |
Progress: began adding parallel merging and locking of levels
| -rw-r--r-- | CMakeLists.txt | 2 | ||||
| -rw-r--r-- | benchmarks/tail-latency/standard_latency_dist.cpp | 1 | ||||
| -rw-r--r-- | include/framework/DynamicExtension.h | 87 | ||||
| -rw-r--r-- | include/framework/reconstruction/BSMPolicy.h | 7 | ||||
| -rw-r--r-- | include/framework/reconstruction/BackgroundTieringPolicy.h | 33 | ||||
| -rw-r--r-- | include/framework/reconstruction/FixedShardCountPolicy.h | 8 | ||||
| -rw-r--r-- | include/framework/reconstruction/FloodL0Policy.h | 7 | ||||
| -rw-r--r-- | include/framework/reconstruction/LevelingPolicy.h | 7 | ||||
| -rw-r--r-- | include/framework/reconstruction/ReconstructionPolicy.h | 3 | ||||
| -rw-r--r-- | include/framework/reconstruction/TieringPolicy.h | 7 | ||||
| -rw-r--r-- | include/framework/scheduling/FIFOScheduler.h | 20 | ||||
| -rw-r--r-- | include/framework/scheduling/LockManager.h | 52 | ||||
| -rw-r--r-- | include/framework/scheduling/Task.h | 9 | ||||
| -rw-r--r-- | include/framework/scheduling/statistics.h | 4 | ||||
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 19 | ||||
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 1 |
16 files changed, 180 insertions, 87 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index f0d8a86..106b91a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,7 +9,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(namespace "de") project("Practical Dynamic Extension" VERSION 0.1.0) -set(debug false) +set(debug true) set(tests True) set(bench false) set(vldb_bench false) diff --git a/benchmarks/tail-latency/standard_latency_dist.cpp b/benchmarks/tail-latency/standard_latency_dist.cpp index 2e800fc..7cce48c 100644 --- a/benchmarks/tail-latency/standard_latency_dist.cpp +++ b/benchmarks/tail-latency/standard_latency_dist.cpp @@ -80,7 +80,6 @@ int main(int argc, char **argv) { for (size_t j=warmup; j<data.size(); j++) { while (!extension->insert(data[j])) { usleep(1); - fprintf(stderr, "%ld\n", j); } } TIMER_STOP(); diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index a48f390..aa07659 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -17,6 +17,7 @@ #include "framework/interface/Scheduler.h" #include "framework/reconstruction/ReconstructionPolicy.h" #include "framework/scheduling/SerialScheduler.h" +#include "framework/scheduling/LockManager.h" #include "framework/scheduling/Task.h" #include "framework/structure/ExtensionStructure.h" @@ -51,6 +52,7 @@ private: static constexpr size_t QUERY = 1; static constexpr size_t RECONSTRUCTION = 2; + static constexpr size_t FLUSH = 3; typedef std::shared_ptr<VersionType> version_ptr; typedef size_t version_id; @@ -90,7 +92,6 @@ public: m_version_counter = INITIAL_VERSION; assert(m_config.recon_policy); - m_reconstruction_scheduled.store(false); } /** @@ -400,10 +401,10 @@ private: std::condition_variable m_version_advance_cv; std::mutex m_version_advance_mtx; - std::atomic<bool> m_reconstruction_scheduled; - std::atomic<bool> m_flush_in_progress = false; + LockManager m_lock_mngr; + alignas(64) std::atomic<bool> m_scheduling_reconstruction; void enforce_delete_invariant(VersionType *version) { @@ -493,7 +494,9 @@ private: /* perform all of the reconstructions */ StructureType *structure = args->version->get_mutable_structure(); for (size_t i = 0; i < args->tasks.size(); i++) { - structure->perform_reconstruction(args->tasks[i]); + if (structure->perform_reconstruction(args->tasks[i])) { + extension->m_lock_mngr.add_lock(); + } } /* @@ -508,11 +511,19 @@ private: /* advance the index to the newly finished version */ extension->install_new_version(args->version, args->initial_version); + /* maint reconstructions can now safely release their locks */ + if (args->priority == ReconstructionPriority::MAINT) { + for (size_t i=0; i<args->tasks.size(); i++) { + for (auto source : args->tasks[i].sources) { + extension->m_lock_mngr.release_lock(source.level_idx); + } + } + } + if (args->priority == ReconstructionPriority::FLUSH) { extension->m_flush_in_progress.store(false); // fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id()); } else { - extension->m_reconstruction_scheduled.store(false); // fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id()); } @@ -591,6 +602,7 @@ private: */ version_ptr create_version_flush(std::unique_ptr<StructureType> structure) { size_t version_id = m_version_counter.fetch_add(1); + // fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id); auto active_version = get_active_version(); std::shared_ptr<VersionType> new_version = std::make_shared<VersionType>(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head()); @@ -617,7 +629,7 @@ private: assert(new_version->get_structure()); assert(new_version->get_id() != INVALID_VERSION); - // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id()); + // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id()); /* wait until our turn to install the new version */ await_version(new_version->get_id() - 1); @@ -669,10 +681,10 @@ private: bool old = m_flush_in_progress.load(); if (old || !m_flush_in_progress.compare_exchange_strong(old, true)) { end_reconstruction_scheduling(); + m_version_advance_cv.notify_all(); return; } - // fprintf(stderr, "[I] Scheduling flush\n"); /* * for "legacy" policies, without background reconstruction, we need @@ -692,6 +704,7 @@ private: auto new_version = create_version_flush(std::move(structure)); + auto *args = new ReconstructionArgs<ShardType, QueryType>(); args->version = new_version; args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get()); @@ -699,12 +712,14 @@ private: args->priority = ReconstructionPriority::FLUSH; args->initial_version = INVALID_VERSION; + // fprintf(stderr, "[I] Scheduling flush (%ld)\n", new_version->get_id()); /* * NOTE: args is deleted by the reconstruction job, so shouldn't be * freed here */ m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, - RECONSTRUCTION); + FLUSH); + // fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", new_version->get_id()); if (m_config.recon_enable_maint_on_flush) { schedule_maint_reconstruction(false); @@ -715,7 +730,7 @@ private: void schedule_maint_reconstruction(bool take_reconstruction_lock = true) { - if (m_config.recon_maint_disabled || m_reconstruction_scheduled.load()) { + if (m_config.recon_maint_disabled) { return; } @@ -723,37 +738,28 @@ private: begin_reconstruction_scheduling(); } - if (m_reconstruction_scheduled.load()) { - end_reconstruction_scheduling(); - return; - } - // fprintf(stderr, "[I] Scheduling maintenance\n"); - m_reconstruction_scheduled.store(true); - - // FIXME: memory management issue here? auto active_version = m_active_version.load(); - auto new_version = create_version_maint(std::unique_ptr<StructureType>(active_version->get_structure()->copy())); + auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr); - auto *args = new ReconstructionArgs<ShardType, QueryType>(); - args->version = new_version; - args->tasks = m_config.recon_policy->get_reconstruction_tasks(args->version.get()); - args->extension = this; - args->priority = ReconstructionPriority::MAINT; - args->initial_version = active_version->get_id(); + // if (reconstructions.size() == 0) { + // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n"); + // } - /* - * NOTE: args is deleted by the reconstruction job, so shouldn't be - * freed here - */ - if (args->tasks.size() > 0) { + for (auto &recon : reconstructions) { + /* + * NOTE: args is deleted by the reconstruction job, so shouldn't be + * freed here + */ + auto *args = new ReconstructionArgs<ShardType, QueryType>(); + args->version = create_version_maint(std::unique_ptr<StructureType>(active_version->get_structure()->copy())); + args->tasks = std::move(recon); + args->extension = this; + args->priority = ReconstructionPriority::MAINT; + args->initial_version = active_version->get_id(); m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, - RECONSTRUCTION); - } else { - delete args; - m_reconstruction_scheduled.store(false); - // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n"); + RECONSTRUCTION); } if (take_reconstruction_lock) { @@ -775,17 +781,20 @@ private: } int internal_append(const RecordType &rec, bool ts) { - if (m_buffer->is_at_low_watermark()) { + size_t max_l0 = (log(get_record_count()) / log(8)) + 1; + size_t current_l0 = get_active_version()->get_structure()->get_level_vector()[0]->get_shard_count(); + + if (m_buffer->is_at_low_watermark() && current_l0 <= max_l0) { auto old = false; if (m_scheduling_reconstruction.compare_exchange_strong(old, true)) { schedule_flush(); } } - - if (rand() % 1000 < 5) { - size_t l0_cnt = get_active_version()->get_structure()->get_level_vector()[0]->get_shard_count(); - usleep(l0_cnt); + if (m_buffer->is_at_high_watermark() && current_l0 > max_l0) { + schedule_maint_reconstruction(true); + // fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0); + return 0; } /* this will fail if the HWM is reached and return 0 */ diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h index 9ddd150..6d55a12 100644 --- a/include/framework/reconstruction/BSMPolicy.h +++ b/include/framework/reconstruction/BSMPolicy.h @@ -24,10 +24,9 @@ public: BSMPolicy(size_t buffer_size) : m_scale_factor(2), m_buffer_size(buffer_size) {} - ReconstructionVector - get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override { - ReconstructionVector reconstructions; - return reconstructions; + std::vector<ReconstructionVector> + get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override { + return {}; } ReconstructionVector diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h index 4c266fd..5a82695 100644 --- a/include/framework/reconstruction/BackgroundTieringPolicy.h +++ b/include/framework/reconstruction/BackgroundTieringPolicy.h @@ -24,14 +24,14 @@ public: BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size) : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {} - ReconstructionVector get_reconstruction_tasks( - const Version<ShardType, QueryType> *version) const override { - ReconstructionVector reconstructions; + std::vector<ReconstructionVector> get_reconstruction_tasks( + const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override { + std::vector<ReconstructionVector> reconstructions; auto levels = version->get_structure()->get_level_vector(); - if (levels[0]->get_shard_count() < m_scale_factor) { - return reconstructions; + if (levels[0]->get_shard_count() == 0) { + return {}; } level_index target_level = find_reconstruction_target(levels); @@ -44,16 +44,19 @@ public: } for (level_index i = target_level; i > source_level; i--) { - size_t target_reccnt = - (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0; - size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt; - - std::vector<ShardID> shards; - for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) { - shards.push_back({i-1, j}); + if (lock_mngr.take_lock(i-1)) { + ReconstructionVector recon; + size_t target_reccnt = + (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0; + size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt; + std::vector<ShardID> shards; + for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) { + shards.push_back({i-1, j}); + } + + recon.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact); + reconstructions.push_back(recon); } - - reconstructions.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact); } return reconstructions; @@ -70,7 +73,7 @@ private: level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = invalid_level_idx; - for (level_index i = 0; i < (level_index)levels.size(); i++) { + for (level_index i = 1; i < (level_index)levels.size(); i++) { if (levels[i]->get_shard_count() + 1 <= capacity()) { target_level = i; break; diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h index a181052..cc8dce4 100644 --- a/include/framework/reconstruction/FixedShardCountPolicy.h +++ b/include/framework/reconstruction/FixedShardCountPolicy.h @@ -24,11 +24,9 @@ public: FixedShardCountPolicy(size_t buffer_size, size_t shard_count, size_t max_record_count) : m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {} - ReconstructionVector - get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override { - ReconstructionVector reconstructions; - return reconstructions; - + std::vector<ReconstructionVector> + get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override { + return {}; } ReconstructionVector diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h index b4f453b..c0d29fe 100644 --- a/include/framework/reconstruction/FloodL0Policy.h +++ b/include/framework/reconstruction/FloodL0Policy.h @@ -23,10 +23,9 @@ class FloodL0Policy : public ReconstructionPolicy<ShardType, QueryType> { public: FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {} - ReconstructionVector - get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override { - ReconstructionVector reconstructions; - return reconstructions; + std::vector<ReconstructionVector> + get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override { + return {}; } ReconstructionVector diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h index 3a0b73e..f0feb53 100644 --- a/include/framework/reconstruction/LevelingPolicy.h +++ b/include/framework/reconstruction/LevelingPolicy.h @@ -24,10 +24,9 @@ public: LevelingPolicy(size_t scale_factor, size_t buffer_size) : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {} - ReconstructionVector - get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override { - ReconstructionVector reconstructions; - return reconstructions; + std::vector<ReconstructionVector> + get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override { + return {}; } ReconstructionVector diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h index 6f99b32..41a2092 100644 --- a/include/framework/reconstruction/ReconstructionPolicy.h +++ b/include/framework/reconstruction/ReconstructionPolicy.h @@ -15,6 +15,7 @@ #include "util/types.h" #include "framework/structure/ExtensionStructure.h" #include "framework/scheduling/Version.h" +#include "framework/scheduling/LockManager.h" namespace de { template<ShardInterface ShardType, QueryInterface<ShardType> QueryType> @@ -23,7 +24,7 @@ class ReconstructionPolicy { public: ReconstructionPolicy() {} - virtual ReconstructionVector get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const = 0; + virtual std::vector<ReconstructionVector> get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const = 0; virtual ReconstructionVector get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0; }; } diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h index aa94f7a..d8769f7 100644 --- a/include/framework/reconstruction/TieringPolicy.h +++ b/include/framework/reconstruction/TieringPolicy.h @@ -24,10 +24,9 @@ public: TieringPolicy(size_t scale_factor, size_t buffer_size) : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {} - ReconstructionVector get_reconstruction_tasks( - const Version<ShardType, QueryType> *version) const override { - ReconstructionVector reconstructions; - return reconstructions; + std::vector<ReconstructionVector> get_reconstruction_tasks( + const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override { + return {}; } ReconstructionVector diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 4c1db8d..8a4cd8d 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -51,13 +51,20 @@ public: m_sched_thrd.join(); m_sched_wakeup_thrd.join(); + m_flush_thread.join(); } void schedule_job(std::function<void(void *)> job, size_t size, void *args, size_t type = 0) { - std::unique_lock<std::mutex> lk(m_cv_lock); + size_t ts = m_counter.fetch_add(1); + if (type == 3) { + do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock)); + return; + } + + std::unique_lock<std::mutex> lk(m_cv_lock); m_stats.job_queued(ts, type, size); m_task_queue.push(Task(size, ts, job, args, type, &m_stats)); @@ -84,6 +91,9 @@ private: std::condition_variable m_cv; std::mutex m_queue_lock; + std::mutex m_flush_lock; + std::thread m_flush_thread; + std::thread m_sched_thrd; std::thread m_sched_wakeup_thrd; ctpl::thread_pool m_thrd_pool; @@ -121,6 +131,14 @@ private: } } while (!m_shutdown.load()); } + + void do_flush(Task task) { + m_flush_lock.lock(); + if (m_flush_thread.joinable()) { + m_flush_thread.join(); + } + m_flush_thread = std::thread(task, 0); + } }; } // namespace de diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h new file mode 100644 index 0000000..91ed778 --- /dev/null +++ b/include/framework/scheduling/LockManager.h @@ -0,0 +1,52 @@ +/* + * + */ + +#pragma once +#include <deque> +#include <atomic> + +namespace de { +class LockManager { +public: + LockManager(size_t levels=1) { + for (size_t i=0; i < levels; i++) { + m_lks.emplace_back(false); + } + } + + ~LockManager() = default; + + void add_lock() { + m_lks.emplace_back(false); + } + + void release_lock(size_t idx) { + if (idx < m_lks.size()) { + m_lks[idx].store(false); + } + } + + bool is_locked(size_t idx) { + if (idx < m_lks.size()) { + return m_lks[idx].load(); + } + + return false; + } + + bool take_lock(size_t idx) { + if (idx < m_lks.size()) { + bool old = m_lks[idx].load(); + if (!old) { + return m_lks[idx].compare_exchange_strong(old, true); + } + } + + return false; + } + +private: + std::deque<std::atomic<bool>> m_lks; +}; +} diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index ed40d3d..2d68f56 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -49,9 +49,9 @@ typedef std::function<void(void *)> Job; struct Task { Task(size_t size, size_t ts, Job job, void *args, size_t type = 0, - SchedulerStatistics *stats = nullptr) + SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr) : m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type), - m_stats(stats) {} + m_stats(stats), m_lk(lk) {} Job m_job; size_t m_size; @@ -59,6 +59,7 @@ struct Task { void *m_args; size_t m_type; SchedulerStatistics *m_stats; + std::mutex *m_lk; friend bool operator<(const Task &self, const Task &other) { return self.m_timestamp < other.m_timestamp; @@ -87,6 +88,10 @@ struct Task { .count(); m_stats->log_time_data(time, m_type); } + + if (m_lk) { + m_lk->unlock(); + } } }; diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h index 48c186f..a6d66ab 100644 --- a/include/framework/scheduling/statistics.h +++ b/include/framework/scheduling/statistics.h @@ -52,7 +52,7 @@ public: /* FIXME: This is just a temporary approach */ void log_time_data(size_t length, size_t type) { - assert(type == 1 || type == 2); + assert(type == 1 || type == 2 || type == 3); if (type == 1) { m_type_1_cnt.fetch_add(1); @@ -61,7 +61,7 @@ public: if (length > m_type_1_largest_time) { m_type_1_largest_time.store(length); } - } else { + } else if (type == 2) { m_type_2_cnt.fetch_add(1); m_type_2_total_time.fetch_add(length); diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 2bf7086..fa713af 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -159,7 +159,12 @@ public: return cnt; } - inline void perform_reconstruction(ReconstructionTask task, size_t version=0) { + /* + * Perform the reconstruction described by task. If the resulting + * reconstruction grows the structure (i.e., adds a level), returns + * true. Otherwise, returns false. + */ + inline bool perform_reconstruction(ReconstructionTask task, size_t version=0) { /* perform the reconstruction itself */ std::vector<const ShardType *> shards; for (ShardID shid : task.sources) { @@ -205,10 +210,12 @@ public: */ if (task.target < (level_index)m_levels.size()) { m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard), version); + return false; // fprintf(stderr, "append (no growth)\n"); } else { /* grow the structure if needed */ m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(task.target)); m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard), version); + return true; // fprintf(stderr, "grow and append\n"); } } @@ -283,9 +290,13 @@ public: assert(version_id > 0); for (size_t i=0; i<old_structure->m_levels.size(); i++) { - for (size_t j=0; j<old_structure->m_levels[i]->get_shard_count(); j++) { - if (old_structure->m_levels[i]->get_shard_version(j) > version_id) { - m_levels[i]->append(old_structure->m_levels[i]->get_shard_ptr(j)); + if (m_levels.size() <= i) { + m_levels.push_back(old_structure->m_levels[i]); + } else { + for (size_t j=0; j<old_structure->m_levels[i]->get_shard_count(); j++) { + if (old_structure->m_levels[i]->get_shard_version(j) > version_id) { + m_levels[i]->append(old_structure->m_levels[i]->get_shard_ptr(j)); + } } } } diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 0eae73d..0197ecd 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -104,6 +104,7 @@ public: bool is_full() const { return get_record_count() >= m_hwm; } bool is_at_low_watermark() const { return get_record_count() >= m_lwm; } + bool is_at_high_watermark() const { return get_record_count() >= m_hwm; } size_t get_tombstone_count() const { return m_tscnt.load(); } |