diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-01-27 18:17:21 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-01-27 18:17:21 -0500 |
| commit | 30da48151f58803968ca3ef5d42e66a9223d80a4 (patch) | |
| tree | 23cad3718bca116016caf5aba375a3eb3a490328 /include/framework/scheduling/Version.h | |
| parent | f149a2459cfc2007f755d792b3c4e567d30c132f (diff) | |
| download | dynamic-extension-30da48151f58803968ca3ef5d42e66a9223d80a4.tar.gz | |
progress
Diffstat (limited to 'include/framework/scheduling/Version.h')
| -rw-r--r-- | include/framework/scheduling/Version.h | 102 |
1 files changed, 41 insertions, 61 deletions
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h index 2b2b5ba..8d3d038 100644 --- a/include/framework/scheduling/Version.h +++ b/include/framework/scheduling/Version.h @@ -20,30 +20,21 @@ template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> class Version { private: typedef typename ShardType::RECORD RecordType; - typedef MutableBuffer<RecordType> Buffer; - typedef ExtensionStructure<ShardType, QueryType> Structure; - typedef BufferView<RecordType> BufView; + typedef MutableBuffer<RecordType> BufferType; + typedef ExtensionStructure<ShardType, QueryType> StructureType; + typedef BufferView<RecordType> BufferViewType; public: - Version(size_t number = 0) - : m_buffer(nullptr), m_structure(nullptr), m_active_merge(false), - m_epoch_number(number), m_buffer_head(0) {} - - Version(size_t number, Structure *structure, Buffer *buff, size_t head) - : m_buffer(buff), m_structure(structure), m_active_merge(false), - m_epoch_number(number), m_buffer_head(head) { - structure->take_reference(); - } + Version(size_t vid = 0) + : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0), + m_pending_buffer_head(-1) {} - ~Version() { - if (m_structure) { - m_structure->release_reference(); - } + Version(size_t number, std::unique_ptr<StructureType> structure, BufferType *buff, + size_t head) + : m_buffer(buff), m_structure(std::move(structure)), m_id(number), + m_buffer_head(head), m_pending_buffer_head(-1) {} - if (m_structure->get_reference_count() == 0) { - delete m_structure; - } - } + ~Version() = default; /* * Versions are *not* copyable or movable. Only one can exist, and all users @@ -54,13 +45,26 @@ public: Version &operator=(const Version &) = delete; Version &operator=(Version &&) = delete; - size_t get_epoch_number() const { return m_epoch_number; } + size_t get_id() const { return m_id; } + + void set_id(size_t id) { m_id = id;} - const Structure *get_structure() const { return m_structure.get(); } + const StructureType *get_structure() const { return m_structure.get(); } - Structure *get_mutable_structure() { return m_structure.get(); } + StructureType *get_mutable_structure() { return m_structure.get(); } - BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); } + bool set_structure(std::unique_ptr<StructureType> new_struct) { + if (m_structure) { + return false; + } + + m_structure = std::move(new_struct); + return true; + } + + BufferViewType get_buffer() const { + return m_buffer->get_buffer_view(m_buffer_head); + } /* * Returns a new Version object that is a copy of this one. The new object @@ -69,54 +73,29 @@ public: * provided argument. */ Version *clone(size_t number) { - std::unique_lock<std::mutex> m_buffer_lock; - auto epoch = new Version(number); - epoch->m_buffer = m_buffer; - epoch->m_buffer_head = m_buffer_head; + auto version = new Version(number); + version->m_buffer = m_buffer; + version->m_buffer_head = m_buffer_head; if (m_structure) { - epoch->m_structure = m_structure->copy(); - /* the copy routine returns a structure with 0 references */ - epoch->m_structure->take_reference(); + version->m_structure = std::unique_ptr(m_structure->copy()); } - return epoch; + return version; } - /* - * Check if a merge can be started from this Version. At present, without - * concurrent merging, this simply checks if there is currently a scheduled - * merge based on this Version. If there is, returns false. If there isn't, - * return true and set a flag indicating that there is an active merge. - */ - bool prepare_reconstruction() { - auto old = m_active_merge.load(); - if (old) { - return false; - } - - // FIXME: this needs cleaned up - while (!m_active_merge.compare_exchange_strong(old, true)) { - old = m_active_merge.load(); - if (old) { - return false; - } - } - - return true; + void set_next_buffer_head(size_t new_head) { + m_pending_buffer_head = new_head; } - bool advance_buffer_head(size_t head) { - m_buffer_head = head; + bool advance_buffer_head() { + m_buffer_head = m_pending_buffer_head; return m_buffer->advance_head(m_buffer_head); } private: - Buffer *m_buffer; - std::unique_ptr<Structure> m_structure; - - std::mutex m_buffer_lock; - std::atomic<bool> m_active_merge; + BufferType *m_buffer; + std::unique_ptr<StructureType> m_structure; /* * The number of currently active jobs @@ -124,7 +103,8 @@ private: * epoch. An epoch can only be retired * when this number is 0. */ - size_t m_epoch_number; + size_t m_id; size_t m_buffer_head; + ssize_t m_pending_buffer_head; }; } // namespace de |