summaryrefslogtreecommitdiffstats
path: root/include/util
diff options
context:
space:
mode:
Diffstat (limited to 'include/util')
-rw-r--r--include/util/Cursor.h89
-rw-r--r--include/util/SortedMerge.h274
-rw-r--r--include/util/bf_config.h16
-rw-r--r--include/util/types.h119
4 files changed, 250 insertions, 248 deletions
diff --git a/include/util/Cursor.h b/include/util/Cursor.h
index e8ba53d..e7963b1 100644
--- a/include/util/Cursor.h
+++ b/include/util/Cursor.h
@@ -1,8 +1,8 @@
/*
* include/util/Cursor.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -21,16 +21,15 @@
#include <vector>
namespace de {
-template<typename R>
-struct Cursor {
- R *ptr;
- R *end;
- size_t cur_rec_idx;
- size_t rec_cnt;
+template <typename R> struct Cursor {
+ const R *ptr;
+ const R *end;
+ size_t cur_rec_idx;
+ size_t rec_cnt;
- friend bool operator==(const Cursor &a, const Cursor &b) {
- return a.ptr == b.ptr && a.end == b.end;
- }
+ friend bool operator==(const Cursor &a, const Cursor &b) {
+ return a.ptr == b.ptr && a.end == b.end;
+ }
};
/*
@@ -43,51 +42,55 @@ struct Cursor {
* be updated to be equal to end, and false will be returned. Iterators will
* not be closed.
*/
-template<typename R>
-inline static bool advance_cursor(Cursor<R> &cur) {
- cur.ptr++;
- cur.cur_rec_idx++;
+template <typename R> inline static bool advance_cursor(Cursor<R> &cur) {
+ cur.ptr++;
+ cur.cur_rec_idx++;
- if (cur.cur_rec_idx >= cur.rec_cnt) return false;
+ if (cur.cur_rec_idx >= cur.rec_cnt)
+ return false;
- if (cur.ptr >= cur.end) {
- return false;
- }
- return true;
+ if (cur.ptr >= cur.end) {
+ return false;
+ }
+ return true;
}
/*
* Process the list of cursors to return the cursor containing the next
* largest element. Does not advance any of the cursors. If current is
- * specified, then skip the current head of that cursor during checking.
- * This allows for "peaking" at the next largest element after the current
+ * specified, then skip the current head of that cursor during checking.
+ * This allows for "peaking" at the next largest element after the current
* largest is processed.
*/
template <typename R>
-inline static Cursor<R> *get_next(std::vector<Cursor<R>> &cursors, Cursor<R> *current=nullptr) {
- const R *min_rec = nullptr;
- Cursor<R> *result = nullptr;
- // FIXME: for large cursor vectors, it may be worth it to use a
- // PriorityQueue here instead of scanning.
- for (size_t i=0; i< cursors.size(); i++) {
- if (cursors[i] == (Cursor<R>) {0} ) continue;
-
- const R *rec = (&cursors[i] == current) ? cursors[i].ptr + 1 : cursors[i].ptr;
- if (rec >= cursors[i].end) continue;
+inline static Cursor<R> *get_next(std::vector<Cursor<R>> &cursors,
+ Cursor<R> *current = nullptr) {
+ const R *min_rec = nullptr;
+ Cursor<R> *result = nullptr;
+ // FIXME: for large cursor vectors, it may be worth it to use a
+ // PriorityQueue here instead of scanning.
+ for (size_t i = 0; i < cursors.size(); i++) {
+ if (cursors[i] == (Cursor<R>){0})
+ continue;
- if (min_rec == nullptr) {
- result = &cursors[i];
- min_rec = rec;
- continue;
- }
+ const R *rec =
+ (&cursors[i] == current) ? cursors[i].ptr + 1 : cursors[i].ptr;
+ if (rec >= cursors[i].end)
+ continue;
- if (*rec < *min_rec) {
- result = &cursors[i];
- min_rec = rec;
- }
+ if (min_rec == nullptr) {
+ result = &cursors[i];
+ min_rec = rec;
+ continue;
}
- return result;
-}
+ if (*rec < *min_rec) {
+ result = &cursors[i];
+ min_rec = rec;
+ }
+ }
+ return result;
}
+
+} // namespace de
diff --git a/include/util/SortedMerge.h b/include/util/SortedMerge.h
index c149189..b0a3215 100644
--- a/include/util/SortedMerge.h
+++ b/include/util/SortedMerge.h
@@ -1,72 +1,78 @@
/*
* include/util/SortedMerge.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
* A sorted array merge routine for use in Shard construction, as many
- * shards will use a sorted array to represent their data. Also encapsulates
+ * shards will use a sorted array to represent their data. Also encapsulates
* the necessary tombstone-cancellation logic.
*
- * FIXME: include generic per-record processing functionality for Shards that
+ * FIXME: include generic per-record processing functionality for Shards that
* need it, to avoid needing to reprocess the array in the shard after
* creation.
*/
#pragma once
-#include "util/Cursor.h"
+#include <algorithm>
+
#include "framework/interface/Shard.h"
#include "psu-ds/PriorityQueue.h"
+#include "util/Cursor.h"
namespace de {
-using psudb::PriorityQueue;
using psudb::BloomFilter;
-using psudb::queue_record;
using psudb::byte;
using psudb::CACHELINE_SIZE;
+using psudb::PriorityQueue;
+using psudb::queue_record;
/*
- * A simple struct to return record_count and tombstone_count information
- * back to the caller. Could've been an std::pair, but I like the more
+ * A simple struct to return record_count and tombstone_count information
+ * back to the caller. Could've been an std::pair, but I like the more
* explicit names.
*/
struct merge_info {
- size_t record_count;
- size_t tombstone_count;
+ size_t record_count;
+ size_t tombstone_count;
};
/*
* Build a vector of cursors corresponding to the records contained within
* a vector of shards. The cursor at index i in the output will correspond
- * to the shard at index i in the input.
+ * to the shard at index i in the input.
*
* The values of reccnt and tscnt will be updated with the sum of the
* records contained within the shards. Note that these counts include deleted
* records that may be removed during shard construction, and so constitute
* upper bounds only.
*/
-template <RecordInterface R, ShardInterface<R> S>
-static std::vector<Cursor<Wrapped<R>>> build_cursor_vec(std::vector<S*> &shards, size_t *reccnt, size_t *tscnt) {
- std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(shards.size());
-
- *reccnt = 0;
- *tscnt = 0;
-
- for (size_t i = 0; i < shards.size(); ++i) {
- if (shards[i]) {
- auto base = shards[i]->get_data();
- cursors.emplace_back(Cursor<Wrapped<R>>{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
- *reccnt += shards[i]->get_record_count();
- *tscnt += shards[i]->get_tombstone_count();
- } else {
- cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
- }
+template <RecordInterface R, ShardInterface S>
+static std::vector<Cursor<Wrapped<R>>>
+build_cursor_vec(std::vector<S *> const &shards, size_t *reccnt,
+ size_t *tscnt) {
+ std::vector<Cursor<Wrapped<R>>> cursors;
+ cursors.reserve(shards.size());
+
+ *reccnt = 0;
+ *tscnt = 0;
+
+ for (size_t i = 0; i < shards.size(); ++i) {
+ if (shards[i]) {
+ auto base = shards[i]->get_data();
+ cursors.emplace_back(
+ Cursor<Wrapped<R>>{base, base + shards[i]->get_record_count(), 0,
+ shards[i]->get_record_count()});
+ *reccnt += shards[i]->get_record_count();
+ *tscnt += shards[i]->get_tombstone_count();
+ } else {
+ cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
}
+ }
- return cursors;
+ return cursors;
}
/*
@@ -80,126 +86,128 @@ static std::vector<Cursor<Wrapped<R>>> build_cursor_vec(std::vector<S*> &shards,
* program will be aborted if the allocation fails.
*/
template <RecordInterface R>
-static merge_info sorted_array_from_bufferview(BufferView<R> bv,
- Wrapped<R> *buffer,
- psudb::BloomFilter<R> *bf=nullptr) {
- /*
- * Copy the contents of the buffer view into a temporary buffer, and
- * sort them. We still need to iterate over these temporary records to
- * apply tombstone/deleted record filtering, as well as any possible
- * per-record processing that is required by the shard being built.
- */
- auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
- bv.get_record_count(),
- sizeof(Wrapped<R>));
- bv.copy_to_buffer((byte *) temp_buffer);
-
- auto base = temp_buffer;
- auto stop = base + bv.get_record_count();
- std::sort(base, stop, std::less<Wrapped<R>>());
-
- merge_info info = {0, 0};
-
- /*
- * Iterate over the temporary buffer to process the records, copying
- * them into buffer as needed
- */
- while (base < stop) {
- if (!base->is_tombstone() && (base + 1 < stop)
- && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
- base += 2;
- continue;
- } else if (base->is_deleted()) {
- base += 1;
- continue;
- }
-
- // FIXME: this shouldn't be necessary, but the tagged record
- // bypass doesn't seem to be working on this code-path, so this
- // ensures that tagged records from the buffer are able to be
- // dropped, eventually. It should only need to be &= 1
- base->header &= 3;
- buffer[info.record_count++] = *base;
-
- if (base->is_tombstone()) {
- info.tombstone_count++;
- if (bf){
- bf->insert(base->rec);
- }
- }
+static merge_info
+sorted_array_from_bufferview(BufferView<R> bv, Wrapped<R> *buffer,
+ psudb::BloomFilter<R> *bf = nullptr) {
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = (Wrapped<R> *)psudb::sf_aligned_calloc(
+ CACHELINE_SIZE, bv.get_record_count(), sizeof(Wrapped<R>));
+ bv.copy_to_buffer((byte *)temp_buffer);
+
+ auto base = temp_buffer;
+ auto stop = base + bv.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ merge_info info = {0, 0};
+
+ /*
+ * Iterate over the temporary buffer to process the records, copying
+ * them into buffer as needed
+ */
+ while (base < stop) {
+ if (!base->is_tombstone() && (base + 1 < stop) &&
+ base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
+ base += 2;
+ continue;
+ } else if (base->is_deleted()) {
+ base += 1;
+ continue;
+ }
- base++;
+ // FIXME: this shouldn't be necessary, but the tagged record
+ // bypass doesn't seem to be working on this code-path, so this
+ // ensures that tagged records from the buffer are able to be
+ // dropped, eventually. It should only need to be &= 1
+ base->header &= 3;
+ buffer[info.record_count++] = *base;
+
+ if (base->is_tombstone()) {
+ info.tombstone_count++;
+ if (bf) {
+ bf->insert(base->rec);
+ }
}
- free(temp_buffer);
- return info;
+ base++;
+ }
+
+ free(temp_buffer);
+ return info;
}
/*
* Perform a sorted merge of the records within cursors into the provided
* buffer. Includes tombstone and tagged delete cancellation logic, and
- * will insert tombstones into a bloom filter, if one is provided.
+ * will insert tombstones into a bloom filter, if one is provided.
*
* The behavior of this function is undefined if the provided buffer does
* not have space to contain all of the records within the input cursors.
*/
template <RecordInterface R>
-static merge_info sorted_array_merge(std::vector<Cursor<Wrapped<R>>> &cursors,
- Wrapped<R> *buffer,
- psudb::BloomFilter<R> *bf=nullptr) {
-
- // FIXME: For smaller cursor arrays, it may be more efficient to skip
- // the priority queue and just do a scan.
- PriorityQueue<Wrapped<R>> pq(cursors.size());
- for (size_t i=0; i<cursors.size(); i++) {
- pq.push(cursors[i].ptr, i);
- }
-
- merge_info info = {0, 0};
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- /*
- * if the current record is not a tombstone, and the next record is
- * a tombstone that matches the current one, then the current one
- * has been deleted, and both it and its tombstone can be skipped
- * over.
+static merge_info sorted_array_merge(std::vector<Cursor<Wrapped<R>>> &cursors,
+ Wrapped<R> *buffer,
+ psudb::BloomFilter<R> *bf = nullptr) {
+
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i = 0; i < cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
+
+ merge_info info = {0, 0};
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next =
+ pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop();
+ pq.pop();
+ auto &cursor1 = cursors[now.version];
+ auto &cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1))
+ pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2))
+ pq.push(cursor2.ptr, next.version);
+ } else {
+ auto &cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted()) {
+ buffer[info.record_count++] = *cursor.ptr;
+
+ /*
+ * if the record is a tombstone, increment the ts count and
+ * insert it into the bloom filter if one has been
+ * provided.
*/
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- /* skip over records that have been deleted via tagging */
- if (!cursor.ptr->is_deleted()) {
- buffer[info.record_count++] = *cursor.ptr;
-
- /*
- * if the record is a tombstone, increment the ts count and
- * insert it into the bloom filter if one has been
- * provided.
- */
- if (cursor.ptr->is_tombstone()) {
- info.tombstone_count++;
- if (bf) {
- bf->insert(cursor.ptr->rec);
- }
- }
- }
- pq.pop();
-
- if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
+ if (cursor.ptr->is_tombstone()) {
+ info.tombstone_count++;
+ if (bf) {
+ bf->insert(cursor.ptr->rec);
+ }
}
+ }
+ pq.pop();
+
+ if (advance_cursor(cursor))
+ pq.push(cursor.ptr, now.version);
}
+ }
- return info;
+ return info;
}
-
-
-}
+} // namespace de
diff --git a/include/util/bf_config.h b/include/util/bf_config.h
index 9f29ed7..836e452 100644
--- a/include/util/bf_config.h
+++ b/include/util/bf_config.h
@@ -1,8 +1,8 @@
/*
* include/util/bf_config.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -26,19 +26,15 @@ static double BF_FPR = .01;
static size_t BF_HASH_FUNCS = 7;
/*
- * Adjust the value of BF_FPR. The argument must be on the interval
+ * Adjust the value of BF_FPR. The argument must be on the interval
* (0, 1), or the behavior of bloom filters is undefined.
*/
-static void BF_SET_FPR(double fpr) {
- BF_FPR = fpr;
-}
+[[maybe_unused]] static void BF_SET_FPR(double fpr) { BF_FPR = fpr; }
/*
* Adjust the value of BF_HASH_FUNCS. The argument must be on the interval
* (0, INT64_MAX], or the behavior of bloom filters is undefined.
*/
-static void BF_SET_HASHFUNC(size_t func_cnt) {
- BF_HASH_FUNCS = func_cnt;
-}
+[[maybe_unused]] static void BF_SET_HASHFUNC(size_t func_cnt) { BF_HASH_FUNCS = func_cnt; }
-}
+} // namespace de
diff --git a/include/util/types.h b/include/util/types.h
index cf61412..b8a1343 100644
--- a/include/util/types.h
+++ b/include/util/types.h
@@ -1,7 +1,7 @@
/*
* include/util/types.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -17,10 +17,10 @@
*/
#pragma once
+#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <vector>
-#include <cassert>
namespace de {
@@ -30,14 +30,14 @@ typedef uint32_t PageNum;
/*
* Byte offset within a page. Also used for lengths of records, etc.,
* within the codebase. size_t isn't necessary, as the maximum offset
- * is only parm::PAGE_SIZE
+ * is only parm::PAGE_SIZE
*/
typedef uint16_t PageOffset;
/* A unique identifier for a frame within a buffer or cache */
typedef int32_t FrameId;
-/*
+/*
* A unique timestamp for use in MVCC concurrency control. Currently stored in
* record headers, but not used by anything.
*/
@@ -45,7 +45,7 @@ typedef uint32_t Timestamp;
const Timestamp TIMESTAMP_MIN = 0;
const Timestamp TIMESTAMP_MAX = UINT32_MAX;
-/*
+/*
* Invalid values for various IDs. Used throughout the code base to indicate
* uninitialized values and error conditions.
*/
@@ -60,90 +60,85 @@ const FrameId INVALID_FRID = -1;
* as a contiguous index space.
*/
struct ShardID {
- ssize_t level_idx;
- ssize_t shard_idx;
+ ssize_t level_idx;
+ ssize_t shard_idx;
- friend bool operator==(const ShardID &shid1, const ShardID &shid2) {
- return shid1.level_idx == shid2.level_idx && shid1.shard_idx == shid2.shard_idx;
- }
+ friend bool operator==(const ShardID &shid1, const ShardID &shid2) {
+ return shid1.level_idx == shid2.level_idx &&
+ shid1.shard_idx == shid2.shard_idx;
+ }
};
-/* A placeholder for an invalid shard--also used to indicate the mutable buffer */
+/*
+ * A placeholder for an invalid shard--also used to indicate the
+ * mutable buffer
+ */
const ShardID INVALID_SHID = {-1, -1};
typedef ssize_t level_index;
typedef struct ReconstructionTask {
- std::vector<level_index> sources;
- level_index target;
- size_t reccnt;
+ std::vector<level_index> sources;
+ level_index target;
+ size_t reccnt;
- void add_source(level_index source, size_t cnt) {
- sources.push_back(source);
- reccnt += cnt;
- }
+ void add_source(level_index source, size_t cnt) {
+ sources.push_back(source);
+ reccnt += cnt;
+ }
} ReconstructionTask;
class ReconstructionVector {
public:
- ReconstructionVector()
- : total_reccnt(0) {}
+ ReconstructionVector() : total_reccnt(0) {}
- ~ReconstructionVector() = default;
+ ~ReconstructionVector() = default;
- ReconstructionTask operator[](size_t idx) {
- return m_tasks[idx];
- }
+ ReconstructionTask operator[](size_t idx) { return m_tasks[idx]; }
- void add_reconstruction(level_index source, level_index target, size_t reccnt) {
- m_tasks.push_back({{source}, target, reccnt});
- total_reccnt += reccnt;
- }
+ void add_reconstruction(level_index source, level_index target,
+ size_t reccnt) {
+ m_tasks.push_back({{source}, target, reccnt});
+ total_reccnt += reccnt;
+ }
- void add_reconstruction(ReconstructionTask task) {
- m_tasks.push_back(task);
- }
+ void add_reconstruction(ReconstructionTask task) { m_tasks.push_back(task); }
- ReconstructionTask remove_reconstruction(size_t idx) {
- assert(idx < m_tasks.size());
- auto task = m_tasks[idx];
+ ReconstructionTask remove_reconstruction(size_t idx) {
+ assert(idx < m_tasks.size());
+ auto task = m_tasks[idx];
- m_tasks.erase(m_tasks.begin() + idx);
- total_reccnt -= task.reccnt;
+ m_tasks.erase(m_tasks.begin() + idx);
+ total_reccnt -= task.reccnt;
- return task;
- }
+ return task;
+ }
- ReconstructionTask remove_smallest_reconstruction() {
- size_t min_size = m_tasks[0].reccnt;
- size_t idx = 0;
- for (size_t i=1; i<m_tasks.size(); i++) {
- if (m_tasks[i].reccnt < min_size) {
- min_size = m_tasks[i].reccnt;
- idx = i;
- }
- }
-
- auto task = m_tasks[idx];
- m_tasks.erase(m_tasks.begin() + idx);
- total_reccnt -= task.reccnt;
-
- return task;
+ ReconstructionTask remove_smallest_reconstruction() {
+ size_t min_size = m_tasks[0].reccnt;
+ size_t idx = 0;
+ for (size_t i = 1; i < m_tasks.size(); i++) {
+ if (m_tasks[i].reccnt < min_size) {
+ min_size = m_tasks[i].reccnt;
+ idx = i;
+ }
}
- size_t get_total_reccnt() {
- return total_reccnt;
- }
+ auto task = m_tasks[idx];
+ m_tasks.erase(m_tasks.begin() + idx);
+ total_reccnt -= task.reccnt;
- size_t size() {
- return m_tasks.size();
- }
+ return task;
+ }
+
+ size_t get_total_reccnt() { return total_reccnt; }
+ size_t size() { return m_tasks.size(); }
private:
- std::vector<ReconstructionTask> m_tasks;
- size_t total_reccnt;
+ std::vector<ReconstructionTask> m_tasks;
+ size_t total_reccnt;
};
-}
+} // namespace de