From 9fe305c7d28e993e55c55427f377ae7e3251ea4f Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Fri, 6 Dec 2024 13:13:51 -0500 Subject: Interface update (#5) * Query Interface Adjustments/Refactoring Began the process of adjusting the query interface (and also the shard interface, to a lesser degree) to better accommodate the user. In particular the following changes have been made, 1. The number of necessary template arguments for the query type has been drastically reduced, while also removing the void pointers and manual delete functions from the interface. This was accomplished by requiring many of the sub-types associated with a query (parameters, etc.) to be nested inside the main query class, and by forcing the SHARD type to expose its associated record type. 2. User-defined query return types are now supported. Queries no longer are required to return strictly sets of records. Instead, the query now has LocalResultType and ResultType template parameters (which can be defaulted using a typedef in the Query type itself), allowing much more flexibility. Note that, at least for the short term, the LocalResultType must still expose the same is_deleted/is_tombstone interface as a Wrapped used to, as this is currently needed for delete filtering. A better approach to this is, hopefully, forthcoming. 3. Updated the ISAMTree.h shard and rangequery.h query to use the new interfaces, and adjusted the associated unit tests as well. 4. Dropped the unnecessary "get_data()" function from the ShardInterface concept. 5. Dropped the need to specify a record type in the ShardInterface concept. This is now handled using a required Shard::RECORD member of the Shard class itself, which should expose the name of the record type. * Updates to framework to support new Query/Shard interfaces Pretty extensive adjustments to the framework, particularly to the templates themselves, along with some type-renaming work, to support the new query and shard interfaces. Adjusted the external query interface to take an rvalue reference, rather than a pointer, to the query parameters. * Removed framework-level delete filtering This was causing some issues with the new query interface, and should probably be reworked anyway, so I'm temporarily (TM) removing the feature. * Updated benchmarks + remaining code for new interface --- include/util/SortedMerge.h | 274 +++++++++++++++++++++++---------------------- 1 file changed, 141 insertions(+), 133 deletions(-) (limited to 'include/util/SortedMerge.h') diff --git a/include/util/SortedMerge.h b/include/util/SortedMerge.h index c149189..b0a3215 100644 --- a/include/util/SortedMerge.h +++ b/include/util/SortedMerge.h @@ -1,72 +1,78 @@ /* * include/util/SortedMerge.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * * A sorted array merge routine for use in Shard construction, as many - * shards will use a sorted array to represent their data. Also encapsulates + * shards will use a sorted array to represent their data. Also encapsulates * the necessary tombstone-cancellation logic. * - * FIXME: include generic per-record processing functionality for Shards that + * FIXME: include generic per-record processing functionality for Shards that * need it, to avoid needing to reprocess the array in the shard after * creation. */ #pragma once -#include "util/Cursor.h" +#include + #include "framework/interface/Shard.h" #include "psu-ds/PriorityQueue.h" +#include "util/Cursor.h" namespace de { -using psudb::PriorityQueue; using psudb::BloomFilter; -using psudb::queue_record; using psudb::byte; using psudb::CACHELINE_SIZE; +using psudb::PriorityQueue; +using psudb::queue_record; /* - * A simple struct to return record_count and tombstone_count information - * back to the caller. Could've been an std::pair, but I like the more + * A simple struct to return record_count and tombstone_count information + * back to the caller. Could've been an std::pair, but I like the more * explicit names. */ struct merge_info { - size_t record_count; - size_t tombstone_count; + size_t record_count; + size_t tombstone_count; }; /* * Build a vector of cursors corresponding to the records contained within * a vector of shards. The cursor at index i in the output will correspond - * to the shard at index i in the input. + * to the shard at index i in the input. * * The values of reccnt and tscnt will be updated with the sum of the * records contained within the shards. Note that these counts include deleted * records that may be removed during shard construction, and so constitute * upper bounds only. */ -template S> -static std::vector>> build_cursor_vec(std::vector &shards, size_t *reccnt, size_t *tscnt) { - std::vector>> cursors; - cursors.reserve(shards.size()); - - *reccnt = 0; - *tscnt = 0; - - for (size_t i = 0; i < shards.size(); ++i) { - if (shards[i]) { - auto base = shards[i]->get_data(); - cursors.emplace_back(Cursor>{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); - *reccnt += shards[i]->get_record_count(); - *tscnt += shards[i]->get_tombstone_count(); - } else { - cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); - } +template +static std::vector>> +build_cursor_vec(std::vector const &shards, size_t *reccnt, + size_t *tscnt) { + std::vector>> cursors; + cursors.reserve(shards.size()); + + *reccnt = 0; + *tscnt = 0; + + for (size_t i = 0; i < shards.size(); ++i) { + if (shards[i]) { + auto base = shards[i]->get_data(); + cursors.emplace_back( + Cursor>{base, base + shards[i]->get_record_count(), 0, + shards[i]->get_record_count()}); + *reccnt += shards[i]->get_record_count(); + *tscnt += shards[i]->get_tombstone_count(); + } else { + cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); } + } - return cursors; + return cursors; } /* @@ -80,126 +86,128 @@ static std::vector>> build_cursor_vec(std::vector &shards, * program will be aborted if the allocation fails. */ template -static merge_info sorted_array_from_bufferview(BufferView bv, - Wrapped *buffer, - psudb::BloomFilter *bf=nullptr) { - /* - * Copy the contents of the buffer view into a temporary buffer, and - * sort them. We still need to iterate over these temporary records to - * apply tombstone/deleted record filtering, as well as any possible - * per-record processing that is required by the shard being built. - */ - auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, - bv.get_record_count(), - sizeof(Wrapped)); - bv.copy_to_buffer((byte *) temp_buffer); - - auto base = temp_buffer; - auto stop = base + bv.get_record_count(); - std::sort(base, stop, std::less>()); - - merge_info info = {0, 0}; - - /* - * Iterate over the temporary buffer to process the records, copying - * them into buffer as needed - */ - while (base < stop) { - if (!base->is_tombstone() && (base + 1 < stop) - && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - continue; - } else if (base->is_deleted()) { - base += 1; - continue; - } - - // FIXME: this shouldn't be necessary, but the tagged record - // bypass doesn't seem to be working on this code-path, so this - // ensures that tagged records from the buffer are able to be - // dropped, eventually. It should only need to be &= 1 - base->header &= 3; - buffer[info.record_count++] = *base; - - if (base->is_tombstone()) { - info.tombstone_count++; - if (bf){ - bf->insert(base->rec); - } - } +static merge_info +sorted_array_from_bufferview(BufferView bv, Wrapped *buffer, + psudb::BloomFilter *bf = nullptr) { + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped *)psudb::sf_aligned_calloc( + CACHELINE_SIZE, bv.get_record_count(), sizeof(Wrapped)); + bv.copy_to_buffer((byte *)temp_buffer); + + auto base = temp_buffer; + auto stop = base + bv.get_record_count(); + std::sort(base, stop, std::less>()); + + merge_info info = {0, 0}; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) && + base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; + } - base++; + // FIXME: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. It should only need to be &= 1 + base->header &= 3; + buffer[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (bf) { + bf->insert(base->rec); + } } - free(temp_buffer); - return info; + base++; + } + + free(temp_buffer); + return info; } /* * Perform a sorted merge of the records within cursors into the provided * buffer. Includes tombstone and tagged delete cancellation logic, and - * will insert tombstones into a bloom filter, if one is provided. + * will insert tombstones into a bloom filter, if one is provided. * * The behavior of this function is undefined if the provided buffer does * not have space to contain all of the records within the input cursors. */ template -static merge_info sorted_array_merge(std::vector>> &cursors, - Wrapped *buffer, - psudb::BloomFilter *bf=nullptr) { - - // FIXME: For smaller cursor arrays, it may be more efficient to skip - // the priority queue and just do a scan. - PriorityQueue> pq(cursors.size()); - for (size_t i=0; i 1 ? pq.peek(1) : queue_record>{nullptr, 0}; - /* - * if the current record is not a tombstone, and the next record is - * a tombstone that matches the current one, then the current one - * has been deleted, and both it and its tombstone can be skipped - * over. +static merge_info sorted_array_merge(std::vector>> &cursors, + Wrapped *buffer, + psudb::BloomFilter *bf = nullptr) { + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue> pq(cursors.size()); + for (size_t i = 0; i < cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } + + merge_info info = {0, 0}; + while (pq.size()) { + auto now = pq.peek(); + auto next = + pq.size() > 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); + pq.pop(); + auto &cursor1 = cursors[now.version]; + auto &cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) + pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) + pq.push(cursor2.ptr, next.version); + } else { + auto &cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + buffer[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. */ - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - /* skip over records that have been deleted via tagging */ - if (!cursor.ptr->is_deleted()) { - buffer[info.record_count++] = *cursor.ptr; - - /* - * if the record is a tombstone, increment the ts count and - * insert it into the bloom filter if one has been - * provided. - */ - if (cursor.ptr->is_tombstone()) { - info.tombstone_count++; - if (bf) { - bf->insert(cursor.ptr->rec); - } - } - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (bf) { + bf->insert(cursor.ptr->rec); + } } + } + pq.pop(); + + if (advance_cursor(cursor)) + pq.push(cursor.ptr, now.version); } + } - return info; + return info; } - - -} +} // namespace de -- cgit v1.2.3