summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-11-06 12:39:35 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2023-11-06 12:39:35 -0500
commit254f8aa85ea8962e5c11d8b475a171883c22f168 (patch)
treeee19378cf463c54d04fce39d1b3e5975f0790299 /include/framework/DynamicExtension.h
parentad117358b8ab9924d216edeca0eafa87b4f86896 (diff)
downloaddynamic-extension-254f8aa85ea8962e5c11d8b475a171883c22f168.tar.gz
DynamicExtension: internal_append fixes
Fixed a few bugs with concurrent operation of internal_append, as well as enabled the spawning of multiple empty buffers while merges are currently active.
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h36
1 files changed, 26 insertions, 10 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 8ce6a7a..60aa07e 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -258,7 +258,11 @@ private:
*/
if constexpr (!std::same_as<SCHED, SerialScheduler>) {
size_t old_buffer_cnt = new_epoch->clear_buffers();
- for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) {
+ // FIXME: this is getting nightmarish... The -1 here is to ensure that the
+ // the empty buffer added when the merge was first triggered is also included.
+ // Due to the reordering of operations in internal_append, the new buffer exists
+ // at the time of the clone, and so is already in the new epoch.
+ for (size_t i=old_buffer_cnt-1; i<old_epoch->get_buffers().size(); i++) {
new_epoch->add_buffer(old_epoch->get_buffers()[i]);
}
}
@@ -459,24 +463,36 @@ private:
int internal_append(const R &rec, bool ts) {
Buffer *buffer = nullptr;
- int res;
+ int res = 0;
do {
- // FIXME: figure out best way to protect this epoch access
auto epoch = get_active_epoch_protected();
buffer = epoch->get_active_buffer();
+ assert(buffer);
- /* if the buffer is full, schedule a merge and add a new empty buffer */
+ /*
+ * If the buffer is full and there is no current merge,
+ * schedule a merge and add a new empty buffer. If there
+ * is a current merge, then just add a new empty buffer
+ * to the current epoch.
+ */
if (buffer->is_full()) {
- // FIXME: possible race here--two identical merges could be scheduled
- auto vers = epoch->get_structure();
-
if constexpr (std::same_as<SCHED, SerialScheduler>) {
+ /* single threaded: run merge and then empty buffer */
+ epoch->end_job();
+ schedule_merge();
buffer->truncate();
- } else {
+ continue;
+ } else if (epoch->prepare_merge()) {
+ /*
+ * add an empty buffer to allow insert proceed and
+ * schedule a merge on a background thread
+ */
buffer = add_empty_buffer(epoch);
+ schedule_merge();
+ } else {
+ /* background merge is ongoing, so just add empty buffer */
+ buffer = add_empty_buffer(epoch, buffer);
}
-
- schedule_merge();
}
res = buffer->append(rec, ts);