summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-02-11 17:53:44 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2025-02-11 17:53:44 -0500
commit125e243cad99aa29444759e15053fd148ff0e3ba (patch)
treeb6c61fd3276cf56ea1fd5e90a7149e3d7f424e5d /include/framework/DynamicExtension.h
parent85afe4ef04f327862460570fb0aa4c30afcf7cc7 (diff)
downloaddynamic-extension-125e243cad99aa29444759e15053fd148ff0e3ba.tar.gz
more updates
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h41
1 files changed, 25 insertions, 16 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index aa07659..c2a59ea 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -13,6 +13,7 @@
#include <cstdio>
#include <mutex>
#include <vector>
+#include <set>
#include "framework/interface/Scheduler.h"
#include "framework/reconstruction/ReconstructionPolicy.h"
@@ -454,7 +455,7 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
- // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
+ fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
assert(extension->m_flush_in_progress.load());
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
@@ -488,7 +489,7 @@ private:
args->version->advance_buffer_head(new_head);
} else {
- // fprintf(stderr, "[I] Running background reconstruction\n");
+ fprintf(stderr, "[I] Running background reconstruction\n");
}
/* perform all of the reconstructions */
@@ -505,7 +506,7 @@ private:
*/
if (args->version->get_id() == INVALID_VERSION) {
args->version->set_id(extension->m_version_counter.fetch_add(1));
- // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
+ fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
}
/* advance the index to the newly finished version */
@@ -513,18 +514,26 @@ private:
/* maint reconstructions can now safely release their locks */
if (args->priority == ReconstructionPriority::MAINT) {
+ std::set<size_t> locked_levels;
for (size_t i=0; i<args->tasks.size(); i++) {
for (auto source : args->tasks[i].sources) {
- extension->m_lock_mngr.release_lock(source.level_idx);
+ locked_levels.insert(source.level_idx);
}
}
+
+ for (auto level : locked_levels) {
+ fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, args->version->get_id());
+ extension->m_lock_mngr.release_lock(level);
+ }
}
if (args->priority == ReconstructionPriority::FLUSH) {
extension->m_flush_in_progress.store(false);
- // fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id());
+ fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id());
} else {
- // fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id());
+ fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id());
+ extension->print_structure();
+ fflush(stdout);
}
/* manually delete the argument object */
@@ -602,7 +611,7 @@ private:
*/
version_ptr create_version_flush(std::unique_ptr<StructureType> structure) {
size_t version_id = m_version_counter.fetch_add(1);
- // fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id);
+ fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id);
auto active_version = get_active_version();
std::shared_ptr<VersionType> new_version =
std::make_shared<VersionType>(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
@@ -629,7 +638,7 @@ private:
assert(new_version->get_structure());
assert(new_version->get_id() != INVALID_VERSION);
- // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
+ fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
/* wait until our turn to install the new version */
await_version(new_version->get_id() - 1);
@@ -654,7 +663,7 @@ private:
auto lk = std::unique_lock(m_version_advance_mtx);
m_version_advance_cv.notify_all();
- // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
+ fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
}
StructureType *create_scratch_structure() {
@@ -712,14 +721,14 @@ private:
args->priority = ReconstructionPriority::FLUSH;
args->initial_version = INVALID_VERSION;
- // fprintf(stderr, "[I] Scheduling flush (%ld)\n", new_version->get_id());
+ fprintf(stderr, "[I] Scheduling flush (%ld)\n", new_version->get_id());
/*
* NOTE: args is deleted by the reconstruction job, so shouldn't be
* freed here
*/
m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
FLUSH);
- // fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", new_version->get_id());
+ fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", new_version->get_id());
if (m_config.recon_enable_maint_on_flush) {
schedule_maint_reconstruction(false);
@@ -738,14 +747,14 @@ private:
begin_reconstruction_scheduling();
}
- // fprintf(stderr, "[I] Scheduling maintenance\n");
+ fprintf(stderr, "[I] Scheduling maintenance\n");
auto active_version = m_active_version.load();
auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr);
- // if (reconstructions.size() == 0) {
- // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
- // }
+ if (reconstructions.size() == 0) {
+ fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
+ }
for (auto &recon : reconstructions) {
/*
@@ -793,7 +802,7 @@ private:
if (m_buffer->is_at_high_watermark() && current_l0 > max_l0) {
schedule_maint_reconstruction(true);
- // fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0);
+ fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0);
return 0;
}