diff options
| -rw-r--r-- | include/framework/DynamicExtension.h | 6 | ||||
| -rw-r--r-- | include/framework/scheduling/SerialScheduler.h | 1 | ||||
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 1 |
3 files changed, 7 insertions, 1 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index fc7922c..26221d8 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -259,6 +259,7 @@ private: } Q::delete_buffer_query_state(buffer_state); + buffer->release_reference(); args->result_set.set_value(std::move(result)); delete args; @@ -276,10 +277,13 @@ private: } std::future<std::vector<R>> schedule_query(Structure *version, Buffer *buffer, void *query_parms) { + buffer->take_reference(); // FIXME: this is wrong. The buffer and version need to be + // taken atomically, together. + QueryArgs<R> *args = new QueryArgs<R>(); args->buffer = buffer; args->version = version; - args->buffer = query_parms; + args->query_parms = query_parms; m_sched.schedule_job(async_query, 0, args); diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index 5d6e5c2..da2bb8e 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -51,6 +51,7 @@ public: void schedule_job(std::function<void(void*)> job, size_t size, void *args) { size_t ts = m_counter.fetch_add(1); m_task_queue.push(Task(size, ts, job, args)); + m_cv.notify_all(); } void shutdown() { diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 804ca5e..4e0b5c2 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -195,6 +195,7 @@ public: bool finish_merge() { m_merge_lock.unlock(); + m_merging.store(false); return true; } |