diff options
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 77 |
1 files changed, 59 insertions, 18 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 0858fc3..233bebb 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -198,8 +198,8 @@ public: */ void await_next_epoch() { while (m_current_epoch.load() != m_newest_epoch.load()) { - std::unique_lock<std::mutex> m_epoch_cv_lk; - m_epoch_cv.wait(m_epoch_cv_lk); + std::unique_lock<std::mutex> lk(m_epoch_cv_lk); + m_epoch_cv.wait(lk); } return; @@ -238,14 +238,26 @@ private: } _Epoch *get_active_epoch_protected() { - m_epochs[m_current_epoch.load()]->start_job(); - return m_epochs[m_current_epoch.load()]; + ssize_t cur_epoch = -1; + do { + if (cur_epoch != -1) { + m_epochs[cur_epoch]->end_job(); + } + + cur_epoch = m_current_epoch.load(); + m_epochs[cur_epoch]->start_job(); + } while (cur_epoch != m_current_epoch.load()); + + return m_epochs[cur_epoch]; } void advance_epoch() { size_t new_epoch_num = m_newest_epoch.load(); + size_t old_epoch_num = m_current_epoch.load(); + assert(new_epoch_num != old_epoch_num); + _Epoch *new_epoch = m_epochs[new_epoch_num]; - _Epoch *old_epoch = m_epochs[m_current_epoch.load()]; + _Epoch *old_epoch = m_epochs[old_epoch_num]; /* * Update the new Epoch to contain the buffers from the old one @@ -255,7 +267,11 @@ private: */ if constexpr (!std::same_as<SCHED, SerialScheduler>) { size_t old_buffer_cnt = new_epoch->clear_buffers(); - for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) { + // FIXME: this is getting nightmarish... The -1 here is to ensure that the + // the empty buffer added when the merge was first triggered is also included. + // Due to the reordering of operations in internal_append, the new buffer exists + // at the time of the clone, and so is already in the new epoch. + for (size_t i=old_buffer_cnt-1; i<old_epoch->get_buffers().size(); i++) { new_epoch->add_buffer(old_epoch->get_buffers()[i]); } } @@ -298,14 +314,22 @@ private: * buffer while a new epoch is being created in the background. Returns a * pointer to the newly created buffer. */ - Buffer *add_empty_buffer(_Epoch *epoch) { - auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); + Buffer *add_empty_buffer(_Epoch *epoch, Buffer *current_buffer=nullptr) { + auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); std::unique_lock<std::mutex> m_struct_lock; + auto new_buffer = epoch->add_buffer(temp_buffer, current_buffer); + /* + * if epoch->add_buffer doesn't add the new buffer, this insert + * won't update the buffer set (duplicate insert) + */ m_buffers.insert(new_buffer); m_struct_lock.release(); - epoch->add_buffer(new_buffer); + if (new_buffer != temp_buffer) { + delete temp_buffer; + } + return new_buffer; } @@ -448,24 +472,41 @@ private: int internal_append(const R &rec, bool ts) { Buffer *buffer = nullptr; + int res = 0; do { - // FIXME: figure out best way to protect this epoch access - auto epoch = get_active_epoch(); + auto epoch = get_active_epoch_protected(); buffer = epoch->get_active_buffer(); + assert(buffer); - /* if the buffer is full, schedule a merge and add a new empty buffer */ + /* + * If the buffer is full and there is no current merge, + * schedule a merge and add a new empty buffer. If there + * is a current merge, then just add a new empty buffer + * to the current epoch. + */ if (buffer->is_full()) { - // FIXME: possible race here--two identical merges could be scheduled - auto vers = epoch->get_structure(); - schedule_merge(); - if constexpr (std::same_as<SCHED, SerialScheduler>) { + /* single threaded: run merge and then empty buffer */ + epoch->end_job(); + schedule_merge(); buffer->truncate(); - } else { + continue; + } else if (epoch->prepare_merge()) { + /* + * add an empty buffer to allow insert proceed and + * schedule a merge on a background thread + */ buffer = add_empty_buffer(epoch); + schedule_merge(); + } else { + /* background merge is ongoing, so just add empty buffer */ + buffer = add_empty_buffer(epoch, buffer); } } - } while(!buffer->append(rec, ts)); + + res = buffer->append(rec, ts); + epoch->end_job(); + } while(!res); /* internal append should always succeed, eventually */ return 1; |