diff options
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 69 |
1 files changed, 59 insertions, 10 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index ef36de3..a48f390 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -89,6 +89,8 @@ public: std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0)); m_version_counter = INITIAL_VERSION; + assert(m_config.recon_policy); + m_reconstruction_scheduled.store(false); } /** @@ -374,6 +376,14 @@ public: */ void print_scheduler_statistics() const { m_sched->print_statistics(); } + /** + * Writes a schematic view of the currently active structure to + * stdout. Each level is on its own line, and each shard is represented. + */ + void print_structure() { + get_active_version()->get_structure()->print_structure(); + } + private: ConfType m_config; @@ -390,6 +400,8 @@ private: std::condition_variable m_version_advance_cv; std::mutex m_version_advance_mtx; + std::atomic<bool> m_reconstruction_scheduled; + std::atomic<bool> m_flush_in_progress = false; alignas(64) std::atomic<bool> m_scheduling_reconstruction; @@ -441,6 +453,7 @@ private: * this code will be bypassed in that case. */ if (args->priority == ReconstructionPriority::FLUSH) { + // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id()); assert(extension->m_flush_in_progress.load()); /* we first construct a shard from the buffer */ auto buffview = args->version->get_buffer(); @@ -460,19 +473,21 @@ private: if (extension->m_config.recon_maint_disabled) { assert(args->version->get_mutable_structure()); - args->version->get_mutable_structure()->append_l0(std::move(new_shard)); + args->version->get_mutable_structure()->append_l0(std::move(new_shard), args->version->get_id()); } else { assert(!args->version->get_mutable_structure()); auto structure = std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy()); /* add the newly created shard to the structure copy */ - structure->append_l0(std::move(new_shard)); + structure->append_l0(std::move(new_shard), args->version->get_id()); /* set this version's structure to the newly created one */ args->version->set_structure(std::move(structure)); } args->version->advance_buffer_head(new_head); + } else { + // fprintf(stderr, "[I] Running background reconstruction\n"); } /* perform all of the reconstructions */ @@ -487,13 +502,18 @@ private: */ if (args->version->get_id() == INVALID_VERSION) { args->version->set_id(extension->m_version_counter.fetch_add(1)); + // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id()); } /* advance the index to the newly finished version */ - extension->install_new_version(args->version); + extension->install_new_version(args->version, args->initial_version); if (args->priority == ReconstructionPriority::FLUSH) { extension->m_flush_in_progress.store(false); + // fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id()); + } else { + extension->m_reconstruction_scheduled.store(false); + // fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id()); } /* manually delete the argument object */ @@ -593,17 +613,18 @@ private: return new_version; } - void install_new_version(version_ptr new_version) { + void install_new_version(version_ptr new_version, size_t old_active_version_id) { assert(new_version->get_structure()); assert(new_version->get_id() != INVALID_VERSION); + // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id()); /* wait until our turn to install the new version */ await_version(new_version->get_id() - 1); auto old = get_active_version(); - // FIXME: implement this interface - // new_version->merge_changes_from(old.load().get()); + new_version->merge_changes_from(old.get(), old_active_version_id); + new_version->update_shard_version(new_version->get_id()); /* * Only one version can have a given number, so we are safe to @@ -620,6 +641,8 @@ private: */ auto lk = std::unique_lock(m_version_advance_mtx); m_version_advance_cv.notify_all(); + + // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id()); } StructureType *create_scratch_structure() { @@ -649,6 +672,8 @@ private: return; } + // fprintf(stderr, "[I] Scheduling flush\n"); + /* * for "legacy" policies, without background reconstruction, we need * a valid structure object as part of the version prior to determining @@ -672,6 +697,7 @@ private: args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get()); args->extension = this; args->priority = ReconstructionPriority::FLUSH; + args->initial_version = INVALID_VERSION; /* * NOTE: args is deleted by the reconstruction job, so shouldn't be @@ -689,7 +715,7 @@ private: void schedule_maint_reconstruction(bool take_reconstruction_lock = true) { - if (m_config.recon_maint_disabled) { + if (m_config.recon_maint_disabled || m_reconstruction_scheduled.load()) { return; } @@ -697,21 +723,38 @@ private: begin_reconstruction_scheduling(); } + if (m_reconstruction_scheduled.load()) { + end_reconstruction_scheduling(); + return; + } + + // fprintf(stderr, "[I] Scheduling maintenance\n"); + + m_reconstruction_scheduled.store(true); + // FIXME: memory management issue here? - auto new_version = create_version_maint(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy())); + auto active_version = m_active_version.load(); + auto new_version = create_version_maint(std::unique_ptr<StructureType>(active_version->get_structure()->copy())); auto *args = new ReconstructionArgs<ShardType, QueryType>(); args->version = new_version; args->tasks = m_config.recon_policy->get_reconstruction_tasks(args->version.get()); args->extension = this; args->priority = ReconstructionPriority::MAINT; + args->initial_version = active_version->get_id(); /* * NOTE: args is deleted by the reconstruction job, so shouldn't be * freed here */ - m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, - RECONSTRUCTION); + if (args->tasks.size() > 0) { + m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, + RECONSTRUCTION); + } else { + delete args; + m_reconstruction_scheduled.store(false); + // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n"); + } if (take_reconstruction_lock) { end_reconstruction_scheduling(); @@ -739,6 +782,12 @@ private: } } + + if (rand() % 1000 < 5) { + size_t l0_cnt = get_active_version()->get_structure()->get_level_vector()[0]->get_shard_count(); + usleep(l0_cnt); + } + /* this will fail if the HWM is reached and return 0 */ return m_buffer->append(rec, ts); } |