From 254f8aa85ea8962e5c11d8b475a171883c22f168 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 12:39:35 -0500 Subject: DynamicExtension: internal_append fixes Fixed a few bugs with concurrent operation of internal_append, as well as enabled the spawning of multiple empty buffers while merges are currently active. --- include/framework/DynamicExtension.h | 36 ++++++++++++++++++++++++++---------- include/framework/scheduling/Epoch.h | 29 +++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 10 deletions(-) (limited to 'include/framework') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 8ce6a7a..60aa07e 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -258,7 +258,11 @@ private: */ if constexpr (!std::same_as) { size_t old_buffer_cnt = new_epoch->clear_buffers(); - for (size_t i=old_buffer_cnt; iget_buffers().size(); i++) { + // FIXME: this is getting nightmarish... The -1 here is to ensure that the + // the empty buffer added when the merge was first triggered is also included. + // Due to the reordering of operations in internal_append, the new buffer exists + // at the time of the clone, and so is already in the new epoch. + for (size_t i=old_buffer_cnt-1; iget_buffers().size(); i++) { new_epoch->add_buffer(old_epoch->get_buffers()[i]); } } @@ -459,24 +463,36 @@ private: int internal_append(const R &rec, bool ts) { Buffer *buffer = nullptr; - int res; + int res = 0; do { - // FIXME: figure out best way to protect this epoch access auto epoch = get_active_epoch_protected(); buffer = epoch->get_active_buffer(); + assert(buffer); - /* if the buffer is full, schedule a merge and add a new empty buffer */ + /* + * If the buffer is full and there is no current merge, + * schedule a merge and add a new empty buffer. If there + * is a current merge, then just add a new empty buffer + * to the current epoch. + */ if (buffer->is_full()) { - // FIXME: possible race here--two identical merges could be scheduled - auto vers = epoch->get_structure(); - if constexpr (std::same_as) { + /* single threaded: run merge and then empty buffer */ + epoch->end_job(); + schedule_merge(); buffer->truncate(); - } else { + continue; + } else if (epoch->prepare_merge()) { + /* + * add an empty buffer to allow insert proceed and + * schedule a merge on a background thread + */ buffer = add_empty_buffer(epoch); + schedule_merge(); + } else { + /* background merge is ongoing, so just add empty buffer */ + buffer = add_empty_buffer(epoch, buffer); } - - schedule_merge(); } res = buffer->append(rec, ts); diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 58fe6cd..0ebbde9 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -25,6 +25,7 @@ public: Epoch(size_t number=0) : m_buffers() , m_structure(nullptr) + , m_active_merge(false) , m_active_jobs(0) , m_active(true) , m_epoch_number(number) @@ -34,6 +35,7 @@ public: : m_buffers() , m_structure(structure) , m_active_jobs(0) + , m_active_merge(false) , m_active(true) , m_epoch_number(number) { @@ -151,6 +153,31 @@ public: return epoch; } + /* + * Check if a merge can be started from this Epoch. + * At present, without concurrent merging, this simply + * checks if there is currently a scheduled merge based + * on this Epoch. If there is, returns false. If there + * isn't, return true and set a flag indicating that + * there is an active merge. + */ + bool prepare_merge() { + auto old = m_active_merge.load(); + if (old) { + return false; + } + + // FIXME: this needs cleaned up + while (!m_active_merge.compare_exchange_strong(old, true)) { + old = m_active_merge.load(); + if (old) { + return false; + } + } + + return true; + } + void set_inactive() { m_active = false; } @@ -184,6 +211,8 @@ private: std::condition_variable m_active_cv; std::mutex m_cv_lock; + std::atomic m_active_merge; + /* * The number of currently active jobs * (queries/merges) operating on this -- cgit v1.2.3