From 68ae6279476e7d37837ac06474fb558e50ce6706 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 31 Oct 2023 12:41:55 -0400 Subject: Fixes for various bugs under SerialScheduler --- include/framework/DynamicExtension.h | 35 ++++++++++++++++-------- include/framework/scheduling/Epoch.h | 31 +++++++++++++++++---- include/framework/structure/ExtensionStructure.h | 5 +++- 3 files changed, 54 insertions(+), 17 deletions(-) diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 3a4a7e1..5c1eaab 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -54,7 +54,7 @@ public: { auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop); - auto epoch = new _Epoch(vers, buf); + auto epoch = new _Epoch(0, vers, buf); m_buffers.insert(buf); m_versions.insert(vers); @@ -249,13 +249,19 @@ private: /* * Update the new Epoch to contain the buffers from the old one - * that it doesn't currently have + * that it doesn't currently have if using a multi-threaded + * scheduler (otherwise, there is only one buffer that is + * reused, so it shouldn't be removed) */ - size_t old_buffer_cnt = new_epoch->clear_buffers(); - for (size_t i=old_buffer_cnt; iget_buffers().size(); i++) { - new_epoch->add_buffer(old_epoch->get_buffers()[i]); + if constexpr (!std::same_as) { + size_t old_buffer_cnt = new_epoch->clear_buffers(); + for (size_t i=old_buffer_cnt; iget_buffers().size(); i++) { + new_epoch->add_buffer(old_epoch->get_buffers()[i]); + } } + m_current_epoch.fetch_add(1); + old_epoch->set_inactive(); /* notify any blocking threads that the new epoch is available */ m_epoch_cv_lk.lock(); @@ -276,10 +282,10 @@ private: * is violated, it is possible that this code will clone a retired * epoch. */ - auto new_epoch = get_active_epoch()->clone(); + m_newest_epoch.fetch_add(1); + auto new_epoch = get_active_epoch()->clone(m_newest_epoch.load()); std::unique_lock m_struct_lock; m_versions.insert(new_epoch->get_structure()); - m_newest_epoch.fetch_add(1); m_epochs.insert({m_newest_epoch.load(), new_epoch}); m_struct_lock.release(); @@ -316,6 +322,9 @@ private: while (!epoch->retirable()) ; + /* remove epoch from the framework's map */ + m_epochs.erase(epoch->get_epoch_number()); + /* * The epoch's destructor will handle releasing * all the references it holds @@ -440,7 +449,8 @@ private: int internal_append(const R &rec, bool ts) { Buffer *buffer = nullptr; do { - auto epoch = get_active_epoch_protected(); + // FIXME: figure out best way to protect this epoch access + auto epoch = get_active_epoch(); buffer = epoch->get_active_buffer(); /* if the buffer is full, schedule a merge and add a new empty buffer */ @@ -448,10 +458,13 @@ private: // FIXME: possible race here--two identical merges could be scheduled auto vers = epoch->get_structure(); schedule_merge(); - buffer = add_empty_buffer(epoch); + + if constexpr (std::same_as) { + buffer->truncate(); + } else { + buffer = add_empty_buffer(epoch); + } } - // FIXME: not exactly the best spot for this - epoch->end_job(); } while(!buffer->append(rec, ts)); /* internal append should always succeed, eventually */ diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 6bbf927..f4aefe9 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -22,16 +22,20 @@ private: typedef ExtensionStructure Structure; typedef BufferView BufView; public: - Epoch() + Epoch(size_t number=0) : m_buffers() , m_structure(nullptr) , m_active_jobs(0) + , m_active(true) + , m_epoch_number(number) {} - Epoch(Structure *structure, Buffer *buff) + Epoch(size_t number, Structure *structure, Buffer *buff) : m_buffers() , m_structure(structure) , m_active_jobs(0) + , m_active(true) + , m_epoch_number(number) { structure->take_reference(); buff->take_reference(); @@ -62,6 +66,7 @@ public: } void end_job() { + assert(m_active_jobs.load() > 0); m_active_jobs.fetch_add(-1); if (m_active_jobs.load() == 0) { @@ -74,6 +79,10 @@ public: return m_active_jobs.load(); } + size_t get_epoch_number() { + return m_epoch_number; + } + Structure *get_structure() { return m_structure; } @@ -109,18 +118,29 @@ public: /* * Returns a new Epoch object that is a copy of this one. The new object will also contain - * a copy of the m_structure, rather than a reference to the same one. + * a copy of the m_structure, rather than a reference to the same one. The epoch number of + * the new epoch will be set to the provided argument. */ - Epoch *clone() { - auto epoch = new Epoch(); + Epoch *clone(size_t number) { + auto epoch = new Epoch(number); epoch->m_buffers = m_buffers; if (m_structure) { epoch->m_structure = m_structure->copy(); + /* the copy routine returns a structure with 0 references */ + epoch->m_structure->take_reference(); + } + + for (auto b : m_buffers) { + b->take_reference(); } return epoch; } + void set_inactive() { + m_active = false; + } + /* * */ @@ -158,5 +178,6 @@ private: */ std::atomic m_active_jobs; bool m_active; + size_t m_epoch_number; }; } diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index f5657af..80ec7b9 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -46,7 +46,8 @@ public: * Create a shallow copy of this extension structure. The copy will share references to the * same levels/shards as the original, but will have its own lists. As all of the shards are * immutable (with the exception of deletes), the copy can be restructured with merges, etc., - * without affecting the original. + * without affecting the original. The copied structure will be returned with a reference + * count of 0; generally you will want to immediately call take_reference() on it. * * NOTE: When using tagged deletes, a delete of a record in the original structure will affect * the copy, so long as the copy retains a reference to the same shard as the original. This could @@ -59,6 +60,8 @@ public: new_struct->m_levels.push_back(m_levels[i]->clone()); } + new_struct->m_refcnt = 0; + return new_struct; } -- cgit v1.2.3