summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h18
1 files changed, 13 insertions, 5 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 5c1eaab..49c6905 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -244,8 +244,11 @@ private:
void advance_epoch() {
size_t new_epoch_num = m_newest_epoch.load();
+ size_t old_epoch_num = m_current_epoch.load();
+ assert(new_epoch_num != old_epoch_num);
+
_Epoch *new_epoch = m_epochs[new_epoch_num];
- _Epoch *old_epoch = m_epochs[m_current_epoch.load()];
+ _Epoch *old_epoch = m_epochs[old_epoch_num];
/*
* Update the new Epoch to contain the buffers from the old one
@@ -302,10 +305,10 @@ private:
auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
std::unique_lock<std::mutex> m_struct_lock;
+ epoch->add_buffer(new_buffer);
m_buffers.insert(new_buffer);
m_struct_lock.release();
- epoch->add_buffer(new_buffer);
return new_buffer;
}
@@ -448,24 +451,29 @@ private:
int internal_append(const R &rec, bool ts) {
Buffer *buffer = nullptr;
+ int res;
do {
// FIXME: figure out best way to protect this epoch access
- auto epoch = get_active_epoch();
+ auto epoch = get_active_epoch_protected();
buffer = epoch->get_active_buffer();
/* if the buffer is full, schedule a merge and add a new empty buffer */
if (buffer->is_full()) {
// FIXME: possible race here--two identical merges could be scheduled
auto vers = epoch->get_structure();
- schedule_merge();
if constexpr (std::same_as<SCHED, SerialScheduler>) {
buffer->truncate();
} else {
buffer = add_empty_buffer(epoch);
}
+
+ schedule_merge();
}
- } while(!buffer->append(rec, ts));
+
+ res = buffer->append(rec, ts);
+ epoch->end_job();
+ } while(!res);
/* internal append should always succeed, eventually */
return 1;