summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h69
1 files changed, 59 insertions, 10 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index ef36de3..a48f390 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -89,6 +89,8 @@ public:
std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0));
m_version_counter = INITIAL_VERSION;
+ assert(m_config.recon_policy);
+ m_reconstruction_scheduled.store(false);
}
/**
@@ -374,6 +376,14 @@ public:
*/
void print_scheduler_statistics() const { m_sched->print_statistics(); }
+ /**
+ * Writes a schematic view of the currently active structure to
+ * stdout. Each level is on its own line, and each shard is represented.
+ */
+ void print_structure() {
+ get_active_version()->get_structure()->print_structure();
+ }
+
private:
ConfType m_config;
@@ -390,6 +400,8 @@ private:
std::condition_variable m_version_advance_cv;
std::mutex m_version_advance_mtx;
+ std::atomic<bool> m_reconstruction_scheduled;
+
std::atomic<bool> m_flush_in_progress = false;
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
@@ -441,6 +453,7 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
+ // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
assert(extension->m_flush_in_progress.load());
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
@@ -460,19 +473,21 @@ private:
if (extension->m_config.recon_maint_disabled) {
assert(args->version->get_mutable_structure());
- args->version->get_mutable_structure()->append_l0(std::move(new_shard));
+ args->version->get_mutable_structure()->append_l0(std::move(new_shard), args->version->get_id());
} else {
assert(!args->version->get_mutable_structure());
auto structure = std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy());
/* add the newly created shard to the structure copy */
- structure->append_l0(std::move(new_shard));
+ structure->append_l0(std::move(new_shard), args->version->get_id());
/* set this version's structure to the newly created one */
args->version->set_structure(std::move(structure));
}
args->version->advance_buffer_head(new_head);
+ } else {
+ // fprintf(stderr, "[I] Running background reconstruction\n");
}
/* perform all of the reconstructions */
@@ -487,13 +502,18 @@ private:
*/
if (args->version->get_id() == INVALID_VERSION) {
args->version->set_id(extension->m_version_counter.fetch_add(1));
+ // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
}
/* advance the index to the newly finished version */
- extension->install_new_version(args->version);
+ extension->install_new_version(args->version, args->initial_version);
if (args->priority == ReconstructionPriority::FLUSH) {
extension->m_flush_in_progress.store(false);
+ // fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id());
+ } else {
+ extension->m_reconstruction_scheduled.store(false);
+ // fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id());
}
/* manually delete the argument object */
@@ -593,17 +613,18 @@ private:
return new_version;
}
- void install_new_version(version_ptr new_version) {
+ void install_new_version(version_ptr new_version, size_t old_active_version_id) {
assert(new_version->get_structure());
assert(new_version->get_id() != INVALID_VERSION);
+ // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
/* wait until our turn to install the new version */
await_version(new_version->get_id() - 1);
auto old = get_active_version();
- // FIXME: implement this interface
- // new_version->merge_changes_from(old.load().get());
+ new_version->merge_changes_from(old.get(), old_active_version_id);
+ new_version->update_shard_version(new_version->get_id());
/*
* Only one version can have a given number, so we are safe to
@@ -620,6 +641,8 @@ private:
*/
auto lk = std::unique_lock(m_version_advance_mtx);
m_version_advance_cv.notify_all();
+
+ // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
}
StructureType *create_scratch_structure() {
@@ -649,6 +672,8 @@ private:
return;
}
+ // fprintf(stderr, "[I] Scheduling flush\n");
+
/*
* for "legacy" policies, without background reconstruction, we need
* a valid structure object as part of the version prior to determining
@@ -672,6 +697,7 @@ private:
args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get());
args->extension = this;
args->priority = ReconstructionPriority::FLUSH;
+ args->initial_version = INVALID_VERSION;
/*
* NOTE: args is deleted by the reconstruction job, so shouldn't be
@@ -689,7 +715,7 @@ private:
void schedule_maint_reconstruction(bool take_reconstruction_lock = true) {
- if (m_config.recon_maint_disabled) {
+ if (m_config.recon_maint_disabled || m_reconstruction_scheduled.load()) {
return;
}
@@ -697,21 +723,38 @@ private:
begin_reconstruction_scheduling();
}
+ if (m_reconstruction_scheduled.load()) {
+ end_reconstruction_scheduling();
+ return;
+ }
+
+ // fprintf(stderr, "[I] Scheduling maintenance\n");
+
+ m_reconstruction_scheduled.store(true);
+
// FIXME: memory management issue here?
- auto new_version = create_version_maint(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy()));
+ auto active_version = m_active_version.load();
+ auto new_version = create_version_maint(std::unique_ptr<StructureType>(active_version->get_structure()->copy()));
auto *args = new ReconstructionArgs<ShardType, QueryType>();
args->version = new_version;
args->tasks = m_config.recon_policy->get_reconstruction_tasks(args->version.get());
args->extension = this;
args->priority = ReconstructionPriority::MAINT;
+ args->initial_version = active_version->get_id();
/*
* NOTE: args is deleted by the reconstruction job, so shouldn't be
* freed here
*/
- m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
- RECONSTRUCTION);
+ if (args->tasks.size() > 0) {
+ m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
+ RECONSTRUCTION);
+ } else {
+ delete args;
+ m_reconstruction_scheduled.store(false);
+ // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
+ }
if (take_reconstruction_lock) {
end_reconstruction_scheduling();
@@ -739,6 +782,12 @@ private:
}
}
+
+ if (rand() % 1000 < 5) {
+ size_t l0_cnt = get_active_version()->get_structure()->get_level_vector()[0]->get_shard_count();
+ usleep(l0_cnt);
+ }
+
/* this will fail if the HWM is reached and return 0 */
return m_buffer->append(rec, ts);
}