summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h96
1 files changed, 40 insertions, 56 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index c2a59ea..31eb138 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -351,8 +351,8 @@ public:
}
/* versions signal on m_version_advance_cv when they activate */
+ std::unique_lock lk(m_version_advance_mtx);
while (m_active_version.load()->get_id() < vid) {
- std::unique_lock lk(m_version_advance_mtx);
m_version_advance_cv.wait(lk);
}
@@ -402,8 +402,6 @@ private:
std::condition_variable m_version_advance_cv;
std::mutex m_version_advance_mtx;
- std::atomic<bool> m_flush_in_progress = false;
-
LockManager m_lock_mngr;
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
@@ -436,13 +434,15 @@ private:
}
}
- size_t m_flush_cnt = 0;
-
static void reconstruction(void *arguments) {
auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
auto extension = (DynamicExtension *)args->extension;
extension->SetThreadAffinity();
+
+ std::vector<reconstruction_results<ShardType>> reconstructions;
+ size_t new_head = 0;
+
/*
* For "normal" flushes, the task vector should be empty, so this is
* all that will happen. Piggybacking internal reconstructions off
@@ -456,48 +456,26 @@ private:
*/
if (args->priority == ReconstructionPriority::FLUSH) {
fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
- assert(extension->m_flush_in_progress.load());
+
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
assert(buffview.get_tail() != buffview.get_head());
- size_t new_head = buffview.get_tail();
-
- auto new_shard = std::make_shared<ShardType>(std::move(buffview));
-
-
- /*
- * Flushes already know their version id. To avoid needing to
- * do any update reconciliation between structures, they wait
- * until the version directly preceeding them has been installed,
- * and only then take a copy of the structure.
- */
- extension->await_version(args->version->get_id() - 1);
+ new_head = buffview.get_tail();
- if (extension->m_config.recon_maint_disabled) {
- assert(args->version->get_mutable_structure());
- args->version->get_mutable_structure()->append_l0(std::move(new_shard), args->version->get_id());
- } else {
- assert(!args->version->get_mutable_structure());
- auto structure = std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy());
+ reconstruction_results<ShardType> flush_recon;
+ flush_recon.target_level = 0;
+ flush_recon.new_shard = std::make_shared<ShardType>(std::move(buffview));
- /* add the newly created shard to the structure copy */
- structure->append_l0(std::move(new_shard), args->version->get_id());
-
- /* set this version's structure to the newly created one */
- args->version->set_structure(std::move(structure));
- }
-
- args->version->advance_buffer_head(new_head);
+ reconstructions.push_back(flush_recon);
} else {
fprintf(stderr, "[I] Running background reconstruction\n");
}
/* perform all of the reconstructions */
- StructureType *structure = args->version->get_mutable_structure();
+ auto structure = args->version->get_structure();
+
for (size_t i = 0; i < args->tasks.size(); i++) {
- if (structure->perform_reconstruction(args->tasks[i])) {
- extension->m_lock_mngr.add_lock();
- }
+ reconstructions.emplace_back(structure->perform_reconstruction(args->tasks[i]));
}
/*
@@ -508,6 +486,20 @@ private:
args->version->set_id(extension->m_version_counter.fetch_add(1));
fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
}
+
+ /* wait for our opportunity to install the updates */
+ extension->await_version(args->version->get_id() - 1);
+
+ /* get a fresh copy of the structure and apply our updates */
+ args->version->set_structure(std::move(std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy())));
+
+ for (auto recon : reconstructions) {
+ auto grow = args->version->get_mutable_structure()->append_shard(recon.new_shard, args->version->get_id(), recon.target_level);
+ args->version->get_mutable_structure()->delete_shards(recon.source_shards);
+ if (grow) {
+ extension->m_lock_mngr.add_lock();
+ }
+ }
/* advance the index to the newly finished version */
extension->install_new_version(args->version, args->initial_version);
@@ -526,10 +518,16 @@ private:
extension->m_lock_mngr.release_lock(level);
}
}
-
+
if (args->priority == ReconstructionPriority::FLUSH) {
- extension->m_flush_in_progress.store(false);
+ /* advance the buffer head for a flush */
+ args->version->advance_buffer_head(new_head);
+
+ extension->m_lock_mngr.release_buffer_lock();
+
fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id());
+ extension->print_structure();
+ fflush(stdout);
} else {
fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id());
extension->print_structure();
@@ -638,29 +636,18 @@ private:
assert(new_version->get_structure());
assert(new_version->get_id() != INVALID_VERSION);
- fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
+ fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
+
/* wait until our turn to install the new version */
await_version(new_version->get_id() - 1);
- auto old = get_active_version();
-
- new_version->merge_changes_from(old.get(), old_active_version_id);
- new_version->update_shard_version(new_version->get_id());
-
+ auto lk = std::unique_lock(m_version_advance_mtx);
/*
* Only one version can have a given number, so we are safe to
* directly assign here--nobody else is going to change it out from
* under us.
*/
m_active_version.store(new_version);
-
- /*
- * My understanding is that you don't *really* need this mutex for
- * safety in modern C++ when sending the signal. But I'll grab it
- * anyway out of an abundance of caution. I doubt this will be a
- * major bottleneck.
- */
- auto lk = std::unique_lock(m_version_advance_mtx);
m_version_advance_cv.notify_all();
fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
@@ -687,14 +674,11 @@ private:
void schedule_flush() {
begin_reconstruction_scheduling();
- bool old = m_flush_in_progress.load();
- if (old || !m_flush_in_progress.compare_exchange_strong(old, true)) {
+ if (!m_lock_mngr.take_buffer_lock()) {
end_reconstruction_scheduling();
- m_version_advance_cv.notify_all();
return;
}
-
/*
* for "legacy" policies, without background reconstruction, we need
* a valid structure object as part of the version prior to determining