diff options
| -rw-r--r-- | include/framework/DynamicExtension.h | 18 |
1 files changed, 13 insertions, 5 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 5c1eaab..49c6905 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -244,8 +244,11 @@ private: void advance_epoch() { size_t new_epoch_num = m_newest_epoch.load(); + size_t old_epoch_num = m_current_epoch.load(); + assert(new_epoch_num != old_epoch_num); + _Epoch *new_epoch = m_epochs[new_epoch_num]; - _Epoch *old_epoch = m_epochs[m_current_epoch.load()]; + _Epoch *old_epoch = m_epochs[old_epoch_num]; /* * Update the new Epoch to contain the buffers from the old one @@ -302,10 +305,10 @@ private: auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); std::unique_lock<std::mutex> m_struct_lock; + epoch->add_buffer(new_buffer); m_buffers.insert(new_buffer); m_struct_lock.release(); - epoch->add_buffer(new_buffer); return new_buffer; } @@ -448,24 +451,29 @@ private: int internal_append(const R &rec, bool ts) { Buffer *buffer = nullptr; + int res; do { // FIXME: figure out best way to protect this epoch access - auto epoch = get_active_epoch(); + auto epoch = get_active_epoch_protected(); buffer = epoch->get_active_buffer(); /* if the buffer is full, schedule a merge and add a new empty buffer */ if (buffer->is_full()) { // FIXME: possible race here--two identical merges could be scheduled auto vers = epoch->get_structure(); - schedule_merge(); if constexpr (std::same_as<SCHED, SerialScheduler>) { buffer->truncate(); } else { buffer = add_empty_buffer(epoch); } + + schedule_merge(); } - } while(!buffer->append(rec, ts)); + + res = buffer->append(rec, ts); + epoch->end_job(); + } while(!res); /* internal append should always succeed, eventually */ return 1; |