From 138c793b0a58577713d98c98bb140cf1d9c79bee Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 17 Jan 2024 18:22:00 -0500 Subject: Multiple concurrency bug fixes A poorly organized commit with fixes for a variety of bugs that were causing missing records. The core problems all appear to be fixed, though there is an outstanding problem with tombstones not being completely canceled. A very small number are appearing in the wrong order during the static structure test. --- include/shard/ISAMTree.h | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) (limited to 'include/shard/ISAMTree.h') diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index 6b2f6b5..932e767 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -62,10 +62,13 @@ public: { TIMER_INIT(); - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped), (byte**) &m_data); + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + buffer.get_record_count() * + sizeof(Wrapped), + (byte**) &m_data); TIMER_START(); - auto temp_buffer = (Wrapped *) psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped)); + auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped)); buffer.copy_to_buffer((byte *) temp_buffer); auto base = temp_buffer; @@ -99,6 +102,7 @@ public: base++; } + TIMER_STOP(); auto copy_time = TIMER_RESULT(); @@ -112,7 +116,7 @@ public: free(temp_buffer); } - ISAMTree(ISAMTree** runs, size_t len) + ISAMTree(std::vector &shards) : m_bf(nullptr) , m_isam_nodes(nullptr) , m_root(nullptr) @@ -124,19 +128,19 @@ public: , m_data(nullptr) { std::vector>> cursors; - cursors.reserve(len); + cursors.reserve(shards.size()); - PriorityQueue> pq(len); + PriorityQueue> pq(shards.size()); size_t attemp_reccnt = 0; size_t tombstone_count = 0; - for (size_t i = 0; i < len; ++i) { - if (runs[i]) { - auto base = runs[i]->get_data(); - cursors.emplace_back(Cursor{base, base + runs[i]->get_record_count(), 0, runs[i]->get_record_count()}); - attemp_reccnt += runs[i]->get_record_count(); - tombstone_count += runs[i]->get_tombstone_count(); + for (size_t i = 0; i < shards.size(); ++i) { + if (shards[i]) { + auto base = shards[i]->get_data(); + cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); + attemp_reccnt += shards[i]->get_record_count(); + tombstone_count += shards[i]->get_tombstone_count(); pq.push(cursors[i].ptr, i); } else { cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); @@ -144,10 +148,9 @@ public: } m_bf = new BloomFilter(BF_FPR, tombstone_count, BF_HASH_FUNCS); - - m_alloc_size = (attemp_reccnt * sizeof(Wrapped)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped)) % CACHELINE_SIZE); - assert(m_alloc_size % CACHELINE_SIZE == 0); - m_data = (Wrapped*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size); + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + attemp_reccnt * sizeof(Wrapped), + (byte **) &m_data); while (pq.size()) { auto now = pq.peek(); @@ -165,6 +168,8 @@ public: if (!cursor.ptr->is_deleted()) { m_data[m_reccnt++] = *cursor.ptr; if (cursor.ptr->is_tombstone()) { + //fprintf(stderr, "ISAM: Tombstone from shard %ld next record from shard %ld\n", + //now.version, next.version); ++m_tombstone_cnt; m_bf->insert(cursor.ptr->rec); } -- cgit v1.2.3