diff options
Diffstat (limited to 'include/util')
| -rw-r--r-- | include/util/Cursor.h | 89 | ||||
| -rw-r--r-- | include/util/SortedMerge.h | 274 | ||||
| -rw-r--r-- | include/util/bf_config.h | 16 | ||||
| -rw-r--r-- | include/util/types.h | 119 |
4 files changed, 250 insertions, 248 deletions
diff --git a/include/util/Cursor.h b/include/util/Cursor.h index e8ba53d..e7963b1 100644 --- a/include/util/Cursor.h +++ b/include/util/Cursor.h @@ -1,8 +1,8 @@ /* * include/util/Cursor.h * - * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> - * Dong Xie <dongx@psu.edu> + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> * * Distributed under the Modified BSD License. * @@ -21,16 +21,15 @@ #include <vector> namespace de { -template<typename R> -struct Cursor { - R *ptr; - R *end; - size_t cur_rec_idx; - size_t rec_cnt; +template <typename R> struct Cursor { + const R *ptr; + const R *end; + size_t cur_rec_idx; + size_t rec_cnt; - friend bool operator==(const Cursor &a, const Cursor &b) { - return a.ptr == b.ptr && a.end == b.end; - } + friend bool operator==(const Cursor &a, const Cursor &b) { + return a.ptr == b.ptr && a.end == b.end; + } }; /* @@ -43,51 +42,55 @@ struct Cursor { * be updated to be equal to end, and false will be returned. Iterators will * not be closed. */ -template<typename R> -inline static bool advance_cursor(Cursor<R> &cur) { - cur.ptr++; - cur.cur_rec_idx++; +template <typename R> inline static bool advance_cursor(Cursor<R> &cur) { + cur.ptr++; + cur.cur_rec_idx++; - if (cur.cur_rec_idx >= cur.rec_cnt) return false; + if (cur.cur_rec_idx >= cur.rec_cnt) + return false; - if (cur.ptr >= cur.end) { - return false; - } - return true; + if (cur.ptr >= cur.end) { + return false; + } + return true; } /* * Process the list of cursors to return the cursor containing the next * largest element. Does not advance any of the cursors. If current is - * specified, then skip the current head of that cursor during checking. - * This allows for "peaking" at the next largest element after the current + * specified, then skip the current head of that cursor during checking. + * This allows for "peaking" at the next largest element after the current * largest is processed. */ template <typename R> -inline static Cursor<R> *get_next(std::vector<Cursor<R>> &cursors, Cursor<R> *current=nullptr) { - const R *min_rec = nullptr; - Cursor<R> *result = nullptr; - // FIXME: for large cursor vectors, it may be worth it to use a - // PriorityQueue here instead of scanning. - for (size_t i=0; i< cursors.size(); i++) { - if (cursors[i] == (Cursor<R>) {0} ) continue; - - const R *rec = (&cursors[i] == current) ? cursors[i].ptr + 1 : cursors[i].ptr; - if (rec >= cursors[i].end) continue; +inline static Cursor<R> *get_next(std::vector<Cursor<R>> &cursors, + Cursor<R> *current = nullptr) { + const R *min_rec = nullptr; + Cursor<R> *result = nullptr; + // FIXME: for large cursor vectors, it may be worth it to use a + // PriorityQueue here instead of scanning. + for (size_t i = 0; i < cursors.size(); i++) { + if (cursors[i] == (Cursor<R>){0}) + continue; - if (min_rec == nullptr) { - result = &cursors[i]; - min_rec = rec; - continue; - } + const R *rec = + (&cursors[i] == current) ? cursors[i].ptr + 1 : cursors[i].ptr; + if (rec >= cursors[i].end) + continue; - if (*rec < *min_rec) { - result = &cursors[i]; - min_rec = rec; - } + if (min_rec == nullptr) { + result = &cursors[i]; + min_rec = rec; + continue; } - return result; -} + if (*rec < *min_rec) { + result = &cursors[i]; + min_rec = rec; + } + } + return result; } + +} // namespace de diff --git a/include/util/SortedMerge.h b/include/util/SortedMerge.h index c149189..b0a3215 100644 --- a/include/util/SortedMerge.h +++ b/include/util/SortedMerge.h @@ -1,72 +1,78 @@ /* * include/util/SortedMerge.h * - * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> * * Distributed under the Modified BSD License. * * A sorted array merge routine for use in Shard construction, as many - * shards will use a sorted array to represent their data. Also encapsulates + * shards will use a sorted array to represent their data. Also encapsulates * the necessary tombstone-cancellation logic. * - * FIXME: include generic per-record processing functionality for Shards that + * FIXME: include generic per-record processing functionality for Shards that * need it, to avoid needing to reprocess the array in the shard after * creation. */ #pragma once -#include "util/Cursor.h" +#include <algorithm> + #include "framework/interface/Shard.h" #include "psu-ds/PriorityQueue.h" +#include "util/Cursor.h" namespace de { -using psudb::PriorityQueue; using psudb::BloomFilter; -using psudb::queue_record; using psudb::byte; using psudb::CACHELINE_SIZE; +using psudb::PriorityQueue; +using psudb::queue_record; /* - * A simple struct to return record_count and tombstone_count information - * back to the caller. Could've been an std::pair, but I like the more + * A simple struct to return record_count and tombstone_count information + * back to the caller. Could've been an std::pair, but I like the more * explicit names. */ struct merge_info { - size_t record_count; - size_t tombstone_count; + size_t record_count; + size_t tombstone_count; }; /* * Build a vector of cursors corresponding to the records contained within * a vector of shards. The cursor at index i in the output will correspond - * to the shard at index i in the input. + * to the shard at index i in the input. * * The values of reccnt and tscnt will be updated with the sum of the * records contained within the shards. Note that these counts include deleted * records that may be removed during shard construction, and so constitute * upper bounds only. */ -template <RecordInterface R, ShardInterface<R> S> -static std::vector<Cursor<Wrapped<R>>> build_cursor_vec(std::vector<S*> &shards, size_t *reccnt, size_t *tscnt) { - std::vector<Cursor<Wrapped<R>>> cursors; - cursors.reserve(shards.size()); - - *reccnt = 0; - *tscnt = 0; - - for (size_t i = 0; i < shards.size(); ++i) { - if (shards[i]) { - auto base = shards[i]->get_data(); - cursors.emplace_back(Cursor<Wrapped<R>>{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); - *reccnt += shards[i]->get_record_count(); - *tscnt += shards[i]->get_tombstone_count(); - } else { - cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); - } +template <RecordInterface R, ShardInterface S> +static std::vector<Cursor<Wrapped<R>>> +build_cursor_vec(std::vector<S *> const &shards, size_t *reccnt, + size_t *tscnt) { + std::vector<Cursor<Wrapped<R>>> cursors; + cursors.reserve(shards.size()); + + *reccnt = 0; + *tscnt = 0; + + for (size_t i = 0; i < shards.size(); ++i) { + if (shards[i]) { + auto base = shards[i]->get_data(); + cursors.emplace_back( + Cursor<Wrapped<R>>{base, base + shards[i]->get_record_count(), 0, + shards[i]->get_record_count()}); + *reccnt += shards[i]->get_record_count(); + *tscnt += shards[i]->get_tombstone_count(); + } else { + cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); } + } - return cursors; + return cursors; } /* @@ -80,126 +86,128 @@ static std::vector<Cursor<Wrapped<R>>> build_cursor_vec(std::vector<S*> &shards, * program will be aborted if the allocation fails. */ template <RecordInterface R> -static merge_info sorted_array_from_bufferview(BufferView<R> bv, - Wrapped<R> *buffer, - psudb::BloomFilter<R> *bf=nullptr) { - /* - * Copy the contents of the buffer view into a temporary buffer, and - * sort them. We still need to iterate over these temporary records to - * apply tombstone/deleted record filtering, as well as any possible - * per-record processing that is required by the shard being built. - */ - auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, - bv.get_record_count(), - sizeof(Wrapped<R>)); - bv.copy_to_buffer((byte *) temp_buffer); - - auto base = temp_buffer; - auto stop = base + bv.get_record_count(); - std::sort(base, stop, std::less<Wrapped<R>>()); - - merge_info info = {0, 0}; - - /* - * Iterate over the temporary buffer to process the records, copying - * them into buffer as needed - */ - while (base < stop) { - if (!base->is_tombstone() && (base + 1 < stop) - && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - continue; - } else if (base->is_deleted()) { - base += 1; - continue; - } - - // FIXME: this shouldn't be necessary, but the tagged record - // bypass doesn't seem to be working on this code-path, so this - // ensures that tagged records from the buffer are able to be - // dropped, eventually. It should only need to be &= 1 - base->header &= 3; - buffer[info.record_count++] = *base; - - if (base->is_tombstone()) { - info.tombstone_count++; - if (bf){ - bf->insert(base->rec); - } - } +static merge_info +sorted_array_from_bufferview(BufferView<R> bv, Wrapped<R> *buffer, + psudb::BloomFilter<R> *bf = nullptr) { + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped<R> *)psudb::sf_aligned_calloc( + CACHELINE_SIZE, bv.get_record_count(), sizeof(Wrapped<R>)); + bv.copy_to_buffer((byte *)temp_buffer); + + auto base = temp_buffer; + auto stop = base + bv.get_record_count(); + std::sort(base, stop, std::less<Wrapped<R>>()); + + merge_info info = {0, 0}; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) && + base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; + } - base++; + // FIXME: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. It should only need to be &= 1 + base->header &= 3; + buffer[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (bf) { + bf->insert(base->rec); + } } - free(temp_buffer); - return info; + base++; + } + + free(temp_buffer); + return info; } /* * Perform a sorted merge of the records within cursors into the provided * buffer. Includes tombstone and tagged delete cancellation logic, and - * will insert tombstones into a bloom filter, if one is provided. + * will insert tombstones into a bloom filter, if one is provided. * * The behavior of this function is undefined if the provided buffer does * not have space to contain all of the records within the input cursors. */ template <RecordInterface R> -static merge_info sorted_array_merge(std::vector<Cursor<Wrapped<R>>> &cursors, - Wrapped<R> *buffer, - psudb::BloomFilter<R> *bf=nullptr) { - - // FIXME: For smaller cursor arrays, it may be more efficient to skip - // the priority queue and just do a scan. - PriorityQueue<Wrapped<R>> pq(cursors.size()); - for (size_t i=0; i<cursors.size(); i++) { - pq.push(cursors[i].ptr, i); - } - - merge_info info = {0, 0}; - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; - /* - * if the current record is not a tombstone, and the next record is - * a tombstone that matches the current one, then the current one - * has been deleted, and both it and its tombstone can be skipped - * over. +static merge_info sorted_array_merge(std::vector<Cursor<Wrapped<R>>> &cursors, + Wrapped<R> *buffer, + psudb::BloomFilter<R> *bf = nullptr) { + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue<Wrapped<R>> pq(cursors.size()); + for (size_t i = 0; i < cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } + + merge_info info = {0, 0}; + while (pq.size()) { + auto now = pq.peek(); + auto next = + pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); + pq.pop(); + auto &cursor1 = cursors[now.version]; + auto &cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) + pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) + pq.push(cursor2.ptr, next.version); + } else { + auto &cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + buffer[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. */ - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - /* skip over records that have been deleted via tagging */ - if (!cursor.ptr->is_deleted()) { - buffer[info.record_count++] = *cursor.ptr; - - /* - * if the record is a tombstone, increment the ts count and - * insert it into the bloom filter if one has been - * provided. - */ - if (cursor.ptr->is_tombstone()) { - info.tombstone_count++; - if (bf) { - bf->insert(cursor.ptr->rec); - } - } - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (bf) { + bf->insert(cursor.ptr->rec); + } } + } + pq.pop(); + + if (advance_cursor(cursor)) + pq.push(cursor.ptr, now.version); } + } - return info; + return info; } - - -} +} // namespace de diff --git a/include/util/bf_config.h b/include/util/bf_config.h index 9f29ed7..836e452 100644 --- a/include/util/bf_config.h +++ b/include/util/bf_config.h @@ -1,8 +1,8 @@ /* * include/util/bf_config.h * - * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> - * Dong Xie <dongx@psu.edu> + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> * * Distributed under the Modified BSD License. * @@ -26,19 +26,15 @@ static double BF_FPR = .01; static size_t BF_HASH_FUNCS = 7; /* - * Adjust the value of BF_FPR. The argument must be on the interval + * Adjust the value of BF_FPR. The argument must be on the interval * (0, 1), or the behavior of bloom filters is undefined. */ -static void BF_SET_FPR(double fpr) { - BF_FPR = fpr; -} +[[maybe_unused]] static void BF_SET_FPR(double fpr) { BF_FPR = fpr; } /* * Adjust the value of BF_HASH_FUNCS. The argument must be on the interval * (0, INT64_MAX], or the behavior of bloom filters is undefined. */ -static void BF_SET_HASHFUNC(size_t func_cnt) { - BF_HASH_FUNCS = func_cnt; -} +[[maybe_unused]] static void BF_SET_HASHFUNC(size_t func_cnt) { BF_HASH_FUNCS = func_cnt; } -} +} // namespace de diff --git a/include/util/types.h b/include/util/types.h index cf61412..b8a1343 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -1,7 +1,7 @@ /* * include/util/types.h * - * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> * * Distributed under the Modified BSD License. * @@ -17,10 +17,10 @@ */ #pragma once +#include <cassert> #include <cstdint> #include <cstdlib> #include <vector> -#include <cassert> namespace de { @@ -30,14 +30,14 @@ typedef uint32_t PageNum; /* * Byte offset within a page. Also used for lengths of records, etc., * within the codebase. size_t isn't necessary, as the maximum offset - * is only parm::PAGE_SIZE + * is only parm::PAGE_SIZE */ typedef uint16_t PageOffset; /* A unique identifier for a frame within a buffer or cache */ typedef int32_t FrameId; -/* +/* * A unique timestamp for use in MVCC concurrency control. Currently stored in * record headers, but not used by anything. */ @@ -45,7 +45,7 @@ typedef uint32_t Timestamp; const Timestamp TIMESTAMP_MIN = 0; const Timestamp TIMESTAMP_MAX = UINT32_MAX; -/* +/* * Invalid values for various IDs. Used throughout the code base to indicate * uninitialized values and error conditions. */ @@ -60,90 +60,85 @@ const FrameId INVALID_FRID = -1; * as a contiguous index space. */ struct ShardID { - ssize_t level_idx; - ssize_t shard_idx; + ssize_t level_idx; + ssize_t shard_idx; - friend bool operator==(const ShardID &shid1, const ShardID &shid2) { - return shid1.level_idx == shid2.level_idx && shid1.shard_idx == shid2.shard_idx; - } + friend bool operator==(const ShardID &shid1, const ShardID &shid2) { + return shid1.level_idx == shid2.level_idx && + shid1.shard_idx == shid2.shard_idx; + } }; -/* A placeholder for an invalid shard--also used to indicate the mutable buffer */ +/* + * A placeholder for an invalid shard--also used to indicate the + * mutable buffer + */ const ShardID INVALID_SHID = {-1, -1}; typedef ssize_t level_index; typedef struct ReconstructionTask { - std::vector<level_index> sources; - level_index target; - size_t reccnt; + std::vector<level_index> sources; + level_index target; + size_t reccnt; - void add_source(level_index source, size_t cnt) { - sources.push_back(source); - reccnt += cnt; - } + void add_source(level_index source, size_t cnt) { + sources.push_back(source); + reccnt += cnt; + } } ReconstructionTask; class ReconstructionVector { public: - ReconstructionVector() - : total_reccnt(0) {} + ReconstructionVector() : total_reccnt(0) {} - ~ReconstructionVector() = default; + ~ReconstructionVector() = default; - ReconstructionTask operator[](size_t idx) { - return m_tasks[idx]; - } + ReconstructionTask operator[](size_t idx) { return m_tasks[idx]; } - void add_reconstruction(level_index source, level_index target, size_t reccnt) { - m_tasks.push_back({{source}, target, reccnt}); - total_reccnt += reccnt; - } + void add_reconstruction(level_index source, level_index target, + size_t reccnt) { + m_tasks.push_back({{source}, target, reccnt}); + total_reccnt += reccnt; + } - void add_reconstruction(ReconstructionTask task) { - m_tasks.push_back(task); - } + void add_reconstruction(ReconstructionTask task) { m_tasks.push_back(task); } - ReconstructionTask remove_reconstruction(size_t idx) { - assert(idx < m_tasks.size()); - auto task = m_tasks[idx]; + ReconstructionTask remove_reconstruction(size_t idx) { + assert(idx < m_tasks.size()); + auto task = m_tasks[idx]; - m_tasks.erase(m_tasks.begin() + idx); - total_reccnt -= task.reccnt; + m_tasks.erase(m_tasks.begin() + idx); + total_reccnt -= task.reccnt; - return task; - } + return task; + } - ReconstructionTask remove_smallest_reconstruction() { - size_t min_size = m_tasks[0].reccnt; - size_t idx = 0; - for (size_t i=1; i<m_tasks.size(); i++) { - if (m_tasks[i].reccnt < min_size) { - min_size = m_tasks[i].reccnt; - idx = i; - } - } - - auto task = m_tasks[idx]; - m_tasks.erase(m_tasks.begin() + idx); - total_reccnt -= task.reccnt; - - return task; + ReconstructionTask remove_smallest_reconstruction() { + size_t min_size = m_tasks[0].reccnt; + size_t idx = 0; + for (size_t i = 1; i < m_tasks.size(); i++) { + if (m_tasks[i].reccnt < min_size) { + min_size = m_tasks[i].reccnt; + idx = i; + } } - size_t get_total_reccnt() { - return total_reccnt; - } + auto task = m_tasks[idx]; + m_tasks.erase(m_tasks.begin() + idx); + total_reccnt -= task.reccnt; - size_t size() { - return m_tasks.size(); - } + return task; + } + + size_t get_total_reccnt() { return total_reccnt; } + size_t size() { return m_tasks.size(); } private: - std::vector<ReconstructionTask> m_tasks; - size_t total_reccnt; + std::vector<ReconstructionTask> m_tasks; + size_t total_reccnt; }; -} +} // namespace de |