summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h35
1 files changed, 24 insertions, 11 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 3a4a7e1..5c1eaab 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -54,7 +54,7 @@ public:
{
auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop);
- auto epoch = new _Epoch(vers, buf);
+ auto epoch = new _Epoch(0, vers, buf);
m_buffers.insert(buf);
m_versions.insert(vers);
@@ -249,13 +249,19 @@ private:
/*
* Update the new Epoch to contain the buffers from the old one
- * that it doesn't currently have
+ * that it doesn't currently have if using a multi-threaded
+ * scheduler (otherwise, there is only one buffer that is
+ * reused, so it shouldn't be removed)
*/
- size_t old_buffer_cnt = new_epoch->clear_buffers();
- for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) {
- new_epoch->add_buffer(old_epoch->get_buffers()[i]);
+ if constexpr (!std::same_as<SCHED, SerialScheduler>) {
+ size_t old_buffer_cnt = new_epoch->clear_buffers();
+ for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) {
+ new_epoch->add_buffer(old_epoch->get_buffers()[i]);
+ }
}
+
m_current_epoch.fetch_add(1);
+ old_epoch->set_inactive();
/* notify any blocking threads that the new epoch is available */
m_epoch_cv_lk.lock();
@@ -276,10 +282,10 @@ private:
* is violated, it is possible that this code will clone a retired
* epoch.
*/
- auto new_epoch = get_active_epoch()->clone();
+ m_newest_epoch.fetch_add(1);
+ auto new_epoch = get_active_epoch()->clone(m_newest_epoch.load());
std::unique_lock<std::mutex> m_struct_lock;
m_versions.insert(new_epoch->get_structure());
- m_newest_epoch.fetch_add(1);
m_epochs.insert({m_newest_epoch.load(), new_epoch});
m_struct_lock.release();
@@ -316,6 +322,9 @@ private:
while (!epoch->retirable())
;
+ /* remove epoch from the framework's map */
+ m_epochs.erase(epoch->get_epoch_number());
+
/*
* The epoch's destructor will handle releasing
* all the references it holds
@@ -440,7 +449,8 @@ private:
int internal_append(const R &rec, bool ts) {
Buffer *buffer = nullptr;
do {
- auto epoch = get_active_epoch_protected();
+ // FIXME: figure out best way to protect this epoch access
+ auto epoch = get_active_epoch();
buffer = epoch->get_active_buffer();
/* if the buffer is full, schedule a merge and add a new empty buffer */
@@ -448,10 +458,13 @@ private:
// FIXME: possible race here--two identical merges could be scheduled
auto vers = epoch->get_structure();
schedule_merge();
- buffer = add_empty_buffer(epoch);
+
+ if constexpr (std::same_as<SCHED, SerialScheduler>) {
+ buffer->truncate();
+ } else {
+ buffer = add_empty_buffer(epoch);
+ }
}
- // FIXME: not exactly the best spot for this
- epoch->end_job();
} while(!buffer->append(rec, ts));
/* internal append should always succeed, eventually */