diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-17 18:22:00 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-17 18:22:00 -0500 |
| commit | 138c793b0a58577713d98c98bb140cf1d9c79bee (patch) | |
| tree | 921197e2ba521704cb379ac8069189e70f8dee3d /include/shard | |
| parent | 2117935e85412f3733ee0bcb1830c7fd0b129b29 (diff) | |
| download | dynamic-extension-138c793b0a58577713d98c98bb140cf1d9c79bee.tar.gz | |
Multiple concurrency bug fixes
A poorly organized commit with fixes for a variety of bugs that were
causing missing records. The core problems all appear to be fixed,
though there is an outstanding problem with tombstones not being
completely canceled. A very small number are appearing in the wrong
order during the static structure test.
Diffstat (limited to 'include/shard')
| -rw-r--r-- | include/shard/ISAMTree.h | 35 |
1 files changed, 20 insertions, 15 deletions
diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index 6b2f6b5..932e767 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -62,10 +62,13 @@ public: { TIMER_INIT(); - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>), (byte**) &m_data); + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + buffer.get_record_count() * + sizeof(Wrapped<R>), + (byte**) &m_data); TIMER_START(); - auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>)); + auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>)); buffer.copy_to_buffer((byte *) temp_buffer); auto base = temp_buffer; @@ -99,6 +102,7 @@ public: base++; } + TIMER_STOP(); auto copy_time = TIMER_RESULT(); @@ -112,7 +116,7 @@ public: free(temp_buffer); } - ISAMTree(ISAMTree** runs, size_t len) + ISAMTree(std::vector<ISAMTree*> &shards) : m_bf(nullptr) , m_isam_nodes(nullptr) , m_root(nullptr) @@ -124,19 +128,19 @@ public: , m_data(nullptr) { std::vector<Cursor<Wrapped<R>>> cursors; - cursors.reserve(len); + cursors.reserve(shards.size()); - PriorityQueue<Wrapped<R>> pq(len); + PriorityQueue<Wrapped<R>> pq(shards.size()); size_t attemp_reccnt = 0; size_t tombstone_count = 0; - for (size_t i = 0; i < len; ++i) { - if (runs[i]) { - auto base = runs[i]->get_data(); - cursors.emplace_back(Cursor{base, base + runs[i]->get_record_count(), 0, runs[i]->get_record_count()}); - attemp_reccnt += runs[i]->get_record_count(); - tombstone_count += runs[i]->get_tombstone_count(); + for (size_t i = 0; i < shards.size(); ++i) { + if (shards[i]) { + auto base = shards[i]->get_data(); + cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); + attemp_reccnt += shards[i]->get_record_count(); + tombstone_count += shards[i]->get_tombstone_count(); pq.push(cursors[i].ptr, i); } else { cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); @@ -144,10 +148,9 @@ public: } m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS); - - m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE); - assert(m_alloc_size % CACHELINE_SIZE == 0); - m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size); + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + attemp_reccnt * sizeof(Wrapped<R>), + (byte **) &m_data); while (pq.size()) { auto now = pq.peek(); @@ -165,6 +168,8 @@ public: if (!cursor.ptr->is_deleted()) { m_data[m_reccnt++] = *cursor.ptr; if (cursor.ptr->is_tombstone()) { + //fprintf(stderr, "ISAM: Tombstone from shard %ld next record from shard %ld\n", + //now.version, next.version); ++m_tombstone_cnt; m_bf->insert(cursor.ptr->rec); } |