summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-02-14 15:34:39 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2025-02-14 15:34:39 -0500
commitffbca7e200d8e7937592cd4a70d41f4d6ed9cb65 (patch)
tree4cae9e8e75c0718b635d5b7788a288982fe4efd4 /include/framework/DynamicExtension.h
parentd28f2cfcd4249fc7d984762a326e3f2d6dcba7dc (diff)
downloaddynamic-extension-ffbca7e200d8e7937592cd4a70d41f4d6ed9cb65.tar.gz
Parallel background reconstructions appear to be working!
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h71
1 files changed, 26 insertions, 45 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 31eb138..e97dfab 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -440,8 +440,8 @@ private:
extension->SetThreadAffinity();
- std::vector<reconstruction_results<ShardType>> reconstructions;
size_t new_head = 0;
+ std::vector<reconstruction_results<ShardType>> reconstructions;
/*
* For "normal" flushes, the task vector should be empty, so this is
@@ -455,7 +455,7 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
- fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
+ // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
@@ -467,8 +467,13 @@ private:
flush_recon.new_shard = std::make_shared<ShardType>(std::move(buffview));
reconstructions.push_back(flush_recon);
+
+ // fprintf(stderr, "[I] Buffer Head:\t%ld\n", new_head);
+
+ /* advance the buffer head for a flush */
+ args->version->advance_buffer_head(new_head);
} else {
- fprintf(stderr, "[I] Running background reconstruction\n");
+ // fprintf(stderr, "[I] Running background reconstruction\n");
}
/* perform all of the reconstructions */
@@ -484,7 +489,7 @@ private:
*/
if (args->version->get_id() == INVALID_VERSION) {
args->version->set_id(extension->m_version_counter.fetch_add(1));
- fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
+ // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
}
/* wait for our opportunity to install the updates */
@@ -501,6 +506,11 @@ private:
}
}
+ /* grab the latest buffer head */
+ if (args->priority == ReconstructionPriority::MAINT) {
+ args->version->set_head(extension->get_active_version()->get_head());
+ }
+
/* advance the index to the newly finished version */
extension->install_new_version(args->version, args->initial_version);
@@ -514,24 +524,13 @@ private:
}
for (auto level : locked_levels) {
- fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, args->version->get_id());
+ // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, args->version->get_id());
extension->m_lock_mngr.release_lock(level);
}
}
if (args->priority == ReconstructionPriority::FLUSH) {
- /* advance the buffer head for a flush */
- args->version->advance_buffer_head(new_head);
-
extension->m_lock_mngr.release_buffer_lock();
-
- fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id());
- extension->print_structure();
- fflush(stdout);
- } else {
- fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id());
- extension->print_structure();
- fflush(stdout);
}
/* manually delete the argument object */
@@ -609,7 +608,7 @@ private:
*/
version_ptr create_version_flush(std::unique_ptr<StructureType> structure) {
size_t version_id = m_version_counter.fetch_add(1);
- fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id);
+ // fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id);
auto active_version = get_active_version();
std::shared_ptr<VersionType> new_version =
std::make_shared<VersionType>(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
@@ -636,7 +635,7 @@ private:
assert(new_version->get_structure());
assert(new_version->get_id() != INVALID_VERSION);
- fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
+ // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
/* wait until our turn to install the new version */
await_version(new_version->get_id() - 1);
@@ -650,7 +649,7 @@ private:
m_active_version.store(new_version);
m_version_advance_cv.notify_all();
- fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
+ // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
}
StructureType *create_scratch_structure() {
@@ -679,40 +678,22 @@ private:
return;
}
- /*
- * for "legacy" policies, without background reconstruction, we need
- * a valid structure object as part of the version prior to determining
- * the flush operations. Otherwise, the flush operation should only ever
- * do the flush itself, and so no structure is needed at this point
- *
- * delaying obtaining the structure until later, when maintenance
- * reconstructions are enabled, removes the need for flushes to reconcile
- * differences in structure between when they are scheduled and when they
- * finish
- */
- std::unique_ptr<StructureType> structure = nullptr;
- if (m_config.recon_maint_disabled) {
- structure = std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy());
- }
-
- auto new_version = create_version_flush(std::move(structure));
-
-
auto *args = new ReconstructionArgs<ShardType, QueryType>();
- args->version = new_version;
- args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get());
+ args->version = create_version_flush(nullptr);
+ args->tasks = m_config.recon_policy->get_flush_tasks(get_active_version().get());
args->extension = this;
args->priority = ReconstructionPriority::FLUSH;
args->initial_version = INVALID_VERSION;
- fprintf(stderr, "[I] Scheduling flush (%ld)\n", new_version->get_id());
+ // fprintf(stderr, "[I] Scheduling flush (%ld)\n", args->version->get_id());
+
/*
* NOTE: args is deleted by the reconstruction job, so shouldn't be
* freed here
*/
m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
FLUSH);
- fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", new_version->get_id());
+ // fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", args->version->get_id());
if (m_config.recon_enable_maint_on_flush) {
schedule_maint_reconstruction(false);
@@ -731,13 +712,13 @@ private:
begin_reconstruction_scheduling();
}
- fprintf(stderr, "[I] Scheduling maintenance\n");
+ // fprintf(stderr, "[I] Scheduling maintenance\n");
auto active_version = m_active_version.load();
auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr);
if (reconstructions.size() == 0) {
- fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
+ // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
}
for (auto &recon : reconstructions) {
@@ -786,7 +767,7 @@ private:
if (m_buffer->is_at_high_watermark() && current_l0 > max_l0) {
schedule_maint_reconstruction(true);
- fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0);
+ // fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0);
return 0;
}