summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h35
-rw-r--r--include/framework/scheduling/Epoch.h31
-rw-r--r--include/framework/structure/ExtensionStructure.h5
3 files changed, 54 insertions, 17 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 3a4a7e1..5c1eaab 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -54,7 +54,7 @@ public:
{
auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop);
- auto epoch = new _Epoch(vers, buf);
+ auto epoch = new _Epoch(0, vers, buf);
m_buffers.insert(buf);
m_versions.insert(vers);
@@ -249,13 +249,19 @@ private:
/*
* Update the new Epoch to contain the buffers from the old one
- * that it doesn't currently have
+ * that it doesn't currently have if using a multi-threaded
+ * scheduler (otherwise, there is only one buffer that is
+ * reused, so it shouldn't be removed)
*/
- size_t old_buffer_cnt = new_epoch->clear_buffers();
- for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) {
- new_epoch->add_buffer(old_epoch->get_buffers()[i]);
+ if constexpr (!std::same_as<SCHED, SerialScheduler>) {
+ size_t old_buffer_cnt = new_epoch->clear_buffers();
+ for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) {
+ new_epoch->add_buffer(old_epoch->get_buffers()[i]);
+ }
}
+
m_current_epoch.fetch_add(1);
+ old_epoch->set_inactive();
/* notify any blocking threads that the new epoch is available */
m_epoch_cv_lk.lock();
@@ -276,10 +282,10 @@ private:
* is violated, it is possible that this code will clone a retired
* epoch.
*/
- auto new_epoch = get_active_epoch()->clone();
+ m_newest_epoch.fetch_add(1);
+ auto new_epoch = get_active_epoch()->clone(m_newest_epoch.load());
std::unique_lock<std::mutex> m_struct_lock;
m_versions.insert(new_epoch->get_structure());
- m_newest_epoch.fetch_add(1);
m_epochs.insert({m_newest_epoch.load(), new_epoch});
m_struct_lock.release();
@@ -316,6 +322,9 @@ private:
while (!epoch->retirable())
;
+ /* remove epoch from the framework's map */
+ m_epochs.erase(epoch->get_epoch_number());
+
/*
* The epoch's destructor will handle releasing
* all the references it holds
@@ -440,7 +449,8 @@ private:
int internal_append(const R &rec, bool ts) {
Buffer *buffer = nullptr;
do {
- auto epoch = get_active_epoch_protected();
+ // FIXME: figure out best way to protect this epoch access
+ auto epoch = get_active_epoch();
buffer = epoch->get_active_buffer();
/* if the buffer is full, schedule a merge and add a new empty buffer */
@@ -448,10 +458,13 @@ private:
// FIXME: possible race here--two identical merges could be scheduled
auto vers = epoch->get_structure();
schedule_merge();
- buffer = add_empty_buffer(epoch);
+
+ if constexpr (std::same_as<SCHED, SerialScheduler>) {
+ buffer->truncate();
+ } else {
+ buffer = add_empty_buffer(epoch);
+ }
}
- // FIXME: not exactly the best spot for this
- epoch->end_job();
} while(!buffer->append(rec, ts));
/* internal append should always succeed, eventually */
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 6bbf927..f4aefe9 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -22,16 +22,20 @@ private:
typedef ExtensionStructure<R, S, Q, L> Structure;
typedef BufferView<R, Q> BufView;
public:
- Epoch()
+ Epoch(size_t number=0)
: m_buffers()
, m_structure(nullptr)
, m_active_jobs(0)
+ , m_active(true)
+ , m_epoch_number(number)
{}
- Epoch(Structure *structure, Buffer *buff)
+ Epoch(size_t number, Structure *structure, Buffer *buff)
: m_buffers()
, m_structure(structure)
, m_active_jobs(0)
+ , m_active(true)
+ , m_epoch_number(number)
{
structure->take_reference();
buff->take_reference();
@@ -62,6 +66,7 @@ public:
}
void end_job() {
+ assert(m_active_jobs.load() > 0);
m_active_jobs.fetch_add(-1);
if (m_active_jobs.load() == 0) {
@@ -74,6 +79,10 @@ public:
return m_active_jobs.load();
}
+ size_t get_epoch_number() {
+ return m_epoch_number;
+ }
+
Structure *get_structure() {
return m_structure;
}
@@ -109,18 +118,29 @@ public:
/*
* Returns a new Epoch object that is a copy of this one. The new object will also contain
- * a copy of the m_structure, rather than a reference to the same one.
+ * a copy of the m_structure, rather than a reference to the same one. The epoch number of
+ * the new epoch will be set to the provided argument.
*/
- Epoch *clone() {
- auto epoch = new Epoch();
+ Epoch *clone(size_t number) {
+ auto epoch = new Epoch(number);
epoch->m_buffers = m_buffers;
if (m_structure) {
epoch->m_structure = m_structure->copy();
+ /* the copy routine returns a structure with 0 references */
+ epoch->m_structure->take_reference();
+ }
+
+ for (auto b : m_buffers) {
+ b->take_reference();
}
return epoch;
}
+ void set_inactive() {
+ m_active = false;
+ }
+
/*
*
*/
@@ -158,5 +178,6 @@ private:
*/
std::atomic<size_t> m_active_jobs;
bool m_active;
+ size_t m_epoch_number;
};
}
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index f5657af..80ec7b9 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -46,7 +46,8 @@ public:
* Create a shallow copy of this extension structure. The copy will share references to the
* same levels/shards as the original, but will have its own lists. As all of the shards are
* immutable (with the exception of deletes), the copy can be restructured with merges, etc.,
- * without affecting the original.
+ * without affecting the original. The copied structure will be returned with a reference
+ * count of 0; generally you will want to immediately call take_reference() on it.
*
* NOTE: When using tagged deletes, a delete of a record in the original structure will affect
* the copy, so long as the copy retains a reference to the same shard as the original. This could
@@ -59,6 +60,8 @@ public:
new_struct->m_levels.push_back(m_levels[i]->clone());
}
+ new_struct->m_refcnt = 0;
+
return new_struct;
}