From 125e243cad99aa29444759e15053fd148ff0e3ba Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 11 Feb 2025 17:53:44 -0500 Subject: more updates --- benchmarks/tail-latency/standard_latency_dist.cpp | 1 + include/framework/DynamicExtension.h | 41 +++++++++++++--------- .../reconstruction/BackgroundTieringPolicy.h | 3 ++ 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/benchmarks/tail-latency/standard_latency_dist.cpp b/benchmarks/tail-latency/standard_latency_dist.cpp index 7cce48c..34949d4 100644 --- a/benchmarks/tail-latency/standard_latency_dist.cpp +++ b/benchmarks/tail-latency/standard_latency_dist.cpp @@ -79,6 +79,7 @@ int main(int argc, char **argv) { TIMER_START(); for (size_t j=warmup; jinsert(data[j])) { + fprintf(stderr, "[B] %ld %ld\n", j, extension->get_record_count()); usleep(1); } } diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index aa07659..c2a59ea 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -13,6 +13,7 @@ #include #include #include +#include #include "framework/interface/Scheduler.h" #include "framework/reconstruction/ReconstructionPolicy.h" @@ -454,7 +455,7 @@ private: * this code will be bypassed in that case. */ if (args->priority == ReconstructionPriority::FLUSH) { - // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id()); + fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id()); assert(extension->m_flush_in_progress.load()); /* we first construct a shard from the buffer */ auto buffview = args->version->get_buffer(); @@ -488,7 +489,7 @@ private: args->version->advance_buffer_head(new_head); } else { - // fprintf(stderr, "[I] Running background reconstruction\n"); + fprintf(stderr, "[I] Running background reconstruction\n"); } /* perform all of the reconstructions */ @@ -505,7 +506,7 @@ private: */ if (args->version->get_id() == INVALID_VERSION) { args->version->set_id(extension->m_version_counter.fetch_add(1)); - // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id()); + fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id()); } /* advance the index to the newly finished version */ @@ -513,18 +514,26 @@ private: /* maint reconstructions can now safely release their locks */ if (args->priority == ReconstructionPriority::MAINT) { + std::set locked_levels; for (size_t i=0; itasks.size(); i++) { for (auto source : args->tasks[i].sources) { - extension->m_lock_mngr.release_lock(source.level_idx); + locked_levels.insert(source.level_idx); } } + + for (auto level : locked_levels) { + fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, args->version->get_id()); + extension->m_lock_mngr.release_lock(level); + } } if (args->priority == ReconstructionPriority::FLUSH) { extension->m_flush_in_progress.store(false); - // fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id()); + fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id()); } else { - // fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id()); + fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id()); + extension->print_structure(); + fflush(stdout); } /* manually delete the argument object */ @@ -602,7 +611,7 @@ private: */ version_ptr create_version_flush(std::unique_ptr structure) { size_t version_id = m_version_counter.fetch_add(1); - // fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id); + fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id); auto active_version = get_active_version(); std::shared_ptr new_version = std::make_shared(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head()); @@ -629,7 +638,7 @@ private: assert(new_version->get_structure()); assert(new_version->get_id() != INVALID_VERSION); - // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id()); + fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id()); /* wait until our turn to install the new version */ await_version(new_version->get_id() - 1); @@ -654,7 +663,7 @@ private: auto lk = std::unique_lock(m_version_advance_mtx); m_version_advance_cv.notify_all(); - // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id()); + fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id()); } StructureType *create_scratch_structure() { @@ -712,14 +721,14 @@ private: args->priority = ReconstructionPriority::FLUSH; args->initial_version = INVALID_VERSION; - // fprintf(stderr, "[I] Scheduling flush (%ld)\n", new_version->get_id()); + fprintf(stderr, "[I] Scheduling flush (%ld)\n", new_version->get_id()); /* * NOTE: args is deleted by the reconstruction job, so shouldn't be * freed here */ m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, FLUSH); - // fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", new_version->get_id()); + fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", new_version->get_id()); if (m_config.recon_enable_maint_on_flush) { schedule_maint_reconstruction(false); @@ -738,14 +747,14 @@ private: begin_reconstruction_scheduling(); } - // fprintf(stderr, "[I] Scheduling maintenance\n"); + fprintf(stderr, "[I] Scheduling maintenance\n"); auto active_version = m_active_version.load(); auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr); - // if (reconstructions.size() == 0) { - // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n"); - // } + if (reconstructions.size() == 0) { + fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n"); + } for (auto &recon : reconstructions) { /* @@ -793,7 +802,7 @@ private: if (m_buffer->is_at_high_watermark() && current_l0 > max_l0) { schedule_maint_reconstruction(true); - // fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0); + fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0); return 0; } diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h index 5a82695..a1bbddc 100644 --- a/include/framework/reconstruction/BackgroundTieringPolicy.h +++ b/include/framework/reconstruction/BackgroundTieringPolicy.h @@ -45,6 +45,7 @@ public: for (level_index i = target_level; i > source_level; i--) { if (lock_mngr.take_lock(i-1)) { + fprintf(stderr, "[I] Taking lock on %ld (%ld)\n", i-1, version->get_id()); ReconstructionVector recon; size_t target_reccnt = (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0; @@ -56,6 +57,8 @@ public: recon.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact); reconstructions.push_back(recon); + } else { + fprintf(stderr, "[I] Failed to get lock on %ld (%ld)\n", i-1, version->get_id()); } } -- cgit v1.2.3