diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-03-03 13:41:19 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-03-03 13:41:19 -0500 |
| commit | 2ded45f5a20f38fdfd9f348c446c38dc713a5591 (patch) | |
| tree | 746fb09b49ee4c00fc3e4760d899d60d8d8dcce0 /include/framework/scheduling/Version.h | |
| parent | d116b94389538aa8e0e7354fae77693b980de4f0 (diff) | |
| download | dynamic-extension-2ded45f5a20f38fdfd9f348c446c38dc713a5591.tar.gz | |
Fixed a few concurrency bugs
Diffstat (limited to 'include/framework/scheduling/Version.h')
| -rw-r--r-- | include/framework/scheduling/Version.h | 44 |
1 files changed, 34 insertions, 10 deletions
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h index 4cd73ba..be54c84 100644 --- a/include/framework/scheduling/Version.h +++ b/include/framework/scheduling/Version.h @@ -25,16 +25,24 @@ private: typedef BufferView<RecordType> BufferViewType; public: + Version(size_t vid = 0) - : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0), - m_pending_buffer_head(-1) {} + : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0) {} Version(size_t number, std::unique_ptr<StructureType> structure, BufferType *buff, size_t head) : m_buffer(buff), m_structure(std::move(structure)), m_id(number), - m_buffer_head(head), m_pending_buffer_head(-1) {} - - ~Version() = default; + m_buffer_head(head) { + if (m_buffer) { + m_buffer->take_head_reference(m_buffer_head); + } + } + + ~Version() { + if (m_buffer) { + m_buffer->release_head_reference(m_buffer_head); + } + } /* * Versions are *not* copyable or movable. Only one can exist, and all users @@ -72,6 +80,9 @@ public: auto version = new Version(number); version->m_buffer = m_buffer; version->m_buffer_head = m_buffer_head; + if (version->m_buffer) { + version->m_buffer->take_head_reference(m_buffer_head); + } if (m_structure) { version->m_structure = std::unique_ptr(m_structure->copy()); @@ -81,8 +92,15 @@ public: } bool advance_buffer_head(size_t new_head) { - m_buffer_head = new_head; - return m_buffer->advance_head(new_head); + m_buffer->release_head_reference(m_buffer_head); + if (m_buffer->advance_head(new_head)) { + m_buffer_head = new_head; + return true; + } + + /* if we failed to advance, reclaim our reference */ + m_buffer->take_head_reference(m_buffer_head); + return false; } void update_shard_version(size_t version) { @@ -94,9 +112,16 @@ public: } - void set_head(size_t head) { - // fprintf(stderr, "[I] Updating buffer head of %ld to %ld\n", get_id(), head); + void set_buffer(BufferType *buffer, size_t head) { + assert(m_buffer == nullptr); + + m_buffer = buffer; m_buffer_head = head; + m_buffer->take_head_reference(head); + } + + bool valid() { + return (m_buffer) && (m_buffer_head) && (m_structure) && (m_id); } private: @@ -105,6 +130,5 @@ private: size_t m_id; size_t m_buffer_head; - ssize_t m_pending_buffer_head; }; } // namespace de |