summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-04-07 14:37:19 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2025-04-07 14:37:19 -0400
commita9ee3f2f7f557adbe5448e4f4fbf71c23fe808b1 (patch)
treec3c404223d852b64a57d5265221c4a2d05d0af73 /include/framework/DynamicExtension.h
parent6bdcf74ad91e0efaa8c2e4339f5085fde8a7982b (diff)
downloaddynamic-extension-a9ee3f2f7f557adbe5448e4f4fbf71c23fe808b1.tar.gz
Implemented the legacy policies and fixed a few bugs
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h50
1 files changed, 24 insertions, 26 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 9d76813..a65498c 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -479,17 +479,15 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
- // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
- // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n",
- // args->version->get_id(), recon_id);
+ // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
+ // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n", args->version->get_id(), recon_id);
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
assert(buffview.get_tail() != buffview.get_head());
new_head = buffview.get_tail();
- // fprintf(stderr, "\t[I] Current Buffer Head:\t%ld (%ld)\n",
- // buffview.get_head(), recon_id);
+ // fprintf(stderr, "\t[I] Current Buffer Head:\t%ld (%ld)\n", buffview.get_head(), recon_id);
reconstruction_results<ShardType> flush_recon;
flush_recon.target_level = 0;
@@ -497,6 +495,18 @@ private:
reconstructions.push_back(flush_recon);
+ /*
+ * Eager policies need access to the flushed shard immediately, so
+ * we add it to the current structure. This gives such policies
+ * access to it within their own reconstructions later.
+ *
+ * This is necessary for Leveling and BSM specifically. Tiering-based
+ * policies can ignore this shard (by explicitly specifying the
+ * shards in L0 to use), or use it (by using the "all_shards_idx"
+ * shard id).
+ */
+ args->version->get_mutable_structure()->append_shard(flush_recon.new_shard, args->version->get_id(), flush_recon.target_level);
+
/* advance the buffer head for a flush */
bool success = false;
size_t failure_cnt = 0;
@@ -505,35 +515,28 @@ private:
if (!success) {
failure_cnt++;
usleep(1);
- // fprintf(stderr, "\t[I] Buffer head advance blocked on %ld (%ld)\n",
- // args->version->get_id(), recon_id);
+ // fprintf(stderr, "\t[I] Buffer head advance blocked on %ld (%ld)\n", args->version->get_id(), recon_id);
if (failure_cnt >=
extension->m_config.buffer_flush_query_preemption_trigger) {
extension->preempt_queries();
if (failure_cnt > 500000) {
- // fprintf(stderr,
- // "[C] Critical failure. Hung on version: %ld (%ld)\n",
- // extension->m_buffer->debug_get_old_head(), recon_id);
+ // fprintf(stderr, "[C] Critical failure. Hung on version: %ld (%ld)\n", extension->m_buffer->debug_get_old_head(), recon_id);
}
}
}
}
-
- // fprintf(stderr, "\t[I] Buffer head advanced to:\t%ld (%ld)\n",
- // new_head, recon_id);
+ // fprintf(stderr, "\t[I] Buffer head advanced to:\t%ld (%ld)\n", new_head, recon_id);
} else {
- // fprintf(stderr, "[I] Running background reconstruction (%ld)\n",
- // recon_id);
+ // fprintf(stderr, "[I] Running background reconstruction (%ld)\n", recon_id);
}
/* perform all of the reconstructions */
auto structure = args->version->get_structure();
assert(structure);
- // fprintf(stderr, "\t[I] Pre-reconstruction L0 Size\t%ld (%ld)\n",
- // structure->get_level_vector()[0]->get_shard_count(), recon_id);
+ // fprintf(stderr, "\t[I] Pre-reconstruction L0 Size\t%ld (%ld)\n", structure->get_level_vector()[0]->get_shard_count(), recon_id);
for (size_t i = 0; i < args->tasks.size(); i++) {
reconstructions.emplace_back(
@@ -584,24 +587,20 @@ private:
size_t new_reccnt = args->version->get_structure()->get_record_count();
- // fprintf(stderr, "\t[I] Post-reconstruction L0 Size\t%ld (%ld)\n",
- // args->version->get_structure()->get_level_vector()[0]->get_shard_count(),
- // recon_id);
+ // fprintf(stderr, "\t[I] Post-reconstruction L0 Size\t%ld (%ld)\n", args->version->get_structure()->get_level_vector()[0]->get_shard_count(), recon_id);
/* for maintenance reconstructions, advance the buffer head to match the
* currently active version */
if (args->priority == ReconstructionPriority::MAINT) {
args->version->set_buffer(extension->m_buffer.get(),
active_version->get_head());
- // fprintf(stderr, "\t[I] Buffer head set to %ld (%ld)\n",
- // active_version->get_head(), recon_id);
+ // fprintf(stderr, "\t[I] Buffer head set to %ld (%ld)\n", active_version->get_head(), recon_id);
if (new_reccnt != cur_reccnt) {
fprintf(stderr, "ERROR: invalid reccnt (%ld)\n", recon_id);
}
}
- // fprintf(stderr, "\t[I] Record Counts: %ld %ld %ld (%ld)\n", old_reccnt,
- // cur_reccnt, new_reccnt, recon_id);
+ // fprintf(stderr, "\t[I] Record Counts: %ld %ld %ld (%ld)\n", old_reccnt, cur_reccnt, new_reccnt, recon_id);
/* advance the index to the newly finished version */
extension->install_new_version(args->version, args->initial_version);
@@ -626,8 +625,7 @@ private:
extension->m_lock_mngr.release_buffer_lock();
}
- // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n",
- // args->version->get_id(), recon_id);
+ // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n", args->version->get_id(), recon_id);
/* manually delete the argument object */
delete args;