summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-02-24 11:14:52 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2025-02-24 11:14:52 -0500
commitd116b94389538aa8e0e7354fae77693b980de4f0 (patch)
tree918e29a6734c726f398f2d06894fdb42d9752fa9 /include/framework/DynamicExtension.h
parent40fe2e7ea56d49a065a4a53b7f8a4a918a5d78b0 (diff)
downloaddynamic-extension-d116b94389538aa8e0e7354fae77693b980de4f0.tar.gz
Query Preemption: still has one or two bugs, but mostly works
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h49
1 files changed, 48 insertions, 1 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 4d4585f..8cef4a1 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -92,6 +92,7 @@ public:
std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0));
m_version_counter = INITIAL_VERSION;
+ m_preempt_version = INVALID_VERSION;
assert(m_config.recon_policy);
}
@@ -404,8 +405,28 @@ private:
LockManager m_lock_mngr;
+ std::atomic<size_t> m_preempt_version;
+
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
+ bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args, size_t version) {
+ if (version <= m_preempt_version.load()) {
+ // fprintf(stderr, "[I] Preempted query on version %ld, restarting...\n", version);
+ m_sched->schedule_job(async_query, 0, (void *)args, QUERY);
+ return true;
+ }
+
+ return false;
+ }
+
+ void preempt_queries() {
+ size_t vers = m_active_version.load()->get_id() - 1;
+ if (vers != m_preempt_version.load()){
+ m_preempt_version.store(vers);
+ // fprintf(stderr, "[I] Initiating query preemption on version %ld\n", vers);
+ }
+ }
+
void enforce_delete_invariant(VersionType *version) {
auto structure = version->get_structure();
auto compactions = structure->get_compaction_tasks();
@@ -471,7 +492,20 @@ private:
// fprintf(stderr, "[I] Buffer Head:\t%ld\n", new_head);
/* advance the buffer head for a flush */
- args->version->advance_buffer_head(new_head);
+ bool success = false;
+ size_t failure_cnt = 0;
+ while (!success) {
+ success = args->version->advance_buffer_head(new_head);
+ if (!success) {
+ failure_cnt++;
+ usleep(1);
+ // fprintf(stderr, "[I] Buffer head advance blocked on %ld\n", args->version->get_id());
+
+ if (failure_cnt >= extension->m_config.buffer_flush_query_preemption_trigger) {
+ extension->preempt_queries();
+ }
+ }
+ }
} else {
// fprintf(stderr, "[I] Running background reconstruction\n");
@@ -576,6 +610,19 @@ private:
if (query_results[i].size() > 0)
break;
}
+
+ /*
+ * for query preemption--check if the query should be restarted
+ * to prevent blocking buffer flushes for too long
+ */
+ if (args->extension->restart_query(args, version->get_id())) {
+ /* clean up memory allocated for temporary query objects */
+ delete buffer_query;
+ for (size_t i = 0; i < local_queries.size(); i++) {
+ delete local_queries[i];
+ }
+ return;
+ }
}
/*