summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h87
1 files changed, 48 insertions, 39 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index a48f390..aa07659 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -17,6 +17,7 @@
#include "framework/interface/Scheduler.h"
#include "framework/reconstruction/ReconstructionPolicy.h"
#include "framework/scheduling/SerialScheduler.h"
+#include "framework/scheduling/LockManager.h"
#include "framework/scheduling/Task.h"
#include "framework/structure/ExtensionStructure.h"
@@ -51,6 +52,7 @@ private:
static constexpr size_t QUERY = 1;
static constexpr size_t RECONSTRUCTION = 2;
+ static constexpr size_t FLUSH = 3;
typedef std::shared_ptr<VersionType> version_ptr;
typedef size_t version_id;
@@ -90,7 +92,6 @@ public:
m_version_counter = INITIAL_VERSION;
assert(m_config.recon_policy);
- m_reconstruction_scheduled.store(false);
}
/**
@@ -400,10 +401,10 @@ private:
std::condition_variable m_version_advance_cv;
std::mutex m_version_advance_mtx;
- std::atomic<bool> m_reconstruction_scheduled;
-
std::atomic<bool> m_flush_in_progress = false;
+ LockManager m_lock_mngr;
+
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
void enforce_delete_invariant(VersionType *version) {
@@ -493,7 +494,9 @@ private:
/* perform all of the reconstructions */
StructureType *structure = args->version->get_mutable_structure();
for (size_t i = 0; i < args->tasks.size(); i++) {
- structure->perform_reconstruction(args->tasks[i]);
+ if (structure->perform_reconstruction(args->tasks[i])) {
+ extension->m_lock_mngr.add_lock();
+ }
}
/*
@@ -508,11 +511,19 @@ private:
/* advance the index to the newly finished version */
extension->install_new_version(args->version, args->initial_version);
+ /* maint reconstructions can now safely release their locks */
+ if (args->priority == ReconstructionPriority::MAINT) {
+ for (size_t i=0; i<args->tasks.size(); i++) {
+ for (auto source : args->tasks[i].sources) {
+ extension->m_lock_mngr.release_lock(source.level_idx);
+ }
+ }
+ }
+
if (args->priority == ReconstructionPriority::FLUSH) {
extension->m_flush_in_progress.store(false);
// fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id());
} else {
- extension->m_reconstruction_scheduled.store(false);
// fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id());
}
@@ -591,6 +602,7 @@ private:
*/
version_ptr create_version_flush(std::unique_ptr<StructureType> structure) {
size_t version_id = m_version_counter.fetch_add(1);
+ // fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id);
auto active_version = get_active_version();
std::shared_ptr<VersionType> new_version =
std::make_shared<VersionType>(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
@@ -617,7 +629,7 @@ private:
assert(new_version->get_structure());
assert(new_version->get_id() != INVALID_VERSION);
- // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
+ // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
/* wait until our turn to install the new version */
await_version(new_version->get_id() - 1);
@@ -669,10 +681,10 @@ private:
bool old = m_flush_in_progress.load();
if (old || !m_flush_in_progress.compare_exchange_strong(old, true)) {
end_reconstruction_scheduling();
+ m_version_advance_cv.notify_all();
return;
}
- // fprintf(stderr, "[I] Scheduling flush\n");
/*
* for "legacy" policies, without background reconstruction, we need
@@ -692,6 +704,7 @@ private:
auto new_version = create_version_flush(std::move(structure));
+
auto *args = new ReconstructionArgs<ShardType, QueryType>();
args->version = new_version;
args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get());
@@ -699,12 +712,14 @@ private:
args->priority = ReconstructionPriority::FLUSH;
args->initial_version = INVALID_VERSION;
+ // fprintf(stderr, "[I] Scheduling flush (%ld)\n", new_version->get_id());
/*
* NOTE: args is deleted by the reconstruction job, so shouldn't be
* freed here
*/
m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
- RECONSTRUCTION);
+ FLUSH);
+ // fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", new_version->get_id());
if (m_config.recon_enable_maint_on_flush) {
schedule_maint_reconstruction(false);
@@ -715,7 +730,7 @@ private:
void schedule_maint_reconstruction(bool take_reconstruction_lock = true) {
- if (m_config.recon_maint_disabled || m_reconstruction_scheduled.load()) {
+ if (m_config.recon_maint_disabled) {
return;
}
@@ -723,37 +738,28 @@ private:
begin_reconstruction_scheduling();
}
- if (m_reconstruction_scheduled.load()) {
- end_reconstruction_scheduling();
- return;
- }
-
// fprintf(stderr, "[I] Scheduling maintenance\n");
- m_reconstruction_scheduled.store(true);
-
- // FIXME: memory management issue here?
auto active_version = m_active_version.load();
- auto new_version = create_version_maint(std::unique_ptr<StructureType>(active_version->get_structure()->copy()));
+ auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr);
- auto *args = new ReconstructionArgs<ShardType, QueryType>();
- args->version = new_version;
- args->tasks = m_config.recon_policy->get_reconstruction_tasks(args->version.get());
- args->extension = this;
- args->priority = ReconstructionPriority::MAINT;
- args->initial_version = active_version->get_id();
+ // if (reconstructions.size() == 0) {
+ // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
+ // }
- /*
- * NOTE: args is deleted by the reconstruction job, so shouldn't be
- * freed here
- */
- if (args->tasks.size() > 0) {
+ for (auto &recon : reconstructions) {
+ /*
+ * NOTE: args is deleted by the reconstruction job, so shouldn't be
+ * freed here
+ */
+ auto *args = new ReconstructionArgs<ShardType, QueryType>();
+ args->version = create_version_maint(std::unique_ptr<StructureType>(active_version->get_structure()->copy()));
+ args->tasks = std::move(recon);
+ args->extension = this;
+ args->priority = ReconstructionPriority::MAINT;
+ args->initial_version = active_version->get_id();
m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
- RECONSTRUCTION);
- } else {
- delete args;
- m_reconstruction_scheduled.store(false);
- // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
+ RECONSTRUCTION);
}
if (take_reconstruction_lock) {
@@ -775,17 +781,20 @@ private:
}
int internal_append(const RecordType &rec, bool ts) {
- if (m_buffer->is_at_low_watermark()) {
+ size_t max_l0 = (log(get_record_count()) / log(8)) + 1;
+ size_t current_l0 = get_active_version()->get_structure()->get_level_vector()[0]->get_shard_count();
+
+ if (m_buffer->is_at_low_watermark() && current_l0 <= max_l0) {
auto old = false;
if (m_scheduling_reconstruction.compare_exchange_strong(old, true)) {
schedule_flush();
}
}
-
- if (rand() % 1000 < 5) {
- size_t l0_cnt = get_active_version()->get_structure()->get_level_vector()[0]->get_shard_count();
- usleep(l0_cnt);
+ if (m_buffer->is_at_high_watermark() && current_l0 > max_l0) {
+ schedule_maint_reconstruction(true);
+ // fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0);
+ return 0;
}
/* this will fail if the HWM is reached and return 0 */