summaryrefslogtreecommitdiffstats
path: root/include/framework/structure
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-01-17 18:22:00 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-01-17 18:22:00 -0500
commit138c793b0a58577713d98c98bb140cf1d9c79bee (patch)
tree921197e2ba521704cb379ac8069189e70f8dee3d /include/framework/structure
parent2117935e85412f3733ee0bcb1830c7fd0b129b29 (diff)
downloaddynamic-extension-138c793b0a58577713d98c98bb140cf1d9c79bee.tar.gz
Multiple concurrency bug fixes
A poorly organized commit with fixes for a variety of bugs that were causing missing records. The core problems all appear to be fixed, though there is an outstanding problem with tombstones not being completely canceled. A very small number are appearing in the wrong order during the static structure test.
Diffstat (limited to 'include/framework/structure')
-rw-r--r--include/framework/structure/BufferView.h2
-rw-r--r--include/framework/structure/InternalLevel.h26
-rw-r--r--include/framework/structure/MutableBuffer.h58
3 files changed, 50 insertions, 36 deletions
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 099b7a2..30fffed 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -110,7 +110,7 @@ public:
size_t split_idx = m_cap - (m_head % m_cap);
memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), split_idx* sizeof(Wrapped<R>));
- memcpy(buffer + split_idx, (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped<R>));
+ memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped<R>));
} else {
memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped<R>));
}
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index b35cadd..e9874e0 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -51,11 +51,10 @@ public:
assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
auto res = new InternalLevel(base_level->m_level_no, 1);
res->m_shard_cnt = 1;
- Shard* shards[2];
- shards[0] = base_level->m_shards[0].get();
- shards[1] = new_level->m_shards[0].get();
+ std::vector<Shard *> shards = {base_level->m_shards[0].get(),
+ new_level->m_shards[0].get()};
- res->m_shards[0] = std::make_shared<S>(shards, 2);
+ res->m_shards[0] = std::make_shared<S>(shards);
return std::shared_ptr<InternalLevel>(res);
}
@@ -75,17 +74,17 @@ public:
return;
}
- Shard *shards[level->m_shard_cnt];
- for (size_t i=0; i<level->m_shard_cnt; i++) {
- shards[i] = level->m_shards[i].get();
+ std::vector<S*> shards;
+ for (auto shard : level->m_shards) {
+ if (shard) shards.emplace_back(shard.get());
}
if (m_shard_cnt == m_shards.size()) {
- m_pending_shard = new S(shards, level->m_shard_cnt);
+ m_pending_shard = new S(shards);
return;
}
- auto tmp = new S(shards, level->m_shard_cnt);
+ auto tmp = new S(shards);
m_shards[m_shard_cnt] = std::shared_ptr<S>(tmp);
++m_shard_cnt;
@@ -131,13 +130,12 @@ public:
return nullptr;
}
- Shard *shards[m_shard_cnt];
-
- for (size_t i=0; i<m_shard_cnt; i++) {
- shards[i] = m_shards[i].get();
+ std::vector<Shard *> shards;
+ for (auto shard : m_shards) {
+ if (shard) shards.emplace_back(shard.get());
}
- return new S(shards, m_shard_cnt);
+ return new S(shards);
}
void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index eeb3dc9..7edde2f 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -133,18 +133,18 @@ public:
return m_tombstone_filter->get_memory_usage();
}
- BufferView<R> get_buffer_view() {
- size_t head = get_head();
+ BufferView<R> get_buffer_view(size_t target_head) {
+ size_t head = get_head(target_head);
auto f = std::bind(release_head_reference, (void *) this, head);
return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
}
- BufferView<R> get_flush_buffer_view() {
- size_t head = get_head();
+ BufferView<R> get_buffer_view() {
+ size_t head = get_head(m_head.load().head_idx);
auto f = std::bind(release_head_reference, (void *) this, head);
- return BufferView<R>(m_data, m_cap, head, head + m_lwm, m_tscnt.load(), m_tombstone_filter, f);
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
}
/*
@@ -167,23 +167,39 @@ public:
buffer_head new_hd = {new_head, 0};
buffer_head cur_hd;
- /* move the current head into the old head */
+ /* replace current head with new head */
do {
- buffer_head cur_hd = m_head.load();
- m_old_head.store(cur_hd);
+ cur_hd = m_head.load();
} while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+ /* move the current head into the old head */
+ m_old_head.store(cur_hd);
+
m_active_head_advance.store(false);
return true;
}
- size_t get_head() {
+ /*
+ * FIXME: If target_head does not match *either* the old_head or the
+ * current_head, this routine will loop infinitely.
+ */
+ size_t get_head(size_t target_head) {
buffer_head cur_hd, new_hd;
+ bool head_acquired = false;
do {
- cur_hd = m_head.load();
- new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
- } while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+ if (m_old_head.load().head_idx == target_head) {
+ cur_hd = m_old_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else if (m_head.load().head_idx == target_head){
+ cur_hd = m_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ } while(!head_acquired);
return new_hd.head_idx;
}
@@ -254,22 +270,22 @@ private:
buffer_head cur_hd, new_hd;
do {
- if (buffer->m_head.load().head_idx == head) {
- cur_hd = buffer->m_head;
- assert(cur_hd.refcnt > 0);
+ if (buffer->m_old_head.load().head_idx == head) {
+ cur_hd = buffer->m_old_head;
+ if (cur_hd.refcnt == 0) continue;
new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
-
- if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
+ if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
break;
}
} else {
- cur_hd = buffer->m_old_head;
- assert(cur_hd.refcnt > 0);
+ cur_hd = buffer->m_head;
+ if (cur_hd.refcnt == 0) continue;
new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
- if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
+
+ if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
break;
}
- }
+ }
_mm_pause();
} while(true);
}