summaryrefslogtreecommitdiffstats
path: root/include/framework/structure/MutableBuffer.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/structure/MutableBuffer.h')
-rw-r--r--include/framework/structure/MutableBuffer.h58
1 files changed, 37 insertions, 21 deletions
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index eeb3dc9..7edde2f 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -133,18 +133,18 @@ public:
return m_tombstone_filter->get_memory_usage();
}
- BufferView<R> get_buffer_view() {
- size_t head = get_head();
+ BufferView<R> get_buffer_view(size_t target_head) {
+ size_t head = get_head(target_head);
auto f = std::bind(release_head_reference, (void *) this, head);
return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
}
- BufferView<R> get_flush_buffer_view() {
- size_t head = get_head();
+ BufferView<R> get_buffer_view() {
+ size_t head = get_head(m_head.load().head_idx);
auto f = std::bind(release_head_reference, (void *) this, head);
- return BufferView<R>(m_data, m_cap, head, head + m_lwm, m_tscnt.load(), m_tombstone_filter, f);
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
}
/*
@@ -167,23 +167,39 @@ public:
buffer_head new_hd = {new_head, 0};
buffer_head cur_hd;
- /* move the current head into the old head */
+ /* replace current head with new head */
do {
- buffer_head cur_hd = m_head.load();
- m_old_head.store(cur_hd);
+ cur_hd = m_head.load();
} while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+ /* move the current head into the old head */
+ m_old_head.store(cur_hd);
+
m_active_head_advance.store(false);
return true;
}
- size_t get_head() {
+ /*
+ * FIXME: If target_head does not match *either* the old_head or the
+ * current_head, this routine will loop infinitely.
+ */
+ size_t get_head(size_t target_head) {
buffer_head cur_hd, new_hd;
+ bool head_acquired = false;
do {
- cur_hd = m_head.load();
- new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
- } while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+ if (m_old_head.load().head_idx == target_head) {
+ cur_hd = m_old_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else if (m_head.load().head_idx == target_head){
+ cur_hd = m_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ } while(!head_acquired);
return new_hd.head_idx;
}
@@ -254,22 +270,22 @@ private:
buffer_head cur_hd, new_hd;
do {
- if (buffer->m_head.load().head_idx == head) {
- cur_hd = buffer->m_head;
- assert(cur_hd.refcnt > 0);
+ if (buffer->m_old_head.load().head_idx == head) {
+ cur_hd = buffer->m_old_head;
+ if (cur_hd.refcnt == 0) continue;
new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
-
- if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
+ if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
break;
}
} else {
- cur_hd = buffer->m_old_head;
- assert(cur_hd.refcnt > 0);
+ cur_hd = buffer->m_head;
+ if (cur_hd.refcnt == 0) continue;
new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
- if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
+
+ if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
break;
}
- }
+ }
_mm_pause();
} while(true);
}