summary | refs | log | tree | commit | diff | stats
path: root/include/framework
diff options
context:
space:
mode:
author: Douglas Rumbaugh <dbr4@psu.edu> 2025-02-14 15:34:39 -0500
committer: Douglas Rumbaugh <dbr4@psu.edu> 2025-02-14 15:34:39 -0500
commit: ffbca7e200d8e7937592cd4a70d41f4d6ed9cb65 (patch)
tree: 4cae9e8e75c0718b635d5b7788a288982fe4efd4 /include/framework
parent: d28f2cfcd4249fc7d984762a326e3f2d6dcba7dc (diff)
download: dynamic-extension-ffbca7e200d8e7937592cd4a70d41f4d6ed9cb65.tar.gz
Parallel background reconstructions appear to be working!
Diffstat (limited to 'include/framework')
-rw-r--r-- include/framework/DynamicExtension.h 71
-rw-r--r-- include/framework/reconstruction/BackgroundTieringPolicy.h 3
-rw-r--r-- include/framework/scheduling/Version.h 42
-rw-r--r-- include/framework/structure/InternalLevel.h 1
-rw-r--r-- include/framework/structure/MutableBuffer.h 15
5 files changed, 48 insertions, 84 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 31eb138..e97dfab 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -440,8 +440,8 @@ private:
extension->SetThreadAffinity();
- std::vector<reconstruction_results<ShardType>> reconstructions;
size_t new_head = 0;
+ std::vector<reconstruction_results<ShardType>> reconstructions;
/*
* For "normal" flushes, the task vector should be empty, so this is
@@ -455,7 +455,7 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
- fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
+ // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
@@ -467,8 +467,13 @@ private:
flush_recon.new_shard = std::make_shared<ShardType>(std::move(buffview));
reconstructions.push_back(flush_recon);
+
+ // fprintf(stderr, "[I] Buffer Head:\t%ld\n", new_head);
+
+ /* advance the buffer head for a flush */
+ args->version->advance_buffer_head(new_head);
} else {
- fprintf(stderr, "[I] Running background reconstruction\n");
+ // fprintf(stderr, "[I] Running background reconstruction\n");
}
/* perform all of the reconstructions */
@@ -484,7 +489,7 @@ private:
*/
if (args->version->get_id() == INVALID_VERSION) {
args->version->set_id(extension->m_version_counter.fetch_add(1));
- fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
+ // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
}
/* wait for our opportunity to install the updates */
@@ -501,6 +506,11 @@ private:
}
}
+ /* grab the latest buffer head */
+ if (args->priority == ReconstructionPriority::MAINT) {
+ args->version->set_head(extension->get_active_version()->get_head());
+ }
+
/* advance the index to the newly finished version */
extension->install_new_version(args->version, args->initial_version);
@@ -514,24 +524,13 @@ private:
}
for (auto level : locked_levels) {
- fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, args->version->get_id());
+ // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, args->version->get_id());
extension->m_lock_mngr.release_lock(level);
}
}
if (args->priority == ReconstructionPriority::FLUSH) {
- /* advance the buffer head for a flush */
- args->version->advance_buffer_head(new_head);
-
extension->m_lock_mngr.release_buffer_lock();
-
- fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id());
- extension->print_structure();
- fflush(stdout);
- } else {
- fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id());
- extension->print_structure();
- fflush(stdout);
}
/* manually delete the argument object */
@@ -609,7 +608,7 @@ private:
*/
version_ptr create_version_flush(std::unique_ptr<StructureType> structure) {
size_t version_id = m_version_counter.fetch_add(1);
- fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id);
+ // fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id);
auto active_version = get_active_version();
std::shared_ptr<VersionType> new_version =
std::make_shared<VersionType>(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
@@ -636,7 +635,7 @@ private:
assert(new_version->get_structure());
assert(new_version->get_id() != INVALID_VERSION);
- fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
+ // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
/* wait until our turn to install the new version */
await_version(new_version->get_id() - 1);
@@ -650,7 +649,7 @@ private:
m_active_version.store(new_version);
m_version_advance_cv.notify_all();
- fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
+ // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
}
StructureType *create_scratch_structure() {
@@ -679,40 +678,22 @@ private:
return;
}
- /*
- * for "legacy" policies, without background reconstruction, we need
- * a valid structure object as part of the version prior to determining
- * the flush operations. Otherwise, the flush operation should only ever
- * do the flush itself, and so no structure is needed at this point
- *
- * delaying obtaining the structure until later, when maintenance
- * reconstructions are enabled, removes the need for flushes to reconcile
- * differences in structure between when they are scheduled and when they
- * finish
- */
- std::unique_ptr<StructureType> structure = nullptr;
- if (m_config.recon_maint_disabled) {
- structure = std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy());
- }
-
- auto new_version = create_version_flush(std::move(structure));
-
-
auto *args = new ReconstructionArgs<ShardType, QueryType>();
- args->version = new_version;
- args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get());
+ args->version = create_version_flush(nullptr);
+ args->tasks = m_config.recon_policy->get_flush_tasks(get_active_version().get());
args->extension = this;
args->priority = ReconstructionPriority::FLUSH;
args->initial_version = INVALID_VERSION;
- fprintf(stderr, "[I] Scheduling flush (%ld)\n", new_version->get_id());
+ // fprintf(stderr, "[I] Scheduling flush (%ld)\n", args->version->get_id());
+
/*
* NOTE: args is deleted by the reconstruction job, so shouldn't be
* freed here
*/
m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
FLUSH);
- fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", new_version->get_id());
+ // fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", args->version->get_id());
if (m_config.recon_enable_maint_on_flush) {
schedule_maint_reconstruction(false);
@@ -731,13 +712,13 @@ private:
begin_reconstruction_scheduling();
}
- fprintf(stderr, "[I] Scheduling maintenance\n");
+ // fprintf(stderr, "[I] Scheduling maintenance\n");
auto active_version = m_active_version.load();
auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr);
if (reconstructions.size() == 0) {
- fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
+ // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
}
for (auto &recon : reconstructions) {
@@ -786,7 +767,7 @@ private:
if (m_buffer->is_at_high_watermark() && current_l0 > max_l0) {
schedule_maint_reconstruction(true);
- fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0);
+ // fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0);
return 0;
}
diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h
index a1bbddc..5a82695 100644
--- a/include/framework/reconstruction/BackgroundTieringPolicy.h
+++ b/include/framework/reconstruction/BackgroundTieringPolicy.h
@@ -45,7 +45,6 @@ public:
for (level_index i = target_level; i > source_level; i--) {
if (lock_mngr.take_lock(i-1)) {
- fprintf(stderr, "[I] Taking lock on %ld (%ld)\n", i-1, version->get_id());
ReconstructionVector recon;
size_t target_reccnt =
(i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0;
@@ -57,8 +56,6 @@ public:
recon.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact);
reconstructions.push_back(recon);
- } else {
- fprintf(stderr, "[I] Failed to get lock on %ld (%ld)\n", i-1, version->get_id());
}
}
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index fa677f2..4cd73ba 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -54,10 +54,6 @@ public:
StructureType *get_mutable_structure() { return m_structure.get(); }
bool set_structure(std::unique_ptr<StructureType> new_struct) {
- if (m_structure) {
- return false;
- }
-
m_structure = std::move(new_struct);
return true;
}
@@ -89,38 +85,18 @@ public:
return m_buffer->advance_head(new_head);
}
- void merge_changes_from(Version *old, size_t version_id) {
- /*
- * for a maint reconstruction, the buffer head may have advanced
- * during the reconstruction; we don't need to adjust the buffer
- * for maintenance reconstructions, so we can simply "catch" the
- * internal head index up to the current version.
- */
- if (old->m_buffer_head > m_buffer_head) {
- m_buffer_head = old->m_buffer_head;
- }
-
- // FIXME: we should also ensure that we don't clobber anything
- // in the event that multiple concurrent reconstructions affect
- // the same levels. As it stands, two reconstructions *could* share
- // source shards, resulting in some records being lost or duplicated.
- //
- // For the moment, I'm being careful to avoid this within the
- // scheduling policy itself, and only forwarding changes to this
- // version.
-
- /* using INVALID_VERSION disables shard reconcilliation */
- if (version_id == 0) {
- return;
- }
+ void update_shard_version(size_t version) {
+ m_structure->update_shard_version(version);
+ }
- /* add any shards newer than version_id to this version */
- auto old_structure = old->get_structure();
- m_structure->merge_structure(old_structure, version_id);
+ size_t get_head() {
+ return m_buffer_head;
}
- void update_shard_version(size_t version) {
- m_structure->update_shard_version(version);
+
+ void set_head(size_t head) {
+ // fprintf(stderr, "[I] Updating buffer head of %ld to %ld\n", get_id(), head);
+ m_buffer_head = head;
}
private:
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 7e8e87d..6e8b67e 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -200,7 +200,6 @@ public:
void delete_shard(shard_index shard, bool log_delete=true) {
size_t before = m_shards.size();
- fprintf(stderr, "[D]\tReconstruction deleting shard %ld %p\n", shard, m_shards[shard].first.get());
m_shards.erase(m_shards.begin() + shard);
size_t after = m_shards.size();
assert( before > after);
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 0197ecd..105f0f3 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -149,13 +149,22 @@ public:
/* refuse to advance head while there is an old with one references */
if (m_old_head.load().refcnt > 0) {
- // fprintf(stderr, "[W]: Refusing to advance head due to remaining
- // reference counts\n");
+ //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n");
return false;
}
m_active_head_advance.store(true);
+ if (m_old_head.load().refcnt > 0) {
+ //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts [2]\n");
+ m_active_head_advance.store(false);
+ return false;
+ }
+
+ // fprintf(stderr, "[I] Advancing head pointer: %ld %ld %ld\n", m_old_head.load().head_idx, m_head.load().head_idx, new_head);
+ // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, m_head.load().refcnt);
+
+
buffer_head new_hd = {new_head, 0};
buffer_head cur_hd;
@@ -179,6 +188,8 @@ public:
buffer_head cur_hd, new_hd;
bool head_acquired = false;
+
+ //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx);
do {
if (m_old_head.load().head_idx == target_head) {
cur_hd = m_old_head.load();