From d116b94389538aa8e0e7354fae77693b980de4f0 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 24 Feb 2025 11:14:52 -0500 Subject: Query Preemption: still has one or two bugs, but mostly works --- include/framework/DynamicExtension.h | 49 ++++++++++++++++++++++++++++- include/framework/scheduling/Version.h | 7 +---- include/framework/structure/MutableBuffer.h | 6 ---- include/framework/util/Configuration.h | 2 ++ 4 files changed, 51 insertions(+), 13 deletions(-) (limited to 'include/framework') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 4d4585f..8cef4a1 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -92,6 +92,7 @@ public: std::make_shared(INITIAL_VERSION, std::make_unique(), m_buffer.get(), 0)); m_version_counter = INITIAL_VERSION; + m_preempt_version = INVALID_VERSION; assert(m_config.recon_policy); } @@ -404,8 +405,28 @@ private: LockManager m_lock_mngr; + std::atomic m_preempt_version; + alignas(64) std::atomic m_scheduling_reconstruction; + bool restart_query(QueryArgs *args, size_t version) { + if (version <= m_preempt_version.load()) { + // fprintf(stderr, "[I] Preempted query on version %ld, restarting...\n", version); + m_sched->schedule_job(async_query, 0, (void *)args, QUERY); + return true; + } + + return false; + } + + void preempt_queries() { + size_t vers = m_active_version.load()->get_id() - 1; + if (vers != m_preempt_version.load()){ + m_preempt_version.store(vers); + // fprintf(stderr, "[I] Initiating query preemption on version %ld\n", vers); + } + } + void enforce_delete_invariant(VersionType *version) { auto structure = version->get_structure(); auto compactions = structure->get_compaction_tasks(); @@ -471,7 +492,20 @@ private: // fprintf(stderr, "[I] Buffer Head:\t%ld\n", new_head); /* advance the buffer head for a flush */ - args->version->advance_buffer_head(new_head); + bool success = false; + size_t failure_cnt = 0; + while (!success) { + success = args->version->advance_buffer_head(new_head); + if (!success) { + failure_cnt++; + usleep(1); + // fprintf(stderr, "[I] Buffer head advance blocked on %ld\n", args->version->get_id()); + + if (failure_cnt >= extension->m_config.buffer_flush_query_preemption_trigger) { + extension->preempt_queries(); + } + } + } } else { // fprintf(stderr, "[I] Running background reconstruction\n"); @@ -576,6 +610,19 @@ private: if (query_results[i].size() > 0) break; } + + /* + * for query preemption--check if the query should be restarted + * to prevent blocking buffer flushes for too long + */ + if (args->extension->restart_query(args, version->get_id())) { + /* clean up memory allocated for temporary query objects */ + delete buffer_query; + for (size_t i = 0; i < local_queries.size(); i++) { + delete local_queries[i]; + } + return; + } } /* diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h index e2acc8f..4cd73ba 100644 --- a/include/framework/scheduling/Version.h +++ b/include/framework/scheduling/Version.h @@ -82,12 +82,7 @@ public: bool advance_buffer_head(size_t new_head) { m_buffer_head = new_head; - - while (!m_buffer->advance_head(new_head)) { - usleep(1); - } - - return true; + return m_buffer->advance_head(new_head); } void update_shard_version(size_t version) { diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 105f0f3..e62a495 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -147,12 +147,6 @@ public: assert(new_head > m_head.load().head_idx); assert(new_head <= m_tail.load()); - /* refuse to advance head while there is an old with one references */ - if (m_old_head.load().refcnt > 0) { - //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n"); - return false; - } - m_active_head_advance.store(true); if (m_old_head.load().refcnt > 0) { diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index 81698d2..fecb2bf 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -44,6 +44,8 @@ class DEConfiguration { size_t minimum_query_threads = 4; size_t maximum_memory_usage = 0; /* o for unbounded */ + size_t buffer_flush_query_preemption_trigger = 10; + }; } // namespace de -- cgit v1.2.3