From 6c906c94e1eea6d4356b8c99b93da39029e8d95d Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Fri, 17 Jan 2025 17:28:50 -0500
Subject: Progress

---
 include/framework/DynamicExtension.h              | 446 ++++++++-------------
 include/framework/reconstruction/BSMPolicy.h      |  12 +-
 .../reconstruction/FixedShardCountPolicy.h        |  25 +-
 include/framework/reconstruction/FloodL0Policy.h  |  11 +-
 include/framework/reconstruction/LevelingPolicy.h |  14 +-
 .../reconstruction/ReconstructionPolicy.h         |   4 +-
 include/framework/reconstruction/TieringPolicy.h  |  15 +-
 include/framework/scheduling/Epoch.h              |  20 +-
 include/framework/scheduling/Task.h               |  11 +-
 include/framework/structure/ExtensionStructure.h  |  21 +-
 include/framework/util/Configuration.h            |  20 +
 11 files changed, 254 insertions(+), 345 deletions(-)
(limited to 'include')

diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 5a64243..ea3ef4d 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -11,15 +11,19 @@
 #include
 #include
+#include
 #include

 #include "framework/interface/Scheduler.h"
 #include "framework/reconstruction/ReconstructionPolicy.h"
 #include "framework/scheduling/SerialScheduler.h"
+#include "framework/scheduling/Task.h"
 #include "framework/structure/ExtensionStructure.h"
 #include "framework/structure/MutableBuffer.h"

+#include "framework/util/Configuration.h"
+
 #include "framework/scheduling/Epoch.h"

 #include "util/types.h"

@@ -34,9 +38,10 @@ private:
   typedef typename ShardType::RECORD RecordType;
   typedef MutableBuffer Buffer;
   typedef ExtensionStructure Structure;
-  typedef Epoch _Epoch;
+  typedef Version Version;
   typedef BufferView BufView;
   typedef ReconstructionPolicy ReconPolicyType;
+  typedef DEConfiguration ConfType;

   typedef typename QueryType::Parameters Parameters;
   typedef typename QueryType::LocalQuery LocalQuery;
@@ -47,11 +52,6 @@ private:
   static constexpr size_t QUERY = 1;
   static constexpr size_t RECONSTRUCTION = 2;

-  struct epoch_ptr {
-    _Epoch *epoch;
-    size_t refcnt;
-  };
-
 public:
   /**
    * Create a new Dynamized version of a data structure, supporting
    * framework's scheduler for use in answering queries and
    * performing compactions and flushes, etc.
    */
-  DynamicExtension(ReconPolicyType *recon_policy, size_t buffer_low_watermark,
-                   size_t buffer_high_watermark = 0, size_t memory_budget = 0,
-                   size_t thread_cnt = 16)
-      : m_max_delete_prop(1), m_sched(memory_budget, thread_cnt),
-        m_buffer(new Buffer(buffer_low_watermark, (buffer_high_watermark == 0)
-                                                      ? buffer_low_watermark
-                                                      : buffer_high_watermark)),
-        m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0),
-        m_recon_policy(recon_policy) {
-
-    auto vers = new Structure();
-    m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
-    m_previous_epoch.store({nullptr, 0});
-    m_next_epoch.store({nullptr, 0});
+  DynamicExtension(ConfType &&config) : m_config(std::move(config)) {
+    m_buffer = std::make_unique<Buffer>(m_config.buffer_flush_trigger,
+                                        m_config.buffer_size);
+
+    m_sched = SchedType(m_config.maximum_memory_usage, m_config.maximum_threads);
+    m_active_version.store(std::make_shared<Version>(INITIAL_VERSION, new Structure(), m_buffer.get(), 0));
   }

   /**
    * Destructor for DynamicExtension. Will block until the completion of
-   * any outstanding epoch transition, shut down the scheduler, and free
+   * any outstanding version transition, shut down the scheduler, and free
    * all currently allocated shards, buffers, etc., by calling their
    * destructors.
   */
  ~DynamicExtension() {
-
-    /* let any in-flight epoch transition finish */
-    await_next_epoch();
+    /* let any in-flight version transitions finish */
+    await_newest_version();

    /* shutdown the scheduler */
    m_sched.shutdown();
-
-    /* delete all held resources */
-    delete m_next_epoch.load().epoch;
-    delete m_current_epoch.load().epoch;
-    delete m_previous_epoch.load().epoch;
-
-    delete m_buffer;
  }

  /**
@@ -164,15 +148,11 @@
           "Tagging is only supported in single-threaded operation");

    auto view = m_buffer->get_buffer_view();
-
-    auto epoch = get_active_epoch();
-    if (epoch->get_mutable_structure()->tagged_delete(rec)) {
-      end_job(epoch);
+    auto version = get_active_version();
+    if (version->get_mutable_structure()->tagged_delete(rec)) {
      return 1;
    }

-    end_job(epoch);
-
    /*
     * the buffer will take the longest amount of time, and
     * probably has the lowest probability of having the record,
@@ -208,10 +188,9 @@
   * @return The number of records within the index
   */
  size_t get_record_count() {
-    auto epoch = get_active_epoch();
-    auto t = epoch->get_buffer().get_record_count() +
-             epoch->get_structure()->get_record_count();
-    end_job(epoch);
+    auto version = get_active_version();
+    auto t = version->get_buffer().get_record_count() +
+             version->get_structure()->get_record_count();

    return t;
  }

@@ -224,10 +203,9 @@
   * @return The number of tombstone records within the index
   */
  size_t get_tombstone_count() {
-    auto epoch = get_active_epoch();
-    auto t = epoch->get_buffer().get_tombstone_count() +
-             epoch->get_structure()->get_tombstone_count();
-    end_job(epoch);
+    auto version = get_active_version();
+    auto t = version->get_buffer().get_tombstone_count() +
+             version->get_structure()->get_tombstone_count();

    return t;
  }

@@ -241,11 +219,7 @@
   * @return The number of levels within the index
   */
  size_t get_height() {
-    auto epoch = get_active_epoch();
-    auto t = epoch->get_structure()->get_height();
-    end_job(epoch);
-
-    return t;
+    return get_active_version()->get_structure()->get_height();
  }

@@ -254,11 +228,7 @@
   * @return The number of non-empty shards within the index
   */
  size_t get_shard_count() {
-    auto epoch = get_active_epoch();
-    auto s = epoch->get_structure()->get_shard_count();
-    end_job(epoch);
-
-    return s;
+    return get_active_version()->get_structure()->get_shard_count();
  }

@@ -272,10 +242,9 @@
   * ShardType::get_memory_usage) and the buffer by the index.
   */
  size_t get_memory_usage() {
-    auto epoch = get_active_epoch();
+    auto version = get_active_version();
    auto t = m_buffer->get_memory_usage() +
-             epoch->get_structure()->get_memory_usage();
-    end_job(epoch);
+             version->get_structure()->get_memory_usage();

    return t;
  }

@@ -289,11 +258,7 @@
   * (as reported by ShardType::get_aux_memory_usage) by the index.
   */
  size_t get_aux_memory_usage() {
-    auto epoch = get_active_epoch();
-    auto t = epoch->get_structure()->get_aux_memory_usage();
-    end_job(epoch);
-
-    return t;
+    return get_active_version()->get_structure()->get_aux_memory_usage();
  }

  /**
@@ -312,19 +277,19 @@
  ShardType *
  create_static_structure(bool await_reconstruction_completion = false) {
    if (await_reconstruction_completion) {
-      await_next_epoch();
+      await_newest_version();
    }

-    auto epoch = get_active_epoch();
-    auto vers = epoch->get_structure();
+    auto version = get_active_version();
+    auto structure = version->get_structure();
    std::vector shards;

-    if (vers->get_level_vector().size() > 0) {
-      for (int i = vers->get_level_vector().size() - 1; i >= 0; i--) {
-        if (vers->get_level_vector()[i] &&
-            vers->get_level_vector()[i]->get_record_count() > 0) {
+    if (structure->get_level_vector().size() > 0) {
+      for (int i = structure->get_level_vector().size() - 1; i >= 0; i--) {
+        if (structure->get_level_vector()[i] &&
+            structure->get_level_vector()[i]->get_record_count() > 0) {
          shards.emplace_back(
-              vers->get_level_vector()[i]->get_combined_shard());
+              structure->get_level_vector()[i]->get_combined_shard());
        }
      }
    }
@@ -336,7 +301,7 @@
     * head pointer any longer
     */
    {
-      auto bv = epoch->get_buffer();
+      auto bv = version->get_buffer();
      if (bv.get_record_count() > 0) {
        shards.emplace_back(new ShardType(std::move(bv)));
      }
@@ -348,19 +313,28 @@
      delete shard;
    }

-    end_job(epoch);
    return flattened;
  }

  /*
-   * If the current epoch is *not* the newest one, then wait for
-   * the newest one to become available. Otherwise, returns immediately.
+   * Determines the newest pending version at the time of call, and
+   * blocks until that version becomes active.
   */
-  void await_next_epoch() {
-    while (m_next_epoch.load().epoch != nullptr) {
-      std::unique_lock lk(m_epoch_cv_lk);
-      m_epoch_cv.wait(lk);
+  void await_newest_version() {
+    /*
+     * versions are assigned by fetch and add on the counter, so the
+     * newest assigned version number will be one less than the value
+     * of the counter
+     */
+    auto newest_pending_version = m_version_counter.load() - 1;
+
+    /* versions signal on m_version_advance_cv when they activate */
+    while (m_active_version.load()->get_version_number() < newest_pending_version) {
+      std::unique_lock lk(m_version_advance_mtx);
+      m_version_advance_cv.wait(lk);
    }
+
+    return;
  }

  /**
@@ -372,11 +346,8 @@
   * satisfied, and false if it is not.
   */
  bool validate_tombstone_proportion() {
-    auto epoch = get_active_epoch();
-    auto t = epoch->get_structure()->validate_tombstone_proportion(
-        m_max_delete_prop);
-    end_job(epoch);
-    return t;
+    return get_active_version()->get_structure()->validate_tombstone_proportion(
+        m_config.maximum_delete_proportion);
  }

  /**
@@ -386,28 +357,34 @@
  void print_scheduler_statistics() const { m_sched.print_statistics(); }

private:
-  double m_max_delete_prop;
+  ConfType m_config;
+
  SchedType m_sched;
-  Buffer *m_buffer;
+  std::unique_ptr m_buffer;

  size_t m_core_cnt;
  std::atomic m_next_core;
-  std::atomic m_epoch_cnt;

  ReconPolicyType const *m_recon_policy;

-  alignas(64) std::atomic m_reconstruction_scheduled;
+  /* versioning + concurrency variables */
+  std::atomic m_version_counter;
+  typedef std::shared_ptr<Version> version_ptr;
+
+  std::atomic<version_ptr> m_active_version;

-  std::atomic m_next_epoch;
-  std::atomic m_current_epoch;
-  std::atomic m_previous_epoch;
+  typedef size_t version_id;
+  const size_t INVALID_VERSION = 0;
+  const size_t INITIAL_VERSION = 1;

-  std::condition_variable m_epoch_cv;
-  std::mutex m_epoch_cv_lk;
+  std::condition_variable m_version_advance_cv;
+  std::mutex m_version_advance_mtx;

-  void enforce_delete_invariant(_Epoch *epoch) {
-    auto structure = epoch->get_structure();
+  alignas(64) std::atomic m_scheduling_reconstruction;
+
+  void enforce_delete_invariant(Version *version) {
+    auto structure = version->get_structure();
    auto compactions = structure->get_compaction_tasks();

    while (compactions.size() > 0) {
@@ -415,7 +392,7 @@ private:
      /* schedule a compaction */
      ReconstructionArgs *args =
          new ReconstructionArgs();
-      args->epoch = epoch;
+      args->version = version;
      args->merges = compactions;
      args->extension = this;
      args->compaction = true;
@@ -434,137 +411,11 @@ private:
    }
  }

-  _Epoch *get_active_epoch() {
-    epoch_ptr old, new_ptr;
-
-    do {
-      /*
-       * during an epoch transition, a nullptr will installed in the
-       * current_epoch. At this moment, the "new" current epoch will
-       * soon be installed, but the "current" current epoch has been
-       * moved back to m_previous_epoch.
-       */
-      if (m_current_epoch.load().epoch == nullptr) {
-        old = m_previous_epoch;
-        new_ptr = {old.epoch, old.refcnt + 1};
-        if (old.epoch != nullptr &&
-            m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
-          break;
-        }
-      } else {
-        old = m_current_epoch;
-        new_ptr = {old.epoch, old.refcnt + 1};
-        if (old.epoch != nullptr &&
-            m_current_epoch.compare_exchange_strong(old, new_ptr)) {
-          break;
-        }
-      }
-    } while (true);
-
-    assert(new_ptr.refcnt > 0);
-
-    return new_ptr.epoch;
-  }
-
-  void advance_epoch(size_t buffer_head) {
-
-    retire_epoch(m_previous_epoch.load().epoch);
-
-    epoch_ptr tmp = {nullptr, 0};
-    epoch_ptr cur;
-    do {
-      cur = m_current_epoch;
-    } while (!m_current_epoch.compare_exchange_strong(cur, tmp));
-
-    m_previous_epoch.store(cur);
-
-    // FIXME: this may currently block because there isn't any
-    // query preemption yet. At this point, we'd need to either
-    // 1) wait for all queries on the old_head to finish
-    // 2) kill all queries on the old_head
-    // 3) somehow migrate all queries on the old_head to the new
-    //    version
-    while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) {
-      _mm_pause();
-    }
-
-    m_current_epoch.store(m_next_epoch);
-    m_next_epoch.store({nullptr, 0});
-
-    /* notify any blocking threads that the new epoch is available */
-    m_epoch_cv_lk.lock();
-    m_epoch_cv.notify_all();
-    m_epoch_cv_lk.unlock();
-  }
-
-  /*
-   * Creates a new epoch by copying the currently active one. The new epoch's
-   * structure will be a shallow copy of the old one's.
-   */
-  _Epoch *create_new_epoch() {
-    /*
-     * This epoch access is _not_ protected under the assumption that
-     * only one reconstruction will be able to trigger at a time. If that
-     * condition is violated, it is possible that this code will clone a retired
-     * epoch.
-     */
-    assert(m_next_epoch.load().epoch == nullptr);
-    auto current_epoch = get_active_epoch();
-
-    m_epoch_cnt.fetch_add(1);
-    m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0});
-
-    end_job(current_epoch);
-
-    return m_next_epoch.load().epoch;
-  }
-
-  void retire_epoch(_Epoch *epoch) {
-    /*
-     * Epochs with currently active jobs cannot
-     * be retired. By the time retire_epoch is called,
-     * it is assumed that a new epoch is active, meaning
-     * that the epoch to be retired should no longer
-     * accumulate new active jobs. Eventually, this
-     * number will hit zero and the function will
-     * proceed.
-     */
-
-    if (epoch == nullptr) {
-      return;
-    }
-
-    epoch_ptr old, new_ptr;
-    new_ptr = {nullptr, 0};
-    do {
-      old = m_previous_epoch.load();
-
-      /*
-       * If running in single threaded mode, the failure to retire
-       * an Epoch will result in the thread of execution blocking
-       * indefinitely.
-       */
-      if constexpr (std::same_as) {
-        if (old.epoch == epoch)
-          assert(old.refcnt == 0);
-      }
-
-      if (old.epoch == epoch && old.refcnt == 0 &&
-          m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
-        break;
-      }
-      usleep(1);
-
-    } while (true);
-
-    delete epoch;
-  }
-
  static void reconstruction(void *arguments) {
    auto args =
        (ReconstructionArgs *)arguments;
    ((DynamicExtension *)args->extension)->SetThreadAffinity();

-    Structure *vers = args->epoch->get_mutable_structure();
+    Structure *vers = args->version.load()->get_mutable_structure();

    ReconstructionTask flush_task;
    flush_task.type = ReconstructionType::Invalid;
@@ -586,18 +437,18 @@
     * was done so as to make room for the full buffer anyway, so there's
     * no real benefit to doing this first.
     */
-      auto buffer_view = args->epoch->get_buffer();
+      auto buffer_view = args->version.load()->get_buffer();
      size_t new_head = buffer_view.get_tail();
      vers->perform_flush(flush_task, std::move(buffer_view));
      args->result.set_value(true);
-      ((DynamicExtension *)args->extension)->advance_epoch(new_head);
+      ((DynamicExtension *)args->extension)->advance_version(new_head);
    } else {
      args->result.set_value(true);
    }

    ((DynamicExtension *)args->extension)
-        ->m_reconstruction_scheduled.store(false);
+        ->m_scheduling_reconstruction.store(false);

    delete args;
  }

@@ -605,10 +456,10 @@
  static void async_query(void *arguments) {
    auto *args = (QueryArgs *)arguments;

-    auto epoch = args->extension->get_active_epoch();
+    auto version = args->extension->get_active_version();

-    auto buffer = epoch->get_buffer();
-    auto vers = epoch->get_structure();
+    auto buffer = version->get_buffer();
+    auto vers = version->get_structure();
    auto *parms = &(args->query_parms);

    /* create initial buffer query */
@@ -653,9 +504,6 @@
    /* return the output vector to caller via the future */
    args->result_set.set_value(std::move(output));

-    /* officially end the query job, releasing the pin on the epoch */
-    args->extension->end_job(epoch);
-
    /* clean up memory allocated for temporary query objects */
    delete buffer_query;
    for (size_t i = 0; i < local_queries.size(); i++) {
@@ -665,22 +513,89 @@
    delete args;
  }

-  void schedule_reconstruction() {
-    auto epoch = create_new_epoch();
+  version_ptr get_active_version() { return m_active_version.load(); }
+
+  /*
+   * Create a new version with an assigned version number, but without
+   * an assigned copy of the structure. Intended for use in flushing,
+   * where the structure will be copied from the currently active version
+   * at the time it is activated, but the version number must be claimed
+   * early to minimize activation blocking.
+   */
+  version_ptr create_version() {
+    size_t version_id = m_version_counter.fetch_add(1);
+    auto active_version = get_active_version();
+    version_ptr new_version = std::make_shared<Version>(version_id, nullptr, m_buffer.get(), 0);
+
+    return new_version;
+  }
+
+  /*
+   * Create a new version without an assigned version number, but with
+   * a copy of the extension structure. This is for use with background
+   * reconstructions, where the underlying structure is manipulated, but
+   * no version number is claimed until the version is activated, to
+   * prevent blocking buffer flushes.
+   */
+  version_ptr create_version(Structure *structure) {
+    version_ptr new_version = std::make_shared<Version>(INVALID_VERSION, structure, m_buffer.get(), 0);
+
+    return new_version;
+  }
+
+  void install_new_version(version_ptr new_version) {
+    assert(new_version->get_structure());
+    assert(new_version->get_version_number() != INVALID_VERSION);
+
+    auto old = get_active_version();
+    assert(new_version->get_version_number() > old->get_version_number());
+
+    /* wait until our turn to install the new version */
+    auto lk = std::unique_lock(m_version_advance_mtx);
+    while (new_version->get_version_number() != old->get_version_number() + 1) {
+      m_version_advance_cv.wait(lk);
+      old = get_active_version();
+    }
+
+    // FIXME: implement this interface
+    // new_version->merge_changes_from(old.get());
+
+    /*
+     * Only one version can have a given number, so we are safe to
+     * directly assign here--nobody else is going to change it out from
+     * under us. We're also protected under the mutex.
+ */ + m_active_version.store(new_version); + m_version_advance_cv.notify_all(); + } + + Structure *create_scratch_structure() { + return get_active_version()->get_structure()->copy(); + } + + void schedule_flush() { + auto new_version = create_version(); - ReconstructionArgs *args = - new ReconstructionArgs(); - args->epoch = epoch; - args->tasks = m_recon_policy->get_reconstruction_tasks( - epoch, m_buffer->get_high_watermark()); + auto *args = new ReconstructionArgs(); + args->version.load(new_version); + args->tasks = m_recon_policy->get_flush_tasks(args->version.get()); args->extension = this; - args->tasks.add_reconstruction(m_recon_policy->get_flush_task(epoch)); + args->priority = ReconstructionPriority::FLUSH; /* - * NOTE: args is deleted by the reconstruction job, so shouldn't be freed - * here + * NOTE: args is deleted by the reconstruction job, so shouldn't be + * freed here */ - m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); + m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION); + + if (m_config.recon_enable_maint_on_flush) { + schedule_maint_reconstruction(); + } + } + + + void schedule_maint_reconstruction() { + return; } std::future schedule_query(Parameters &&query_parms) { @@ -697,9 +612,8 @@ private: int internal_append(const RecordType &rec, bool ts) { if (m_buffer->is_at_low_watermark()) { auto old = false; - - if (m_reconstruction_scheduled.compare_exchange_strong(old, true)) { - schedule_reconstruction(); + if (m_scheduling_reconstruction.compare_exchange_strong(old, true)) { + schedule_flush(); } } @@ -738,45 +652,5 @@ private: #else void SetThreadAffinity() {} #endif - - void end_job(_Epoch *epoch) { - epoch_ptr old, new_ptr; - - do { - if (m_previous_epoch.load().epoch == epoch) { - old = m_previous_epoch; - /* - * This could happen if we get into the system during a - * transition. In this case, we can just back out and retry - */ - if (old.epoch == nullptr) { - continue; - } - - assert(old.refcnt > 0); - - new_ptr = {old.epoch, old.refcnt - 1}; - if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) { - break; - } - } else { - old = m_current_epoch; - /* - * This could happen if we get into the system during a - * transition. 
-         */
-        if (old.epoch == nullptr) {
-          continue;
-        }
-
-        assert(old.refcnt > 0);
-
-        new_ptr = {old.epoch, old.refcnt - 1};
-        if (m_current_epoch.compare_exchange_strong(old, new_ptr)) {
-          break;
-        }
-      }
-    } while (true);
-  }
};
} // namespace de
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
index ab8c6e4..c42b928 100644
--- a/include/framework/reconstruction/BSMPolicy.h
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -25,10 +25,10 @@ public:
      : m_scale_factor(2), m_buffer_size(buffer_size) {}

  ReconstructionVector
-  get_reconstruction_tasks(const Epoch *epoch,
+  get_reconstruction_tasks(const Version *version,
                           size_t incoming_reccnt) const override {
    ReconstructionVector reconstructions;
-    auto levels = epoch->get_structure()->get_level_vector();
+    auto levels = version->get_structure()->get_level_vector();
    level_index target_level = find_reconstruction_target(levels);
    assert(target_level != -1);

@@ -53,10 +53,10 @@ public:
    return reconstructions;
  }

-  ReconstructionTask
-  get_flush_task(const Epoch *epoch) const override {
-    return ReconstructionTask{
-        {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge};
+  ReconstructionVector
+  get_flush_tasks(const Version *version) const override {
+    ReconstructionVector v;
+    v.add_reconstruction(ReconstructionTask {{buffer_shid}, 0, m_buffer_size, ReconstructionType::Merge});
+    return v;
  }

private:
diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h
index ec8e4e6..2a3c977 100644
--- a/include/framework/reconstruction/FixedShardCountPolicy.h
+++ b/include/framework/reconstruction/FixedShardCountPolicy.h
@@ -25,31 +25,36 @@ public:
      : m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {}

  ReconstructionVector
-  get_reconstruction_tasks(const Epoch *epoch,
+  get_reconstruction_tasks(const Version *version,
                           size_t incoming_reccnt) const override {
    ReconstructionVector reconstructions;
    return reconstructions;
  }

-  ReconstructionTask
-  get_flush_task(const Epoch *epoch) const override {
+  ReconstructionVector
+  get_flush_tasks(const Version *version) const override {
+
+    auto levels = version->get_structure()->get_level_vector();
-    auto levels = epoch->get_structure()->get_level_vector();
+    ReconstructionVector v;

    if (levels.size() == 0) {
-      return ReconstructionTask{
-          {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append};
+      v.add_reconstruction(ReconstructionTask{
+          {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
+      return v;
    }

    ShardID last_shid = {0, (shard_index) (levels[0]->get_shard_count() - 1)};
    if (levels[0]->get_shard(last_shid.shard_idx)->get_record_count() +
            m_buffer_size <= capacity()) {
-      return ReconstructionTask{
-          {{buffer_shid, last_shid}}, 0, m_buffer_size, ReconstructionType::Merge};
+      v.add_reconstruction(ReconstructionTask{
+          {{buffer_shid, last_shid}}, 0, m_buffer_size, ReconstructionType::Merge});
+      return v;
    } else {
-      return ReconstructionTask{
-          {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append};
+      v.add_reconstruction(ReconstructionTask{
+          {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
+      return v;
    }
  }

diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h
index da4c297..8304d8a 100644
--- a/include/framework/reconstruction/FloodL0Policy.h
+++ b/include/framework/reconstruction/FloodL0Policy.h
@@ -24,7 +24,7 @@
 public:
  FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {}

  ReconstructionVector
-  get_reconstruction_tasks(const Epoch *epoch,
+  get_reconstruction_tasks(const Version *version,
                           size_t incoming_reccnt) const override {
    ReconstructionVector reconstructions;

@@ -32,10 +32,11 @@ public:
  }

-  ReconstructionTask
-  get_flush_task(const Epoch *epoch) const override {
-    return ReconstructionTask{
-        {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append};
+  ReconstructionVector
+  get_flush_tasks(const Version *version) const override {
+    ReconstructionVector v;
+    v.add_reconstruction(ReconstructionTask{
+        {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
+    return v;
  }

private:
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
index add28ba..176492e 100644
--- a/include/framework/reconstruction/LevelingPolicy.h
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -25,10 +25,10 @@ public:
      : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}

  ReconstructionVector
-  get_reconstruction_tasks(const Epoch *epoch,
+  get_reconstruction_tasks(const Version *version,
                           size_t incoming_reccnt) const override {
    ReconstructionVector reconstructions;
-    auto levels = epoch->get_structure()->get_level_vector();
+    auto levels = version->get_structure()->get_level_vector();
    level_index target_level = find_reconstruction_target(levels);
    assert(target_level != -1);

@@ -51,10 +51,12 @@ public:
    return reconstructions;
  }

-  ReconstructionTask
-  get_flush_task(const Epoch *epoch) const override {
-    return ReconstructionTask{
-        {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge};
+  ReconstructionVector
+  get_flush_tasks(const Version *version) const override {
+    ReconstructionVector v;
+    v.add_reconstruction(ReconstructionTask{
+        {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge});
+    return v;
  }

private:
diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h
index aa213df..2c737de 100644
--- a/include/framework/reconstruction/ReconstructionPolicy.h
+++ b/include/framework/reconstruction/ReconstructionPolicy.h
@@ -23,8 +23,8 @@ class ReconstructionPolicy {
public:
  ReconstructionPolicy() {}

-  virtual ReconstructionVector get_reconstruction_tasks(const Epoch *epoch,
+  virtual ReconstructionVector get_reconstruction_tasks(const Version *version,
                                size_t incoming_reccnt) const = 0;
-  virtual ReconstructionTask get_flush_task(const Epoch *epoch) const = 0;
+  virtual ReconstructionVector get_flush_tasks(const Version *version) const = 0;
};
}
diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h
index 1443309..63be5fe 100644
--- a/include/framework/reconstruction/TieringPolicy.h
+++ b/include/framework/reconstruction/TieringPolicy.h
@@ -23,10 +23,10 @@ public:
      : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}

  ReconstructionVector
-  get_reconstruction_tasks(const Epoch *epoch,
+  get_reconstruction_tasks(const Version *version,
                           size_t incoming_reccnt) const override {
    ReconstructionVector reconstructions;
-    auto levels = epoch->get_structure()->get_level_vector();
+    auto levels = version->get_structure()->get_level_vector();
    level_index target_level = find_reconstruction_target(levels);
    assert(target_level != -1);

@@ -49,10 +49,13 @@ public:
    return reconstructions;
  }

-  ReconstructionTask
-  get_flush_task(const Epoch *epoch) const override {
-    return ReconstructionTask{
-        {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append};
+  ReconstructionVector
+  get_flush_tasks(const Version *version) const override {
+    ReconstructionVector v;
+
+    v.add_reconstruction(ReconstructionTask{
+        {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
+    return v;
  }

private:
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 95c64ea..7583727 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -17,7 +17,7 @@
namespace de {

template QueryType>
-class Epoch {
+class Version {
private:
  typedef typename ShardType::RECORD RecordType;
  typedef MutableBuffer Buffer;
@@ -25,17 +25,17 @@ private:
  typedef BufferView BufView;

public:
-  Epoch(size_t number = 0)
+  Version(size_t number = 0)
      : m_buffer(nullptr), m_structure(nullptr), m_active_merge(false),
        m_epoch_number(number), m_buffer_head(0) {}

-  Epoch(size_t number, Structure *structure, Buffer *buff, size_t head)
+  Version(size_t number, Structure *structure, Buffer *buff, size_t head)
      : m_buffer(buff), m_structure(structure), m_active_merge(false),
        m_epoch_number(number), m_buffer_head(head) {
    structure->take_reference();
  }

-  ~Epoch() {
+  ~Version() {
    if (m_structure) {
      m_structure->release_reference();
    }
@@ -49,10 +49,10 @@ public:
   * Epochs are *not* copyable or movable. Only one can exist, and all users
   * of it work with pointers
   */
-  Epoch(const Epoch &) = delete;
-  Epoch(Epoch &&) = delete;
-  Epoch &operator=(const Epoch &) = delete;
-  Epoch &operator=(Epoch &&) = delete;
+  Version(const Version &) = delete;
+  Version(Version &&) = delete;
+  Version &operator=(const Version &) = delete;
+  Version &operator=(Version &&) = delete;

-  size_t get_epoch_number() const { return m_epoch_number; }
+  size_t get_version_number() const { return m_epoch_number; }

@@ -68,9 +68,9 @@
   * the same one. The epoch number of the new epoch will be set to the
   * provided argument.
   */
-  Epoch *clone(size_t number) {
+  Version *clone(size_t number) {
    std::unique_lock m_buffer_lock;
-    auto epoch = new Epoch(number);
+    auto epoch = new Version(number);

    epoch->m_buffer = m_buffer;
    epoch->m_buffer_head = m_buffer_head;
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 7242bef..1ab35d2 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -23,14 +23,19 @@

namespace de {

+enum class ReconstructionPriority {
+  FLUSH = 0,
+  CMPCT = 1,
+  MAINT = 2
+};
+
template QueryType>
struct ReconstructionArgs {
  typedef typename ShardType::RECORD RecordType;
-  Epoch *epoch;
+  std::atomic>> version;
  ReconstructionVector tasks;
-  std::promise result;
-  bool compaction;
  void *extension;
+  ReconstructionPriority priority;
};

template Q, typename DE> struct QueryArgs {
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 078c4a9..da91509 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -164,17 +164,18 @@ public:
      assert(shid.level_idx < (level_index) m_levels.size());
      assert(shid.shard_idx >= -1);

-      if (shid == buffer_shid) {
-        assert(bv);
-        ShardType *buffer_shard = new ShardType(std::move(*bv));
-        shards.push_back(buffer_shard);
-      } else if (shid.shard_idx == all_shards_idx) {
-        /* if unspecified, push all shards into the vector */
-        for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) {
+      /* if unspecified, push all shards into the vector */
+      if (shid.shard_idx == all_shards_idx) {
+        for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count();
+             i++) {
          if (m_levels[shid.level_idx]->get_shard(i)) {
            shards.push_back(m_levels[shid.level_idx]->get_shard(i));
          }
        }
+      } else if (shid == buffer_shid) {
+        assert(bv);
+        ShardType *buffer_shard = new ShardType(std::move(*bv));
+        shards.push_back(buffer_shard);
      } else {
        shards.push_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx));
      }
@@ -186,11 +187,9 @@
      * Remove all of the shards processed by the operation
      */
     for (ShardID shid : task.sources) {
-      if (shid == buffer_shid) {
-        continue;
-      } else if (shid.shard_idx == all_shards_idx) {
+      if (shid.shard_idx == all_shards_idx) {
        m_levels[shid.level_idx]->truncate();
-      } else {
+      } else if (shid != buffer_shid) {
        m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
      }
    }
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 0477095..9c1e624 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -23,6 +23,26 @@ class DEConfiguration {
public:
  std::unique_ptr> m_recon_policy;
+
+  /* buffer parameters */
+  size_t buffer_count = 1;
+  size_t buffer_size = 8000;
+  size_t buffer_flush_trigger = buffer_size / 4;
+
+  /* reconstruction triggers */
+  bool recon_enable_seek_trigger = false;
+  bool recon_enable_maint_on_flush = true;
+  bool recon_enable_delete_cmpct = false;
+
+  size_t recon_l0_capacity = 0; /* 0 for unbounded */
+  double maximum_delete_proportion = 1;
+
+  /* resource management */
+  size_t maximum_threads = 16;
+  size_t minimum_recon_threads = 1;
+  size_t minimum_query_threads = 4;
+  size_t maximum_memory_usage = 0; /* 0 for unbounded */
+
};
} // namespace de
-- 
cgit v1.2.3
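
The core concurrency change in this patch is the replacement of reference-counted epoch pointers with a monotonically numbered version chain: schedule_flush() claims a number eagerly via fetch_add on m_version_counter, install_new_version() admits versions strictly in number order under m_version_advance_mtx, and await_newest_version() blocks on the same condition variable until the newest claimed number is active. The following standalone sketch isolates that hand-off protocol so it can be compiled and tested on its own. VersionChain and all of its members are illustrative stand-ins, not the framework's API, and it assumes a toolchain with C++20 std::atomic<std::shared_ptr> support (e.g. GCC 12 or newer).

#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

struct Version {
  size_t number;
  explicit Version(size_t n) : number(n) {}
};

class VersionChain {
public:
  VersionChain() { m_active.store(std::make_shared<Version>(1)); }

  /* claim a version number up front, as schedule_flush() does */
  std::shared_ptr<Version> create_version() {
    return std::make_shared<Version>(m_counter.fetch_add(1));
  }

  /* install versions strictly in claim order, waiting for our turn */
  void install(std::shared_ptr<Version> v) {
    std::unique_lock lk(m_mtx);
    while (m_active.load()->number + 1 != v->number) {
      m_cv.wait(lk);
    }
    m_active.store(v);
    m_cv.notify_all();
  }

  /* block until the newest number claimed so far has become active */
  void await_newest() {
    size_t newest = m_counter.load() - 1;
    std::unique_lock lk(m_mtx);
    while (m_active.load()->number < newest) {
      m_cv.wait(lk);
    }
  }

  size_t active_number() { return m_active.load()->number; }

private:
  std::atomic<size_t> m_counter{2}; /* version 1 is installed at startup */
  std::atomic<std::shared_ptr<Version>> m_active;
  std::mutex m_mtx;
  std::condition_variable m_cv;
};

int main() {
  VersionChain chain;

  /* claim numbers 2..5 in order, then install them from racing threads */
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; i++) {
    auto v = chain.create_version();
    workers.emplace_back([&chain, v] { chain.install(v); });
  }

  chain.await_newest(); /* returns once version 5 is active */
  std::printf("active version: %zu\n", chain.active_number());

  for (auto &t : workers) {
    t.join();
  }
}

Because the numbers are dense and claimed before installation, a version whose reconstruction finishes early simply waits its turn; this is what lets flushes claim numbers eagerly while background reconstructions defer theirs until activation, as the two create_version() overloads describe.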
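Separately, internal_append() uses m_scheduling_reconstruction as a schedule-once gate: when the buffer crosses its flush trigger, only the one writer that wins the compare-and-swap schedules a flush, and the reconstruction job resets the flag when it completes. Below is a minimal sketch of that gate under the same assumption-laden naming (the trigger value mirrors the buffer_flush_trigger = buffer_size / 4 default from Configuration.h; the "flush" here is a stand-in counter rather than a real scheduler job).

#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

std::atomic<size_t> g_buffered{0};
std::atomic<bool> g_scheduling{false};
std::atomic<int> g_flushes_scheduled{0};
constexpr size_t FLUSH_TRIGGER = 2000; /* stand-in for buffer_size / 4 */

void append_record() {
  if (g_buffered.fetch_add(1) + 1 >= FLUSH_TRIGGER) {
    bool expected = false;
    /* only one thread can flip false -> true, so the flush is scheduled once */
    if (g_scheduling.compare_exchange_strong(expected, true)) {
      g_flushes_scheduled.fetch_add(1);
      g_buffered.store(0);       /* stand-in for handing records to a flush job */
      g_scheduling.store(false); /* the real job resets this on completion */
    }
  }
}

int main() {
  std::vector<std::thread> writers;
  for (int i = 0; i < 8; i++) {
    writers.emplace_back([] {
      for (int j = 0; j < 10000; j++) {
        append_record();
      }
    });
  }
  for (auto &t : writers) {
    t.join();
  }
  std::printf("flushes scheduled: %d\n", g_flushes_scheduled.load());
}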