summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-01-12 14:10:11 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-01-12 14:10:11 -0500
commitaac0bb661af8fae38d3ce08d6078cb4d9dfcb575 (patch)
tree347e0ce7f7e15f2610039f02b75d47cedf810cd6 /include/framework/scheduling
parentc4514c2e62a711189cf3c914297885d97fb51a09 (diff)
downloaddynamic-extension-aac0bb661af8fae38d3ce08d6078cb4d9dfcb575.tar.gz
Initial integration of new buffering scheme into framework
It isn't working right now (lotsa test failures), but we're to the debugging phase now.
Diffstat (limited to 'include/framework/scheduling')
-rw-r--r--include/framework/scheduling/Epoch.h80
1 files changed, 16 insertions, 64 deletions
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 4e1b8a2..ca85fe2 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -8,6 +8,9 @@
*/
#pragma once
+#include <condition_variable>
+#include <mutex>
+
#include "framework/structure/MutableBuffer.h"
#include "framework/structure/ExtensionStructure.h"
#include "framework/structure/BufferView.h"
@@ -20,10 +23,10 @@ class Epoch {
private:
typedef MutableBuffer<R> Buffer;
typedef ExtensionStructure<R, S, Q, L> Structure;
- typedef BufferView<R, Q> BufView;
+ typedef BufferView<R> BufView;
public:
Epoch(size_t number=0)
- : m_buffers()
+ : m_buffer(nullptr)
, m_structure(nullptr)
, m_active_merge(false)
, m_active_jobs(0)
@@ -31,8 +34,8 @@ public:
, m_epoch_number(number)
{}
- Epoch(size_t number, Structure *structure, Buffer *buff)
- : m_buffers()
+ Epoch(size_t number, Structure *structure, Buffer *buff)
+ : m_buffer(buff)
, m_structure(structure)
, m_active_jobs(0)
, m_active_merge(false)
@@ -40,8 +43,6 @@ public:
, m_epoch_number(number)
{
structure->take_reference();
- buff->take_reference();
- m_buffers.push_back(buff);
}
~Epoch() {
@@ -54,35 +55,11 @@ public:
*/
//m_active_cv.notify_all();
- clear_buffers();
-
if (m_structure) {
m_structure->release_reference();
}
}
- Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) {
- assert(buf);
-
- std::unique_lock<std::mutex> m_buffer_lock;
- /*
- * if a current buffer is specified, only add the
- * new buffer if the active buffer is the current,
- * otherwise just return the active buffer (poor man's
- * CAS).
- */
- if (cur_buf) {
- auto active_buf = get_active_buffer();
- if (active_buf != cur_buf) {
- return active_buf;
- }
- }
-
- buf->take_reference();
- m_buffers.push_back(buf);
- return buf;
- }
-
void start_job() {
m_active_jobs.fetch_add(1);
}
@@ -109,36 +86,10 @@ public:
return m_structure;
}
- std::vector<Buffer *> &get_buffers() {
- return m_buffers;
- }
-
- BufView get_buffer_view() {
- std::unique_lock<std::mutex> m_buffer_lock;
- return BufView(m_buffers);
- }
-
- Buffer *get_active_buffer() {
- if (m_buffers.size() == 0) return nullptr;
-
- return m_buffers[m_buffers.size() - 1];
+ BufView get_buffer() {
+ return m_buffer->get_buffer_view();
}
- /*
- * Return the number of buffers in this epoch at
- * time of call, and then clear the buffer vector,
- * releasing all references in the process.
- */
- size_t clear_buffers() {
- std::unique_lock<std::mutex> m_buffer_lock;
- size_t buf_cnt = m_buffers.size();
- for (auto buf : m_buffers) {
- if (buf) buf->release_reference();
- }
-
- m_buffers.clear();
- return buf_cnt;
- }
/*
* Returns a new Epoch object that is a copy of this one. The new object will also contain
@@ -148,17 +99,14 @@ public:
Epoch *clone(size_t number) {
std::unique_lock<std::mutex> m_buffer_lock;
auto epoch = new Epoch(number);
- epoch->m_buffers = m_buffers;
+ epoch->m_buffer = m_buffer;
+
if (m_structure) {
epoch->m_structure = m_structure->copy();
/* the copy routine returns a structure with 0 references */
epoch->m_structure->take_reference();
}
- for (auto b : m_buffers) {
- b->take_reference();
- }
-
return epoch;
}
@@ -213,9 +161,13 @@ public:
return true;
}
+ bool advance_buffer_head(size_t head) {
+ return m_buffer->advance_head(head);
+ }
+
private:
Structure *m_structure;
- std::vector<Buffer *> m_buffers;
+ Buffer *m_buffer;
std::condition_variable m_active_cv;
std::mutex m_cv_lock;