diff options
| author | Douglas B. Rumbaugh <dbr4@psu.edu> | 2024-02-09 14:06:59 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-02-09 14:06:59 -0500 |
| commit | bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30 (patch) | |
| tree | 66333c55feb0ea8875a50e6dc07c8535d241bf1c /include/util | |
| parent | 076e104b8672924c3d80cd1da2fdb5ebee1766ac (diff) | |
| parent | 46885246313358a3b606eca139b20280e96db10e (diff) | |
| download | dynamic-extension-bc0f3cca3a5b495fcae1d3ad8d09e6d714da5d30.tar.gz | |
Merge pull request #1 from dbrumbaugh/new-buffer
Initial Concurrency Implementation
Diffstat (limited to 'include/util')
| -rw-r--r-- | include/util/Cursor.h | 21 | ||||
| -rw-r--r-- | include/util/SortedMerge.h | 205 | ||||
| -rw-r--r-- | include/util/bf_config.h | 23 | ||||
| -rw-r--r-- | include/util/types.h | 59 |
4 files changed, 270 insertions, 38 deletions
diff --git a/include/util/Cursor.h b/include/util/Cursor.h index 1b0b8ed..e8ba53d 100644 --- a/include/util/Cursor.h +++ b/include/util/Cursor.h @@ -1,19 +1,24 @@ /* * include/util/Cursor.h * - * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> * Dong Xie <dongx@psu.edu> * - * All rights reserved. Published under the Modified BSD License. + * Distributed under the Modified BSD License. * + * A simple record cursor type with associated methods for help in + * merging record sets when constructing shards. Iterates an array + * of records in order, and provides facilities to make sorted merges + * easier. + * + * TODO: Prior versions of this module included automatic support for + * working with data stored in PagedFiles as well. That should be + * reintroduced at some point. */ #pragma once -#include "framework/RecordInterface.h" - -#include "psu-ds/BloomFilter.h" -#include "psu-ds/PriorityQueue.h" -#include "psu-util/alignment.h" +#include <cstdlib> +#include <vector> namespace de { template<typename R> @@ -62,6 +67,8 @@ template <typename R> inline static Cursor<R> *get_next(std::vector<Cursor<R>> &cursors, Cursor<R> *current=nullptr) { const R *min_rec = nullptr; Cursor<R> *result = nullptr; + // FIXME: for large cursor vectors, it may be worth it to use a + // PriorityQueue here instead of scanning. for (size_t i=0; i< cursors.size(); i++) { if (cursors[i] == (Cursor<R>) {0} ) continue; diff --git a/include/util/SortedMerge.h b/include/util/SortedMerge.h new file mode 100644 index 0000000..8a1e782 --- /dev/null +++ b/include/util/SortedMerge.h @@ -0,0 +1,205 @@ +/* + * include/util/SortedMerge.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + * A sorted array merge routine for use in Shard construction, as many + * shards will use a sorted array to represent their data. Also encapsulates + * the necessary tombstone-cancellation logic. + * + * FIXME: include generic per-record processing functionality for Shards that + * need it, to avoid needing to reprocess the array in the shard after + * creation. + */ +#pragma once + +#include "util/Cursor.h" +#include "framework/interface/Shard.h" +#include "psu-ds/PriorityQueue.h" + +namespace de { + +using psudb::PriorityQueue; +using psudb::BloomFilter; +using psudb::queue_record; +using psudb::byte; +using psudb::CACHELINE_SIZE; + +/* + * A simple struct to return record_count and tombstone_count information + * back to the caller. Could've been an std::pair, but I like the more + * explicit names. + */ +struct merge_info { + size_t record_count; + size_t tombstone_count; +}; + +/* + * Build a vector of cursors corresponding to the records contained within + * a vector of shards. The cursor at index i in the output will correspond + * to the shard at index i in the input. + * + * The values of reccnt and tscnt will be updated with the sum of the + * records contained within the shards. Note that these counts include deleted + * records that may be removed during shard construction, and so constitute + * upper bounds only. + */ +template <RecordInterface R, ShardInterface<R> S> +static std::vector<Cursor<Wrapped<R>>> build_cursor_vec(std::vector<S*> &shards, size_t *reccnt, size_t *tscnt) { + std::vector<Cursor<Wrapped<R>>> cursors; + cursors.reserve(shards.size()); + + *reccnt = 0; + *tscnt = 0; + + for (size_t i = 0; i < shards.size(); ++i) { + if (shards[i]) { + auto base = shards[i]->get_data(); + cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); + *reccnt += shards[i]->get_record_count(); + *tscnt += shards[i]->get_tombstone_count(); + } else { + cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); + } + } + + return cursors; +} + +/* + * Build a sorted array of records based on the contents of a BufferView. + * This routine does not alter the buffer view, but rather copies the + * records out and then sorts them. The provided buffer must be large + * enough to store the records from the BufferView, or the behavior of the + * function is undefined. + * + * It allocates a temporary buffer for the sorting, and execution of the + * program will be aborted if the allocation fails. + */ +template <RecordInterface R> +static merge_info sorted_array_from_bufferview(BufferView<R> bv, + Wrapped<R> *buffer, + psudb::BloomFilter<R> *bf=nullptr) { + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, + bv.get_record_count(), + sizeof(Wrapped<R>)); + bv.copy_to_buffer((byte *) temp_buffer); + + auto base = temp_buffer; + auto stop = base + bv.get_record_count(); + std::sort(base, stop, std::less<Wrapped<R>>()); + + merge_info info = {0, 0}; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) + && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; + } + + // FIXME: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. It should only need to be &= 1 + base->header &= 3; + buffer[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (bf){ + bf->insert(base->rec); + } + } + + base++; + } + + free(temp_buffer); + return info; +} + +/* + * Perform a sorted merge of the records within cursors into the provided + * buffer. Includes tombstone and tagged delete cancellation logic, and + * will insert tombstones into a bloom filter, if one is provided. + * + * The behavior of this function is undefined if the provided buffer does + * not have space to contain all of the records within the input cursors. + */ +template <RecordInterface R> +static merge_info sorted_array_merge(std::vector<Cursor<Wrapped<R>>> &cursors, + Wrapped<R> *buffer, + psudb::BloomFilter<R> *bf=nullptr) { + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue<Wrapped<R>> pq(cursors.size()); + for (size_t i=0; i<cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } + + merge_info info = {0, 0}; + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + buffer[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. + */ + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (bf) { + bf->insert(cursor.ptr->rec); + } + } + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + } + } + + return info; +} + + + +} diff --git a/include/util/bf_config.h b/include/util/bf_config.h index 2390643..9f29ed7 100644 --- a/include/util/bf_config.h +++ b/include/util/bf_config.h @@ -1,25 +1,42 @@ /* * include/util/bf_config.h * - * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> * Dong Xie <dongx@psu.edu> * - * All rights reserved. Published under the Modified BSD License. + * Distributed under the Modified BSD License. + * + * Global parameters for configuring bloom filters used as auxiliary + * structures on shards within the framework. The bloom filter class + * can be found in + * + * $PROJECT_ROOT/external/psudb-common/cpp/include/psu-ds/BloomFilter.h * */ #pragma once -#include "psu-util/alignment.h" +#include <cstdlib> namespace de { +/* global variable for specifying bloom filter FPR */ static double BF_FPR = .01; + +/* global variable for specifying number of BF hash functions (k) */ static size_t BF_HASH_FUNCS = 7; +/* + * Adjust the value of BF_FPR. The argument must be on the interval + * (0, 1), or the behavior of bloom filters is undefined. + */ static void BF_SET_FPR(double fpr) { BF_FPR = fpr; } +/* + * Adjust the value of BF_HASH_FUNCS. The argument must be on the interval + * (0, INT64_MAX], or the behavior of bloom filters is undefined. + */ static void BF_SET_HASHFUNC(size_t func_cnt) { BF_HASH_FUNCS = func_cnt; } diff --git a/include/util/types.h b/include/util/types.h index 3010e78..a13bd95 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -1,54 +1,62 @@ /* * include/util/types.h * - * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> * - * All rights reserved. Published under the Modified BSD License. + * Distributed under the Modified BSD License. * - * A centralized header file for various datatypes used throughout the + * A centralized header file for various data types used throughout the * code base. There are a few very specific types, such as header formats, * that are defined within the header files that make direct use of them, * but all generally usable, simple types are defined here. * + * Many of these types were used in the Practical Dynamic Extension for + * Sampling Indexes work, particularly for external storage and buffer + * pool systems. They aren't used now, but we're leaving them here to use + * them in the future, when we add this functionality into this system too. */ #pragma once -#include <cstdlib> #include <cstdint> -#include <cstddef> -#include <string> +#include <cstdlib> namespace de { -using std::byte; - -// Represents a page offset within a specific file (physical or virtual) +/* Represents a page offset within a specific file (physical or virtual) */ typedef uint32_t PageNum; -// Byte offset within a page. Also used for lengths of records, etc., -// within the codebase. size_t isn't necessary, as the maximum offset -// is only parm::PAGE_SIZE +/* + * Byte offset within a page. Also used for lengths of records, etc., + * within the codebase. size_t isn't necessary, as the maximum offset + * is only parm::PAGE_SIZE + */ typedef uint16_t PageOffset; -// A unique identifier for a frame within a buffer or cache. +/* A unique identifier for a frame within a buffer or cache */ typedef int32_t FrameId; -// A unique timestamp for use in MVCC concurrency control. Currently stored in -// record headers, but not used by anything. +/* + * A unique timestamp for use in MVCC concurrency control. Currently stored in + * record headers, but not used by anything. + */ typedef uint32_t Timestamp; const Timestamp TIMESTAMP_MIN = 0; const Timestamp TIMESTAMP_MAX = UINT32_MAX; -// Invalid values for various IDs. Used throughout the code base to indicate -// uninitialized values and error conditions. +/* + * Invalid values for various IDs. Used throughout the code base to indicate + * uninitialized values and error conditions. + */ const PageNum INVALID_PNUM = 0; const FrameId INVALID_FRID = -1; -// An ID for a given shard within the index. The level_idx is the index -// in the memory_levels and disk_levels vectors corresponding to the -// shard, and the shard_idx is the index with the level (always 0 in the -// case of leveling). Note that the two vectors of levels are treated -// as a contiguous index space. +/* + * An ID for a given shard within the index. The level_idx is the index + * in the memory_levels and disk_levels vectors corresponding to the + * shard, and the shard_idx is the index with the level (always 0 in the + * case of leveling). Note that the two vectors of levels are treated + * as a contiguous index space. + */ struct ShardID { ssize_t level_idx; ssize_t shard_idx; @@ -58,12 +66,7 @@ struct ShardID { } }; +/* A placeholder for an invalid shard--also used to indicate the mutable buffer */ const ShardID INVALID_SHID = {-1, -1}; -struct SampleRange { - ShardID shid; - size_t low; - size_t high; -}; - } |