summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling/Version.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/scheduling/Version.h')
-rw-r--r--include/framework/scheduling/Version.h44
1 files changed, 34 insertions, 10 deletions
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index 4cd73ba..be54c84 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -25,16 +25,24 @@ private:
typedef BufferView<RecordType> BufferViewType;
public:
+
Version(size_t vid = 0)
- : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0),
- m_pending_buffer_head(-1) {}
+ : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0) {}
Version(size_t number, std::unique_ptr<StructureType> structure, BufferType *buff,
size_t head)
: m_buffer(buff), m_structure(std::move(structure)), m_id(number),
- m_buffer_head(head), m_pending_buffer_head(-1) {}
-
- ~Version() = default;
+ m_buffer_head(head) {
+ if (m_buffer) {
+ m_buffer->take_head_reference(m_buffer_head);
+ }
+ }
+
+ ~Version() {
+ if (m_buffer) {
+ m_buffer->release_head_reference(m_buffer_head);
+ }
+ }
/*
* Versions are *not* copyable or movable. Only one can exist, and all users
@@ -72,6 +80,9 @@ public:
auto version = new Version(number);
version->m_buffer = m_buffer;
version->m_buffer_head = m_buffer_head;
+ if (version->m_buffer) {
+ version->m_buffer->take_head_reference(m_buffer_head);
+ }
if (m_structure) {
version->m_structure = std::unique_ptr(m_structure->copy());
@@ -81,8 +92,15 @@ public:
}
bool advance_buffer_head(size_t new_head) {
- m_buffer_head = new_head;
- return m_buffer->advance_head(new_head);
+ m_buffer->release_head_reference(m_buffer_head);
+ if (m_buffer->advance_head(new_head)) {
+ m_buffer_head = new_head;
+ return true;
+ }
+
+ /* if we failed to advance, reclaim our reference */
+ m_buffer->take_head_reference(m_buffer_head);
+ return false;
}
void update_shard_version(size_t version) {
@@ -94,9 +112,16 @@ public:
}
- void set_head(size_t head) {
- // fprintf(stderr, "[I] Updating buffer head of %ld to %ld\n", get_id(), head);
+ void set_buffer(BufferType *buffer, size_t head) {
+ assert(m_buffer == nullptr);
+
+ m_buffer = buffer;
m_buffer_head = head;
+ m_buffer->take_head_reference(head);
+ }
+
+ bool valid() {
+ return (m_buffer) && (m_buffer_head) && (m_structure) && (m_id);
}
private:
@@ -105,6 +130,5 @@ private:
size_t m_id;
size_t m_buffer_head;
- ssize_t m_pending_buffer_head;
};
} // namespace de