From 735d397513bc0160ba9ecb17c32c4441ed125f52 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 22 Apr 2024 10:24:41 -0400 Subject: TS+PGM: Inlined manually the sorted array merge for performance reasons --- include/shard/PGM.h | 119 +++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 105 insertions(+), 14 deletions(-) (limited to 'include/shard/PGM.h') diff --git a/include/shard/PGM.h b/include/shard/PGM.h index 691385e..509796b 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -49,16 +49,62 @@ public: sizeof(Wrapped), (byte**) &m_data); - auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; + std::vector keys; + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, + buffer.get_record_count(), + sizeof(Wrapped)); + buffer.copy_to_buffer((byte *) temp_buffer); - if (m_reccnt > 0) { - std::vector keys; - for (size_t i=0; i>()); + + merge_info info = {0, 0}; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) + && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; } + // FIXME: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. It should only need to be &= 1 + base->header &= 3; + keys.emplace_back(base->rec.key); + m_data[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (m_bf){ + m_bf->insert(base->rec); + } + } + + base++; + } + + free(temp_buffer); + + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; + + if (m_reccnt > 0) { m_pgm = pgm::PGMIndex(keys); } } @@ -77,17 +123,62 @@ public: m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **) &m_data); + std::vector keys; - auto res = sorted_array_merge(cursors, m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue> pq(cursors.size()); + for (size_t i=0; i 0) { - std::vector keys; - for (size_t i=0; i 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + keys.emplace_back(cursor.ptr->rec.key); + m_data[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. + */ + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(cursor.ptr->rec); + } + } + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); } + } + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; + + if (m_reccnt > 0) { m_pgm = pgm::PGMIndex(keys); } } -- cgit v1.2.3