diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-04-07 14:37:19 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-04-07 14:37:19 -0400 |
| commit | a9ee3f2f7f557adbe5448e4f4fbf71c23fe808b1 (patch) | |
| tree | c3c404223d852b64a57d5265221c4a2d05d0af73 /include/framework/DynamicExtension.h | |
| parent | 6bdcf74ad91e0efaa8c2e4339f5085fde8a7982b (diff) | |
| download | dynamic-extension-a9ee3f2f7f557adbe5448e4f4fbf71c23fe808b1.tar.gz | |
Implemented the legacy policies and fixed a few bugs
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 50 |
1 files changed, 24 insertions, 26 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 9d76813..a65498c 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -479,17 +479,15 @@ private: * this code will be bypassed in that case. */ if (args->priority == ReconstructionPriority::FLUSH) { - // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id); - // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n", - // args->version->get_id(), recon_id); + // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id); + // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n", args->version->get_id(), recon_id); /* we first construct a shard from the buffer */ auto buffview = args->version->get_buffer(); assert(buffview.get_tail() != buffview.get_head()); new_head = buffview.get_tail(); - // fprintf(stderr, "\t[I] Current Buffer Head:\t%ld (%ld)\n", - // buffview.get_head(), recon_id); + // fprintf(stderr, "\t[I] Current Buffer Head:\t%ld (%ld)\n", buffview.get_head(), recon_id); reconstruction_results<ShardType> flush_recon; flush_recon.target_level = 0; @@ -497,6 +495,18 @@ private: reconstructions.push_back(flush_recon); + /* + * Eager policies need access to the flushed shard immediately, so + * we add it to the current structure. This gives such policies + * access to it within their own reconstructions later. + * + * This is necessary for Leveling and BSM specifically. Tiering-based + * policies can ignore this shard (by explicitly specifying the + * shards in L0 to use), or use it (by using the "all_shards_idx" + * shard id). + */ + args->version->get_mutable_structure()->append_shard(flush_recon.new_shard, args->version->get_id(), flush_recon.target_level); + /* advance the buffer head for a flush */ bool success = false; size_t failure_cnt = 0; @@ -505,35 +515,28 @@ private: if (!success) { failure_cnt++; usleep(1); - // fprintf(stderr, "\t[I] Buffer head advance blocked on %ld (%ld)\n", - // args->version->get_id(), recon_id); + // fprintf(stderr, "\t[I] Buffer head advance blocked on %ld (%ld)\n", args->version->get_id(), recon_id); if (failure_cnt >= extension->m_config.buffer_flush_query_preemption_trigger) { extension->preempt_queries(); if (failure_cnt > 500000) { - // fprintf(stderr, - // "[C] Critical failure. Hung on version: %ld (%ld)\n", - // extension->m_buffer->debug_get_old_head(), recon_id); + // fprintf(stderr, "[C] Critical failure. Hung on version: %ld (%ld)\n", extension->m_buffer->debug_get_old_head(), recon_id); } } } } - - // fprintf(stderr, "\t[I] Buffer head advanced to:\t%ld (%ld)\n", - // new_head, recon_id); + // fprintf(stderr, "\t[I] Buffer head advanced to:\t%ld (%ld)\n", new_head, recon_id); } else { - // fprintf(stderr, "[I] Running background reconstruction (%ld)\n", - // recon_id); + // fprintf(stderr, "[I] Running background reconstruction (%ld)\n", recon_id); } /* perform all of the reconstructions */ auto structure = args->version->get_structure(); assert(structure); - // fprintf(stderr, "\t[I] Pre-reconstruction L0 Size\t%ld (%ld)\n", - // structure->get_level_vector()[0]->get_shard_count(), recon_id); + // fprintf(stderr, "\t[I] Pre-reconstruction L0 Size\t%ld (%ld)\n", structure->get_level_vector()[0]->get_shard_count(), recon_id); for (size_t i = 0; i < args->tasks.size(); i++) { reconstructions.emplace_back( @@ -584,24 +587,20 @@ private: size_t new_reccnt = args->version->get_structure()->get_record_count(); - // fprintf(stderr, "\t[I] Post-reconstruction L0 Size\t%ld (%ld)\n", - // args->version->get_structure()->get_level_vector()[0]->get_shard_count(), - // recon_id); + // fprintf(stderr, "\t[I] Post-reconstruction L0 Size\t%ld (%ld)\n", args->version->get_structure()->get_level_vector()[0]->get_shard_count(), recon_id); /* for maintenance reconstructions, advance the buffer head to match the * currently active version */ if (args->priority == ReconstructionPriority::MAINT) { args->version->set_buffer(extension->m_buffer.get(), active_version->get_head()); - // fprintf(stderr, "\t[I] Buffer head set to %ld (%ld)\n", - // active_version->get_head(), recon_id); + // fprintf(stderr, "\t[I] Buffer head set to %ld (%ld)\n", active_version->get_head(), recon_id); if (new_reccnt != cur_reccnt) { fprintf(stderr, "ERROR: invalid reccnt (%ld)\n", recon_id); } } - // fprintf(stderr, "\t[I] Record Counts: %ld %ld %ld (%ld)\n", old_reccnt, - // cur_reccnt, new_reccnt, recon_id); + // fprintf(stderr, "\t[I] Record Counts: %ld %ld %ld (%ld)\n", old_reccnt, cur_reccnt, new_reccnt, recon_id); /* advance the index to the newly finished version */ extension->install_new_version(args->version, args->initial_version); @@ -626,8 +625,7 @@ private: extension->m_lock_mngr.release_buffer_lock(); } - // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n", - // args->version->get_id(), recon_id); + // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n", args->version->get_id(), recon_id); /* manually delete the argument object */ delete args; |