From 254f8aa85ea8962e5c11d8b475a171883c22f168 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 Nov 2023 12:39:35 -0500 Subject: DynamicExtension: internal_append fixes Fixed a few bugs with concurrent operation of internal_append, as well as enabled the spawning of multiple empty buffers while merges are currently active. --- include/framework/DynamicExtension.h | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 8ce6a7a..60aa07e 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -258,7 +258,11 @@ private: */ if constexpr (!std::same_as) { size_t old_buffer_cnt = new_epoch->clear_buffers(); - for (size_t i=old_buffer_cnt; iget_buffers().size(); i++) { + // FIXME: this is getting nightmarish... The -1 here is to ensure that the + // the empty buffer added when the merge was first triggered is also included. + // Due to the reordering of operations in internal_append, the new buffer exists + // at the time of the clone, and so is already in the new epoch. + for (size_t i=old_buffer_cnt-1; iget_buffers().size(); i++) { new_epoch->add_buffer(old_epoch->get_buffers()[i]); } } @@ -459,24 +463,36 @@ private: int internal_append(const R &rec, bool ts) { Buffer *buffer = nullptr; - int res; + int res = 0; do { - // FIXME: figure out best way to protect this epoch access auto epoch = get_active_epoch_protected(); buffer = epoch->get_active_buffer(); + assert(buffer); - /* if the buffer is full, schedule a merge and add a new empty buffer */ + /* + * If the buffer is full and there is no current merge, + * schedule a merge and add a new empty buffer. If there + * is a current merge, then just add a new empty buffer + * to the current epoch. + */ if (buffer->is_full()) { - // FIXME: possible race here--two identical merges could be scheduled - auto vers = epoch->get_structure(); - if constexpr (std::same_as) { + /* single threaded: run merge and then empty buffer */ + epoch->end_job(); + schedule_merge(); buffer->truncate(); - } else { + continue; + } else if (epoch->prepare_merge()) { + /* + * add an empty buffer to allow insert proceed and + * schedule a merge on a background thread + */ buffer = add_empty_buffer(epoch); + schedule_merge(); + } else { + /* background merge is ongoing, so just add empty buffer */ + buffer = add_empty_buffer(epoch, buffer); } - - schedule_merge(); } res = buffer->append(rec, ts); -- cgit v1.2.3