diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-17 18:22:00 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-17 18:22:00 -0500 |
| commit | 138c793b0a58577713d98c98bb140cf1d9c79bee (patch) | |
| tree | 921197e2ba521704cb379ac8069189e70f8dee3d /include/framework/structure | |
| parent | 2117935e85412f3733ee0bcb1830c7fd0b129b29 (diff) | |
| download | dynamic-extension-138c793b0a58577713d98c98bb140cf1d9c79bee.tar.gz | |
Multiple concurrency bug fixes
A poorly organized commit with fixes for a variety of bugs that were
causing missing records. The core problems all appear to be fixed,
though there is an outstanding problem with tombstones not being
completely canceled. A very small number are appearing in the wrong
order during the static structure test.
Diffstat (limited to 'include/framework/structure')
| -rw-r--r-- | include/framework/structure/BufferView.h | 2 | ||||
| -rw-r--r-- | include/framework/structure/InternalLevel.h | 26 | ||||
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 58 |
3 files changed, 50 insertions, 36 deletions
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index 099b7a2..30fffed 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -110,7 +110,7 @@ public: size_t split_idx = m_cap - (m_head % m_cap); memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), split_idx* sizeof(Wrapped<R>)); - memcpy(buffer + split_idx, (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped<R>)); + memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped<R>)); } else { memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped<R>)); } diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index b35cadd..e9874e0 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -51,11 +51,10 @@ public: assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); auto res = new InternalLevel(base_level->m_level_no, 1); res->m_shard_cnt = 1; - Shard* shards[2]; - shards[0] = base_level->m_shards[0].get(); - shards[1] = new_level->m_shards[0].get(); + std::vector<Shard *> shards = {base_level->m_shards[0].get(), + new_level->m_shards[0].get()}; - res->m_shards[0] = std::make_shared<S>(shards, 2); + res->m_shards[0] = std::make_shared<S>(shards); return std::shared_ptr<InternalLevel>(res); } @@ -75,17 +74,17 @@ public: return; } - Shard *shards[level->m_shard_cnt]; - for (size_t i=0; i<level->m_shard_cnt; i++) { - shards[i] = level->m_shards[i].get(); + std::vector<S*> shards; + for (auto shard : level->m_shards) { + if (shard) shards.emplace_back(shard.get()); } if (m_shard_cnt == m_shards.size()) { - m_pending_shard = new S(shards, level->m_shard_cnt); + m_pending_shard = new S(shards); return; } - auto tmp = new S(shards, level->m_shard_cnt); + auto tmp = new S(shards); m_shards[m_shard_cnt] = std::shared_ptr<S>(tmp); ++m_shard_cnt; @@ -131,13 +130,12 @@ public: return nullptr; } - Shard *shards[m_shard_cnt]; - - for (size_t i=0; i<m_shard_cnt; i++) { - shards[i] = m_shards[i].get(); + std::vector<Shard *> shards; + for (auto shard : m_shards) { + if (shard) shards.emplace_back(shard.get()); } - return new S(shards, m_shard_cnt); + return new S(shards); } void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) { diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index eeb3dc9..7edde2f 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -133,18 +133,18 @@ public: return m_tombstone_filter->get_memory_usage(); } - BufferView<R> get_buffer_view() { - size_t head = get_head(); + BufferView<R> get_buffer_view(size_t target_head) { + size_t head = get_head(target_head); auto f = std::bind(release_head_reference, (void *) this, head); return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); } - BufferView<R> get_flush_buffer_view() { - size_t head = get_head(); + BufferView<R> get_buffer_view() { + size_t head = get_head(m_head.load().head_idx); auto f = std::bind(release_head_reference, (void *) this, head); - return BufferView<R>(m_data, m_cap, head, head + m_lwm, m_tscnt.load(), m_tombstone_filter, f); + return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); } /* @@ -167,23 +167,39 @@ public: buffer_head new_hd = {new_head, 0}; buffer_head cur_hd; - /* move the current head into the old head */ + /* replace current head with new head */ do { - buffer_head cur_hd = m_head.load(); - m_old_head.store(cur_hd); + cur_hd = m_head.load(); } while(!m_head.compare_exchange_strong(cur_hd, new_hd)); + /* move the current head into the old head */ + m_old_head.store(cur_hd); + m_active_head_advance.store(false); return true; } - size_t get_head() { + /* + * FIXME: If target_head does not match *either* the old_head or the + * current_head, this routine will loop infinitely. + */ + size_t get_head(size_t target_head) { buffer_head cur_hd, new_hd; + bool head_acquired = false; do { - cur_hd = m_head.load(); - new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; - } while(!m_head.compare_exchange_strong(cur_hd, new_hd)); + if (m_old_head.load().head_idx == target_head) { + cur_hd = m_old_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else if (m_head.load().head_idx == target_head){ + cur_hd = m_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } while(!head_acquired); return new_hd.head_idx; } @@ -254,22 +270,22 @@ private: buffer_head cur_hd, new_hd; do { - if (buffer->m_head.load().head_idx == head) { - cur_hd = buffer->m_head; - assert(cur_hd.refcnt > 0); + if (buffer->m_old_head.load().head_idx == head) { + cur_hd = buffer->m_old_head; + if (cur_hd.refcnt == 0) continue; new_hd = {cur_hd.head_idx, cur_hd.refcnt-1}; - - if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { + if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { break; } } else { - cur_hd = buffer->m_old_head; - assert(cur_hd.refcnt > 0); + cur_hd = buffer->m_head; + if (cur_hd.refcnt == 0) continue; new_hd = {cur_hd.head_idx, cur_hd.refcnt-1}; - if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { + + if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { break; } - } + } _mm_pause(); } while(true); } |