summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h49
-rw-r--r--include/framework/scheduling/Version.h7
-rw-r--r--include/framework/structure/MutableBuffer.h6
-rw-r--r--include/framework/util/Configuration.h2
4 files changed, 51 insertions, 13 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 4d4585f..8cef4a1 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -92,6 +92,7 @@ public:
std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0));
m_version_counter = INITIAL_VERSION;
+ m_preempt_version = INVALID_VERSION;
assert(m_config.recon_policy);
}
@@ -404,8 +405,28 @@ private:
LockManager m_lock_mngr;
+ std::atomic<size_t> m_preempt_version;
+
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
+ bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args, size_t version) {
+ if (version <= m_preempt_version.load()) {
+ // fprintf(stderr, "[I] Preempted query on version %ld, restarting...\n", version);
+ m_sched->schedule_job(async_query, 0, (void *)args, QUERY);
+ return true;
+ }
+
+ return false;
+ }
+
+ void preempt_queries() {
+ size_t vers = m_active_version.load()->get_id() - 1;
+ if (vers != m_preempt_version.load()){
+ m_preempt_version.store(vers);
+ // fprintf(stderr, "[I] Initiating query preemption on version %ld\n", vers);
+ }
+ }
+
void enforce_delete_invariant(VersionType *version) {
auto structure = version->get_structure();
auto compactions = structure->get_compaction_tasks();
@@ -471,7 +492,20 @@ private:
// fprintf(stderr, "[I] Buffer Head:\t%ld\n", new_head);
/* advance the buffer head for a flush */
- args->version->advance_buffer_head(new_head);
+ bool success = false;
+ size_t failure_cnt = 0;
+ while (!success) {
+ success = args->version->advance_buffer_head(new_head);
+ if (!success) {
+ failure_cnt++;
+ usleep(1);
+ // fprintf(stderr, "[I] Buffer head advance blocked on %ld\n", args->version->get_id());
+
+ if (failure_cnt >= extension->m_config.buffer_flush_query_preemption_trigger) {
+ extension->preempt_queries();
+ }
+ }
+ }
} else {
// fprintf(stderr, "[I] Running background reconstruction\n");
@@ -576,6 +610,19 @@ private:
if (query_results[i].size() > 0)
break;
}
+
+ /*
+ * for query preemption--check if the query should be restarted
+ * to prevent blocking buffer flushes for too long
+ */
+ if (args->extension->restart_query(args, version->get_id())) {
+ /* clean up memory allocated for temporary query objects */
+ delete buffer_query;
+ for (size_t i = 0; i < local_queries.size(); i++) {
+ delete local_queries[i];
+ }
+ return;
+ }
}
/*
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index e2acc8f..4cd73ba 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -82,12 +82,7 @@ public:
bool advance_buffer_head(size_t new_head) {
m_buffer_head = new_head;
-
- while (!m_buffer->advance_head(new_head)) {
- usleep(1);
- }
-
- return true;
+ return m_buffer->advance_head(new_head);
}
void update_shard_version(size_t version) {
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 105f0f3..e62a495 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -147,12 +147,6 @@ public:
assert(new_head > m_head.load().head_idx);
assert(new_head <= m_tail.load());
- /* refuse to advance head while there is an old with one references */
- if (m_old_head.load().refcnt > 0) {
- //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n");
- return false;
- }
-
m_active_head_advance.store(true);
if (m_old_head.load().refcnt > 0) {
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 81698d2..fecb2bf 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -44,6 +44,8 @@ class DEConfiguration {
size_t minimum_query_threads = 4;
size_t maximum_memory_usage = 0; /* o for unbounded */
+ size_t buffer_flush_query_preemption_trigger = 10;
+
};
} // namespace de