summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h49
1 files changed, 44 insertions, 5 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index ea3ef4d..62aaf88 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -24,7 +24,7 @@
#include "framework/util/Configuration.h"
-#include "framework/scheduling/Epoch.h"
+#include "framework/scheduling/Version.h"
#include "util/types.h"
namespace de {
@@ -447,8 +447,6 @@ private:
args->result.set_value(true);
}
- ((DynamicExtension *)args->extension)
- ->m_scheduling_reconstruction.store(false);
delete args;
}
@@ -573,7 +571,21 @@ private:
return get_active_version()->get_structure()->copy();
}
+
+ void begin_reconstruction_scheduling() {
+ bool cur_val;
+ do {
+ cur_val = m_scheduling_reconstruction.load();
+ } while(cur_val == true && !m_scheduling_reconstruction.compare_exchange_strong(cur_val, true));
+ }
+
+ void end_reconstruction_scheduling() {
+ /* no need for any other sync here, this thread has an implicit lock */
+ m_scheduling_reconstruction.store(false);
+ }
+
void schedule_flush() {
+ begin_reconstruction_scheduling();
auto new_version = create_version();
auto *args = new ReconstructionArgs<ShardType, QueryType>();
@@ -589,12 +601,39 @@ private:
m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION);
if (m_config.recon_enable_maint_on_flush) {
- schedule_maint_reconstruction();
+ schedule_maint_reconstruction(false);
}
+
+ end_reconstruction_scheduling();
}
- void schedule_maint_reconstruction() {
+ void schedule_maint_reconstruction(bool take_reconstruction_lock=true) {
+
+ if (take_reconstruction_lock) {
+ begin_reconstruction_scheduling();
+ }
+
+ // FIXME: memory management issue here?
+ auto new_version = create_version(m_active_version.load()->get_structure());
+
+ auto *args = new ReconstructionArgs<ShardType, QueryType>();
+ args->version.load(new_version);
+ args->tasks = m_recon_policy->get_reconstruction_tasks(args->version.get());
+ args->extension = this;
+ args->priority = ReconstructionPriority::MAINT;
+
+ /*
+ * NOTE: args is deleted by the reconstruction job, so shouldn't be
+ * freed here
+ */
+ m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION);
+
+
+ if (take_reconstruction_lock) {
+ end_reconstruction_scheduling();
+ }
+
return;
}