diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-04-22 10:24:41 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-04-22 10:24:41 -0400 |
| commit | 735d397513bc0160ba9ecb17c32c4441ed125f52 (patch) | |
| tree | 455354cb29f6bb2d3c3e4b2521db19970b98cc79 /include/shard/PGM.h | |
| parent | 438feac7e56fee425d9c6f1a43298ff9dc5b71d1 (diff) | |
| download | dynamic-extension-735d397513bc0160ba9ecb17c32c4441ed125f52.tar.gz | |
TS+PGM: Inlined manually the sorted array merge for performance reasons
Diffstat (limited to 'include/shard/PGM.h')
| -rw-r--r-- | include/shard/PGM.h | 119 |
1 files changed, 105 insertions, 14 deletions
diff --git a/include/shard/PGM.h b/include/shard/PGM.h index 691385e..509796b 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -49,16 +49,62 @@ public: sizeof(Wrapped<R>), (byte**) &m_data); - auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; + std::vector<K> keys; + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, + buffer.get_record_count(), + sizeof(Wrapped<R>)); + buffer.copy_to_buffer((byte *) temp_buffer); - if (m_reccnt > 0) { - std::vector<K> keys; - for (size_t i=0; i<m_reccnt; i++) { - keys.emplace_back(m_data[i].rec.key); + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less<Wrapped<R>>()); + + merge_info info = {0, 0}; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) + && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; } + // FIXME: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. It should only need to be &= 1 + base->header &= 3; + keys.emplace_back(base->rec.key); + m_data[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (m_bf){ + m_bf->insert(base->rec); + } + } + + base++; + } + + free(temp_buffer); + + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; + + if (m_reccnt > 0) { m_pgm = pgm::PGMIndex<K, epsilon>(keys); } } @@ -77,17 +123,62 @@ public: m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **) &m_data); + std::vector<K> keys; - auto res = sorted_array_merge<R>(cursors, m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue<Wrapped<R>> pq(cursors.size()); + for (size_t i=0; i<cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } - if (m_reccnt > 0) { - std::vector<K> keys; - for (size_t i=0; i<m_reccnt; i++) { - keys.emplace_back(m_data[i].rec.key); + merge_info info = {0, 0}; + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + keys.emplace_back(cursor.ptr->rec.key); + m_data[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. + */ + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(cursor.ptr->rec); + } + } + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); } + } + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; + + if (m_reccnt > 0) { m_pgm = pgm::PGMIndex<K, epsilon>(keys); } } |