diff options
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 87 |
1 files changed, 48 insertions, 39 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index a48f390..aa07659 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -17,6 +17,7 @@ #include "framework/interface/Scheduler.h" #include "framework/reconstruction/ReconstructionPolicy.h" #include "framework/scheduling/SerialScheduler.h" +#include "framework/scheduling/LockManager.h" #include "framework/scheduling/Task.h" #include "framework/structure/ExtensionStructure.h" @@ -51,6 +52,7 @@ private: static constexpr size_t QUERY = 1; static constexpr size_t RECONSTRUCTION = 2; + static constexpr size_t FLUSH = 3; typedef std::shared_ptr<VersionType> version_ptr; typedef size_t version_id; @@ -90,7 +92,6 @@ public: m_version_counter = INITIAL_VERSION; assert(m_config.recon_policy); - m_reconstruction_scheduled.store(false); } /** @@ -400,10 +401,10 @@ private: std::condition_variable m_version_advance_cv; std::mutex m_version_advance_mtx; - std::atomic<bool> m_reconstruction_scheduled; - std::atomic<bool> m_flush_in_progress = false; + LockManager m_lock_mngr; + alignas(64) std::atomic<bool> m_scheduling_reconstruction; void enforce_delete_invariant(VersionType *version) { @@ -493,7 +494,9 @@ private: /* perform all of the reconstructions */ StructureType *structure = args->version->get_mutable_structure(); for (size_t i = 0; i < args->tasks.size(); i++) { - structure->perform_reconstruction(args->tasks[i]); + if (structure->perform_reconstruction(args->tasks[i])) { + extension->m_lock_mngr.add_lock(); + } } /* @@ -508,11 +511,19 @@ private: /* advance the index to the newly finished version */ extension->install_new_version(args->version, args->initial_version); + /* maint reconstructions can now safely release their locks */ + if (args->priority == ReconstructionPriority::MAINT) { + for (size_t i=0; i<args->tasks.size(); i++) { + for (auto source : args->tasks[i].sources) { + extension->m_lock_mngr.release_lock(source.level_idx); + } + } + } + if (args->priority == ReconstructionPriority::FLUSH) { extension->m_flush_in_progress.store(false); // fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id()); } else { - extension->m_reconstruction_scheduled.store(false); // fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id()); } @@ -591,6 +602,7 @@ private: */ version_ptr create_version_flush(std::unique_ptr<StructureType> structure) { size_t version_id = m_version_counter.fetch_add(1); + // fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id); auto active_version = get_active_version(); std::shared_ptr<VersionType> new_version = std::make_shared<VersionType>(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head()); @@ -617,7 +629,7 @@ private: assert(new_version->get_structure()); assert(new_version->get_id() != INVALID_VERSION); - // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id()); + // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id()); /* wait until our turn to install the new version */ await_version(new_version->get_id() - 1); @@ -669,10 +681,10 @@ private: bool old = m_flush_in_progress.load(); if (old || !m_flush_in_progress.compare_exchange_strong(old, true)) { end_reconstruction_scheduling(); + m_version_advance_cv.notify_all(); return; } - // fprintf(stderr, "[I] Scheduling flush\n"); /* * for "legacy" policies, without background reconstruction, we need @@ -692,6 +704,7 @@ private: auto new_version = create_version_flush(std::move(structure)); + auto *args = new ReconstructionArgs<ShardType, QueryType>(); args->version = new_version; args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get()); @@ -699,12 +712,14 @@ private: args->priority = ReconstructionPriority::FLUSH; args->initial_version = INVALID_VERSION; + // fprintf(stderr, "[I] Scheduling flush (%ld)\n", new_version->get_id()); /* * NOTE: args is deleted by the reconstruction job, so shouldn't be * freed here */ m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, - RECONSTRUCTION); + FLUSH); + // fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", new_version->get_id()); if (m_config.recon_enable_maint_on_flush) { schedule_maint_reconstruction(false); @@ -715,7 +730,7 @@ private: void schedule_maint_reconstruction(bool take_reconstruction_lock = true) { - if (m_config.recon_maint_disabled || m_reconstruction_scheduled.load()) { + if (m_config.recon_maint_disabled) { return; } @@ -723,37 +738,28 @@ private: begin_reconstruction_scheduling(); } - if (m_reconstruction_scheduled.load()) { - end_reconstruction_scheduling(); - return; - } - // fprintf(stderr, "[I] Scheduling maintenance\n"); - m_reconstruction_scheduled.store(true); - - // FIXME: memory management issue here? auto active_version = m_active_version.load(); - auto new_version = create_version_maint(std::unique_ptr<StructureType>(active_version->get_structure()->copy())); + auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr); - auto *args = new ReconstructionArgs<ShardType, QueryType>(); - args->version = new_version; - args->tasks = m_config.recon_policy->get_reconstruction_tasks(args->version.get()); - args->extension = this; - args->priority = ReconstructionPriority::MAINT; - args->initial_version = active_version->get_id(); + // if (reconstructions.size() == 0) { + // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n"); + // } - /* - * NOTE: args is deleted by the reconstruction job, so shouldn't be - * freed here - */ - if (args->tasks.size() > 0) { + for (auto &recon : reconstructions) { + /* + * NOTE: args is deleted by the reconstruction job, so shouldn't be + * freed here + */ + auto *args = new ReconstructionArgs<ShardType, QueryType>(); + args->version = create_version_maint(std::unique_ptr<StructureType>(active_version->get_structure()->copy())); + args->tasks = std::move(recon); + args->extension = this; + args->priority = ReconstructionPriority::MAINT; + args->initial_version = active_version->get_id(); m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, - RECONSTRUCTION); - } else { - delete args; - m_reconstruction_scheduled.store(false); - // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n"); + RECONSTRUCTION); } if (take_reconstruction_lock) { @@ -775,17 +781,20 @@ private: } int internal_append(const RecordType &rec, bool ts) { - if (m_buffer->is_at_low_watermark()) { + size_t max_l0 = (log(get_record_count()) / log(8)) + 1; + size_t current_l0 = get_active_version()->get_structure()->get_level_vector()[0]->get_shard_count(); + + if (m_buffer->is_at_low_watermark() && current_l0 <= max_l0) { auto old = false; if (m_scheduling_reconstruction.compare_exchange_strong(old, true)) { schedule_flush(); } } - - if (rand() % 1000 < 5) { - size_t l0_cnt = get_active_version()->get_structure()->get_level_vector()[0]->get_shard_count(); - usleep(l0_cnt); + if (m_buffer->is_at_high_watermark() && current_l0 > max_l0) { + schedule_maint_reconstruction(true); + // fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0); + return 0; } /* this will fail if the HWM is reached and return 0 */ |