summaryrefslogtreecommitdiffstats
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/framework/DynamicExtension.h36
-rw-r--r--include/framework/scheduling/Epoch.h29
2 files changed, 55 insertions, 10 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 8ce6a7a..60aa07e 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -258,7 +258,11 @@ private:
*/
if constexpr (!std::same_as<SCHED, SerialScheduler>) {
size_t old_buffer_cnt = new_epoch->clear_buffers();
- for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) {
+ // FIXME: this is getting nightmarish... The -1 here is to ensure that the
+ // the empty buffer added when the merge was first triggered is also included.
+ // Due to the reordering of operations in internal_append, the new buffer exists
+ // at the time of the clone, and so is already in the new epoch.
+ for (size_t i=old_buffer_cnt-1; i<old_epoch->get_buffers().size(); i++) {
new_epoch->add_buffer(old_epoch->get_buffers()[i]);
}
}
@@ -459,24 +463,36 @@ private:
int internal_append(const R &rec, bool ts) {
Buffer *buffer = nullptr;
- int res;
+ int res = 0;
do {
- // FIXME: figure out best way to protect this epoch access
auto epoch = get_active_epoch_protected();
buffer = epoch->get_active_buffer();
+ assert(buffer);
- /* if the buffer is full, schedule a merge and add a new empty buffer */
+ /*
+ * If the buffer is full and there is no current merge,
+ * schedule a merge and add a new empty buffer. If there
+ * is a current merge, then just add a new empty buffer
+ * to the current epoch.
+ */
if (buffer->is_full()) {
- // FIXME: possible race here--two identical merges could be scheduled
- auto vers = epoch->get_structure();
-
if constexpr (std::same_as<SCHED, SerialScheduler>) {
+ /* single threaded: run merge and then empty buffer */
+ epoch->end_job();
+ schedule_merge();
buffer->truncate();
- } else {
+ continue;
+ } else if (epoch->prepare_merge()) {
+ /*
+ * add an empty buffer to allow insert proceed and
+ * schedule a merge on a background thread
+ */
buffer = add_empty_buffer(epoch);
+ schedule_merge();
+ } else {
+ /* background merge is ongoing, so just add empty buffer */
+ buffer = add_empty_buffer(epoch, buffer);
}
-
- schedule_merge();
}
res = buffer->append(rec, ts);
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 58fe6cd..0ebbde9 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -25,6 +25,7 @@ public:
Epoch(size_t number=0)
: m_buffers()
, m_structure(nullptr)
+ , m_active_merge(false)
, m_active_jobs(0)
, m_active(true)
, m_epoch_number(number)
@@ -34,6 +35,7 @@ public:
: m_buffers()
, m_structure(structure)
, m_active_jobs(0)
+ , m_active_merge(false)
, m_active(true)
, m_epoch_number(number)
{
@@ -151,6 +153,31 @@ public:
return epoch;
}
+ /*
+ * Check if a merge can be started from this Epoch.
+ * At present, without concurrent merging, this simply
+ * checks if there is currently a scheduled merge based
+ * on this Epoch. If there is, returns false. If there
+ * isn't, return true and set a flag indicating that
+ * there is an active merge.
+ */
+ bool prepare_merge() {
+ auto old = m_active_merge.load();
+ if (old) {
+ return false;
+ }
+
+ // FIXME: this needs cleaned up
+ while (!m_active_merge.compare_exchange_strong(old, true)) {
+ old = m_active_merge.load();
+ if (old) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
void set_inactive() {
m_active = false;
}
@@ -184,6 +211,8 @@ private:
std::condition_variable m_active_cv;
std::mutex m_cv_lock;
+ std::atomic<bool> m_active_merge;
+
/*
* The number of currently active jobs
* (queries/merges) operating on this