summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h77
1 files changed, 59 insertions, 18 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 0858fc3..233bebb 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -198,8 +198,8 @@ public:
*/
void await_next_epoch() {
while (m_current_epoch.load() != m_newest_epoch.load()) {
- std::unique_lock<std::mutex> m_epoch_cv_lk;
- m_epoch_cv.wait(m_epoch_cv_lk);
+ std::unique_lock<std::mutex> lk(m_epoch_cv_lk);
+ m_epoch_cv.wait(lk);
}
return;
@@ -238,14 +238,26 @@ private:
}
_Epoch *get_active_epoch_protected() {
- m_epochs[m_current_epoch.load()]->start_job();
- return m_epochs[m_current_epoch.load()];
+ ssize_t cur_epoch = -1;
+ do {
+ if (cur_epoch != -1) {
+ m_epochs[cur_epoch]->end_job();
+ }
+
+ cur_epoch = m_current_epoch.load();
+ m_epochs[cur_epoch]->start_job();
+ } while (cur_epoch != m_current_epoch.load());
+
+ return m_epochs[cur_epoch];
}
void advance_epoch() {
size_t new_epoch_num = m_newest_epoch.load();
+ size_t old_epoch_num = m_current_epoch.load();
+ assert(new_epoch_num != old_epoch_num);
+
_Epoch *new_epoch = m_epochs[new_epoch_num];
- _Epoch *old_epoch = m_epochs[m_current_epoch.load()];
+ _Epoch *old_epoch = m_epochs[old_epoch_num];
/*
* Update the new Epoch to contain the buffers from the old one
@@ -255,7 +267,11 @@ private:
*/
if constexpr (!std::same_as<SCHED, SerialScheduler>) {
size_t old_buffer_cnt = new_epoch->clear_buffers();
- for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) {
+ // FIXME: this is getting nightmarish... The -1 here is to ensure that the
+ // the empty buffer added when the merge was first triggered is also included.
+ // Due to the reordering of operations in internal_append, the new buffer exists
+ // at the time of the clone, and so is already in the new epoch.
+ for (size_t i=old_buffer_cnt-1; i<old_epoch->get_buffers().size(); i++) {
new_epoch->add_buffer(old_epoch->get_buffers()[i]);
}
}
@@ -298,14 +314,22 @@ private:
* buffer while a new epoch is being created in the background. Returns a
* pointer to the newly created buffer.
*/
- Buffer *add_empty_buffer(_Epoch *epoch) {
- auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
+ Buffer *add_empty_buffer(_Epoch *epoch, Buffer *current_buffer=nullptr) {
+ auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
std::unique_lock<std::mutex> m_struct_lock;
+ auto new_buffer = epoch->add_buffer(temp_buffer, current_buffer);
+ /*
+ * if epoch->add_buffer doesn't add the new buffer, this insert
+ * won't update the buffer set (duplicate insert)
+ */
m_buffers.insert(new_buffer);
m_struct_lock.release();
- epoch->add_buffer(new_buffer);
+ if (new_buffer != temp_buffer) {
+ delete temp_buffer;
+ }
+
return new_buffer;
}
@@ -448,24 +472,41 @@ private:
int internal_append(const R &rec, bool ts) {
Buffer *buffer = nullptr;
+ int res = 0;
do {
- // FIXME: figure out best way to protect this epoch access
- auto epoch = get_active_epoch();
+ auto epoch = get_active_epoch_protected();
buffer = epoch->get_active_buffer();
+ assert(buffer);
- /* if the buffer is full, schedule a merge and add a new empty buffer */
+ /*
+ * If the buffer is full and there is no current merge,
+ * schedule a merge and add a new empty buffer. If there
+ * is a current merge, then just add a new empty buffer
+ * to the current epoch.
+ */
if (buffer->is_full()) {
- // FIXME: possible race here--two identical merges could be scheduled
- auto vers = epoch->get_structure();
- schedule_merge();
-
if constexpr (std::same_as<SCHED, SerialScheduler>) {
+ /* single threaded: run merge and then empty buffer */
+ epoch->end_job();
+ schedule_merge();
buffer->truncate();
- } else {
+ continue;
+ } else if (epoch->prepare_merge()) {
+ /*
+ * add an empty buffer to allow insert proceed and
+ * schedule a merge on a background thread
+ */
buffer = add_empty_buffer(epoch);
+ schedule_merge();
+ } else {
+ /* background merge is ongoing, so just add empty buffer */
+ buffer = add_empty_buffer(epoch, buffer);
}
}
- } while(!buffer->append(rec, ts));
+
+ res = buffer->append(rec, ts);
+ epoch->end_job();
+ } while(!res);
/* internal append should always succeed, eventually */
return 1;