diff options
Diffstat (limited to 'include/util/SortedMerge.h')
| -rw-r--r-- | include/util/SortedMerge.h | 274 |
1 files changed, 141 insertions, 133 deletions
diff --git a/include/util/SortedMerge.h b/include/util/SortedMerge.h index c149189..b0a3215 100644 --- a/include/util/SortedMerge.h +++ b/include/util/SortedMerge.h @@ -1,72 +1,78 @@ /* * include/util/SortedMerge.h * - * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> * * Distributed under the Modified BSD License. * * A sorted array merge routine for use in Shard construction, as many - * shards will use a sorted array to represent their data. Also encapsulates + * shards will use a sorted array to represent their data. Also encapsulates * the necessary tombstone-cancellation logic. * - * FIXME: include generic per-record processing functionality for Shards that + * FIXME: include generic per-record processing functionality for Shards that * need it, to avoid needing to reprocess the array in the shard after * creation. */ #pragma once -#include "util/Cursor.h" +#include <algorithm> + #include "framework/interface/Shard.h" #include "psu-ds/PriorityQueue.h" +#include "util/Cursor.h" namespace de { -using psudb::PriorityQueue; using psudb::BloomFilter; -using psudb::queue_record; using psudb::byte; using psudb::CACHELINE_SIZE; +using psudb::PriorityQueue; +using psudb::queue_record; /* - * A simple struct to return record_count and tombstone_count information - * back to the caller. Could've been an std::pair, but I like the more + * A simple struct to return record_count and tombstone_count information + * back to the caller. Could've been an std::pair, but I like the more * explicit names. */ struct merge_info { - size_t record_count; - size_t tombstone_count; + size_t record_count; + size_t tombstone_count; }; /* * Build a vector of cursors corresponding to the records contained within * a vector of shards. The cursor at index i in the output will correspond - * to the shard at index i in the input. + * to the shard at index i in the input. * * The values of reccnt and tscnt will be updated with the sum of the * records contained within the shards. Note that these counts include deleted * records that may be removed during shard construction, and so constitute * upper bounds only. */ -template <RecordInterface R, ShardInterface<R> S> -static std::vector<Cursor<Wrapped<R>>> build_cursor_vec(std::vector<S*> &shards, size_t *reccnt, size_t *tscnt) { - std::vector<Cursor<Wrapped<R>>> cursors; - cursors.reserve(shards.size()); - - *reccnt = 0; - *tscnt = 0; - - for (size_t i = 0; i < shards.size(); ++i) { - if (shards[i]) { - auto base = shards[i]->get_data(); - cursors.emplace_back(Cursor<Wrapped<R>>{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); - *reccnt += shards[i]->get_record_count(); - *tscnt += shards[i]->get_tombstone_count(); - } else { - cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); - } +template <RecordInterface R, ShardInterface S> +static std::vector<Cursor<Wrapped<R>>> +build_cursor_vec(std::vector<S *> const &shards, size_t *reccnt, + size_t *tscnt) { + std::vector<Cursor<Wrapped<R>>> cursors; + cursors.reserve(shards.size()); + + *reccnt = 0; + *tscnt = 0; + + for (size_t i = 0; i < shards.size(); ++i) { + if (shards[i]) { + auto base = shards[i]->get_data(); + cursors.emplace_back( + Cursor<Wrapped<R>>{base, base + shards[i]->get_record_count(), 0, + shards[i]->get_record_count()}); + *reccnt += shards[i]->get_record_count(); + *tscnt += shards[i]->get_tombstone_count(); + } else { + cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); } + } - return cursors; + return cursors; } /* @@ -80,126 +86,128 @@ static std::vector<Cursor<Wrapped<R>>> build_cursor_vec(std::vector<S*> &shards, * program will be aborted if the allocation fails. */ template <RecordInterface R> -static merge_info sorted_array_from_bufferview(BufferView<R> bv, - Wrapped<R> *buffer, - psudb::BloomFilter<R> *bf=nullptr) { - /* - * Copy the contents of the buffer view into a temporary buffer, and - * sort them. We still need to iterate over these temporary records to - * apply tombstone/deleted record filtering, as well as any possible - * per-record processing that is required by the shard being built. - */ - auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, - bv.get_record_count(), - sizeof(Wrapped<R>)); - bv.copy_to_buffer((byte *) temp_buffer); - - auto base = temp_buffer; - auto stop = base + bv.get_record_count(); - std::sort(base, stop, std::less<Wrapped<R>>()); - - merge_info info = {0, 0}; - - /* - * Iterate over the temporary buffer to process the records, copying - * them into buffer as needed - */ - while (base < stop) { - if (!base->is_tombstone() && (base + 1 < stop) - && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - continue; - } else if (base->is_deleted()) { - base += 1; - continue; - } - - // FIXME: this shouldn't be necessary, but the tagged record - // bypass doesn't seem to be working on this code-path, so this - // ensures that tagged records from the buffer are able to be - // dropped, eventually. It should only need to be &= 1 - base->header &= 3; - buffer[info.record_count++] = *base; - - if (base->is_tombstone()) { - info.tombstone_count++; - if (bf){ - bf->insert(base->rec); - } - } +static merge_info +sorted_array_from_bufferview(BufferView<R> bv, Wrapped<R> *buffer, + psudb::BloomFilter<R> *bf = nullptr) { + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped<R> *)psudb::sf_aligned_calloc( + CACHELINE_SIZE, bv.get_record_count(), sizeof(Wrapped<R>)); + bv.copy_to_buffer((byte *)temp_buffer); + + auto base = temp_buffer; + auto stop = base + bv.get_record_count(); + std::sort(base, stop, std::less<Wrapped<R>>()); + + merge_info info = {0, 0}; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) && + base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; + } - base++; + // FIXME: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. It should only need to be &= 1 + base->header &= 3; + buffer[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (bf) { + bf->insert(base->rec); + } } - free(temp_buffer); - return info; + base++; + } + + free(temp_buffer); + return info; } /* * Perform a sorted merge of the records within cursors into the provided * buffer. Includes tombstone and tagged delete cancellation logic, and - * will insert tombstones into a bloom filter, if one is provided. + * will insert tombstones into a bloom filter, if one is provided. * * The behavior of this function is undefined if the provided buffer does * not have space to contain all of the records within the input cursors. */ template <RecordInterface R> -static merge_info sorted_array_merge(std::vector<Cursor<Wrapped<R>>> &cursors, - Wrapped<R> *buffer, - psudb::BloomFilter<R> *bf=nullptr) { - - // FIXME: For smaller cursor arrays, it may be more efficient to skip - // the priority queue and just do a scan. - PriorityQueue<Wrapped<R>> pq(cursors.size()); - for (size_t i=0; i<cursors.size(); i++) { - pq.push(cursors[i].ptr, i); - } - - merge_info info = {0, 0}; - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; - /* - * if the current record is not a tombstone, and the next record is - * a tombstone that matches the current one, then the current one - * has been deleted, and both it and its tombstone can be skipped - * over. +static merge_info sorted_array_merge(std::vector<Cursor<Wrapped<R>>> &cursors, + Wrapped<R> *buffer, + psudb::BloomFilter<R> *bf = nullptr) { + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue<Wrapped<R>> pq(cursors.size()); + for (size_t i = 0; i < cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } + + merge_info info = {0, 0}; + while (pq.size()) { + auto now = pq.peek(); + auto next = + pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); + pq.pop(); + auto &cursor1 = cursors[now.version]; + auto &cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) + pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) + pq.push(cursor2.ptr, next.version); + } else { + auto &cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + buffer[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. */ - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - /* skip over records that have been deleted via tagging */ - if (!cursor.ptr->is_deleted()) { - buffer[info.record_count++] = *cursor.ptr; - - /* - * if the record is a tombstone, increment the ts count and - * insert it into the bloom filter if one has been - * provided. - */ - if (cursor.ptr->is_tombstone()) { - info.tombstone_count++; - if (bf) { - bf->insert(cursor.ptr->rec); - } - } - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (bf) { + bf->insert(cursor.ptr->rec); + } } + } + pq.pop(); + + if (advance_cursor(cursor)) + pq.push(cursor.ptr, now.version); } + } - return info; + return info; } - - -} +} // namespace de |