diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-12 14:10:11 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-12 14:10:11 -0500 |
| commit | aac0bb661af8fae38d3ce08d6078cb4d9dfcb575 (patch) | |
| tree | 347e0ce7f7e15f2610039f02b75d47cedf810cd6 /include/framework/scheduling | |
| parent | c4514c2e62a711189cf3c914297885d97fb51a09 (diff) | |
| download | dynamic-extension-aac0bb661af8fae38d3ce08d6078cb4d9dfcb575.tar.gz | |
Initial integration of new buffering scheme into framework
It isn't working right now (lotsa test failures), but we're to the
debugging phase now.
Diffstat (limited to 'include/framework/scheduling')
| -rw-r--r-- | include/framework/scheduling/Epoch.h | 80 |
1 files changed, 16 insertions, 64 deletions
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 4e1b8a2..ca85fe2 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -8,6 +8,9 @@ */ #pragma once +#include <condition_variable> +#include <mutex> + #include "framework/structure/MutableBuffer.h" #include "framework/structure/ExtensionStructure.h" #include "framework/structure/BufferView.h" @@ -20,10 +23,10 @@ class Epoch { private: typedef MutableBuffer<R> Buffer; typedef ExtensionStructure<R, S, Q, L> Structure; - typedef BufferView<R, Q> BufView; + typedef BufferView<R> BufView; public: Epoch(size_t number=0) - : m_buffers() + : m_buffer(nullptr) , m_structure(nullptr) , m_active_merge(false) , m_active_jobs(0) @@ -31,8 +34,8 @@ public: , m_epoch_number(number) {} - Epoch(size_t number, Structure *structure, Buffer *buff) - : m_buffers() + Epoch(size_t number, Structure *structure, Buffer *buff) + : m_buffer(buff) , m_structure(structure) , m_active_jobs(0) , m_active_merge(false) @@ -40,8 +43,6 @@ public: , m_epoch_number(number) { structure->take_reference(); - buff->take_reference(); - m_buffers.push_back(buff); } ~Epoch() { @@ -54,35 +55,11 @@ public: */ //m_active_cv.notify_all(); - clear_buffers(); - if (m_structure) { m_structure->release_reference(); } } - Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) { - assert(buf); - - std::unique_lock<std::mutex> m_buffer_lock; - /* - * if a current buffer is specified, only add the - * new buffer if the active buffer is the current, - * otherwise just return the active buffer (poor man's - * CAS). - */ - if (cur_buf) { - auto active_buf = get_active_buffer(); - if (active_buf != cur_buf) { - return active_buf; - } - } - - buf->take_reference(); - m_buffers.push_back(buf); - return buf; - } - void start_job() { m_active_jobs.fetch_add(1); } @@ -109,36 +86,10 @@ public: return m_structure; } - std::vector<Buffer *> &get_buffers() { - return m_buffers; - } - - BufView get_buffer_view() { - std::unique_lock<std::mutex> m_buffer_lock; - return BufView(m_buffers); - } - - Buffer *get_active_buffer() { - if (m_buffers.size() == 0) return nullptr; - - return m_buffers[m_buffers.size() - 1]; + BufView get_buffer() { + return m_buffer->get_buffer_view(); } - /* - * Return the number of buffers in this epoch at - * time of call, and then clear the buffer vector, - * releasing all references in the process. - */ - size_t clear_buffers() { - std::unique_lock<std::mutex> m_buffer_lock; - size_t buf_cnt = m_buffers.size(); - for (auto buf : m_buffers) { - if (buf) buf->release_reference(); - } - - m_buffers.clear(); - return buf_cnt; - } /* * Returns a new Epoch object that is a copy of this one. The new object will also contain @@ -148,17 +99,14 @@ public: Epoch *clone(size_t number) { std::unique_lock<std::mutex> m_buffer_lock; auto epoch = new Epoch(number); - epoch->m_buffers = m_buffers; + epoch->m_buffer = m_buffer; + if (m_structure) { epoch->m_structure = m_structure->copy(); /* the copy routine returns a structure with 0 references */ epoch->m_structure->take_reference(); } - for (auto b : m_buffers) { - b->take_reference(); - } - return epoch; } @@ -213,9 +161,13 @@ public: return true; } + bool advance_buffer_head(size_t head) { + return m_buffer->advance_head(head); + } + private: Structure *m_structure; - std::vector<Buffer *> m_buffers; + Buffer *m_buffer; std::condition_variable m_active_cv; std::mutex m_cv_lock; |