From 68ae6279476e7d37837ac06474fb558e50ce6706 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 12:41:55 -0400 Subject: Fixes for various bugs under SerialScheduler --- include/framework/DynamicExtension.h | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 3a4a7e1..5c1eaab 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -54,7 +54,7 @@ public: { auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop); - auto epoch = new _Epoch(vers, buf); + auto epoch = new _Epoch(0, vers, buf); m_buffers.insert(buf); m_versions.insert(vers); @@ -249,13 +249,19 @@ private: /* * Update the new Epoch to contain the buffers from the old one - * that it doesn't currently have + * that it doesn't currently have if using a multi-threaded + * scheduler (otherwise, there is only one buffer that is + * reused, so it shouldn't be removed) */ - size_t old_buffer_cnt = new_epoch->clear_buffers(); - for (size_t i=old_buffer_cnt; iget_buffers().size(); i++) { - new_epoch->add_buffer(old_epoch->get_buffers()[i]); + if constexpr (!std::same_as) { + size_t old_buffer_cnt = new_epoch->clear_buffers(); + for (size_t i=old_buffer_cnt; iget_buffers().size(); i++) { + new_epoch->add_buffer(old_epoch->get_buffers()[i]); + } } + m_current_epoch.fetch_add(1); + old_epoch->set_inactive(); /* notify any blocking threads that the new epoch is available */ m_epoch_cv_lk.lock(); @@ -276,10 +282,10 @@ private: * is violated, it is possible that this code will clone a retired * epoch. */ - auto new_epoch = get_active_epoch()->clone(); + m_newest_epoch.fetch_add(1); + auto new_epoch = get_active_epoch()->clone(m_newest_epoch.load()); std::unique_lock m_struct_lock; m_versions.insert(new_epoch->get_structure()); - m_newest_epoch.fetch_add(1); m_epochs.insert({m_newest_epoch.load(), new_epoch}); m_struct_lock.release(); @@ -316,6 +322,9 @@ private: while (!epoch->retirable()) ; + /* remove epoch from the framework's map */ + m_epochs.erase(epoch->get_epoch_number()); + /* * The epoch's destructor will handle releasing * all the references it holds @@ -440,7 +449,8 @@ private: int internal_append(const R &rec, bool ts) { Buffer *buffer = nullptr; do { - auto epoch = get_active_epoch_protected(); + // FIXME: figure out best way to protect this epoch access + auto epoch = get_active_epoch(); buffer = epoch->get_active_buffer(); /* if the buffer is full, schedule a merge and add a new empty buffer */ @@ -448,10 +458,13 @@ private: // FIXME: possible race here--two identical merges could be scheduled auto vers = epoch->get_structure(); schedule_merge(); - buffer = add_empty_buffer(epoch); + + if constexpr (std::same_as) { + buffer->truncate(); + } else { + buffer = add_empty_buffer(epoch); + } } - // FIXME: not exactly the best spot for this - epoch->end_job(); } while(!buffer->append(rec, ts)); /* internal append should always succeed, eventually */ -- cgit v1.2.3