diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-02-24 11:14:52 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-02-24 11:14:52 -0500 |
| commit | d116b94389538aa8e0e7354fae77693b980de4f0 (patch) | |
| tree | 918e29a6734c726f398f2d06894fdb42d9752fa9 /include/framework/DynamicExtension.h | |
| parent | 40fe2e7ea56d49a065a4a53b7f8a4a918a5d78b0 (diff) | |
| download | dynamic-extension-d116b94389538aa8e0e7354fae77693b980de4f0.tar.gz | |
Query Preemption: still has one or two bugs, but mostly works
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 49 |
1 files changed, 48 insertions, 1 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 4d4585f..8cef4a1 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -92,6 +92,7 @@ public: std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0)); m_version_counter = INITIAL_VERSION; + m_preempt_version = INVALID_VERSION; assert(m_config.recon_policy); } @@ -404,8 +405,28 @@ private: LockManager m_lock_mngr; + std::atomic<size_t> m_preempt_version; + alignas(64) std::atomic<bool> m_scheduling_reconstruction; + bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args, size_t version) { + if (version <= m_preempt_version.load()) { + // fprintf(stderr, "[I] Preempted query on version %ld, restarting...\n", version); + m_sched->schedule_job(async_query, 0, (void *)args, QUERY); + return true; + } + + return false; + } + + void preempt_queries() { + size_t vers = m_active_version.load()->get_id() - 1; + if (vers != m_preempt_version.load()){ + m_preempt_version.store(vers); + // fprintf(stderr, "[I] Initiating query preemption on version %ld\n", vers); + } + } + void enforce_delete_invariant(VersionType *version) { auto structure = version->get_structure(); auto compactions = structure->get_compaction_tasks(); @@ -471,7 +492,20 @@ private: // fprintf(stderr, "[I] Buffer Head:\t%ld\n", new_head); /* advance the buffer head for a flush */ - args->version->advance_buffer_head(new_head); + bool success = false; + size_t failure_cnt = 0; + while (!success) { + success = args->version->advance_buffer_head(new_head); + if (!success) { + failure_cnt++; + usleep(1); + // fprintf(stderr, "[I] Buffer head advance blocked on %ld\n", args->version->get_id()); + + if (failure_cnt >= extension->m_config.buffer_flush_query_preemption_trigger) { + extension->preempt_queries(); + } + } + } } else { // fprintf(stderr, "[I] Running background reconstruction\n"); @@ -576,6 +610,19 @@ private: if (query_results[i].size() > 0) break; } + + /* + * for query preemption--check if the query should be restarted + * to prevent blocking buffer flushes for too long + */ + if (args->extension->restart_query(args, version->get_id())) { + /* clean up memory allocated for temporary query objects */ + delete buffer_query; + for (size_t i = 0; i < local_queries.size(); i++) { + delete local_queries[i]; + } + return; + } } /* |