From cf5f3bbb0cb58430ed68ad3ebfcefc009e553d71 Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Thu, 25 Sep 2025 14:42:44 -0400
Subject: Code reformatting

---
 include/shard/TrieSpline.h | 503 ++++++++++++++++++++++-----------------------
 1 file changed, 245 insertions(+), 258 deletions(-)

diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h
index 1a04afc..2cd514d 100644
--- a/include/shard/TrieSpline.h
+++ b/include/shard/TrieSpline.h
@@ -11,319 +11,306 @@
  */
 #pragma once
-
 #include <vector>

 #include "framework/ShardRequirements.h"
-#include "ts/builder.h"
 #include "psu-ds/BloomFilter.h"
-#include "util/bf_config.h"
+#include "ts/builder.h"
 #include "util/SortedMerge.h"
+#include "util/bf_config.h"

-using psudb::CACHELINE_SIZE;
 using psudb::BloomFilter;
+using psudb::byte;
+using psudb::CACHELINE_SIZE;
 using psudb::PriorityQueue;
 using psudb::queue_record;
-using psudb::byte;

 namespace de {

-template <typename R, size_t E>
-class TrieSpline {
+template <typename R, size_t E> class TrieSpline {
 public:
-    typedef R RECORD;
+  typedef R RECORD;
+
 private:
-    typedef decltype(R::key) K;
-    typedef decltype(R::value) V;
+  typedef decltype(R::key) K;
+  typedef decltype(R::value) V;

 public:
-    TrieSpline(BufferView<R> buffer)
-        : m_reccnt(0)
-        , m_tombstone_cnt(0)
-        , m_alloc_size(0)
-        , m_max_key(0)
-        , m_min_key(0)
-        , m_bf(nullptr)
-    {
-        m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
-                                               buffer.get_record_count() *
-                                               sizeof(Wrapped<R>),
-                                               (byte**) &m_data);
-
-        /*
-         * Copy the contents of the buffer view into a temporary buffer, and
-         * sort them. We still need to iterate over these temporary records to
-         * apply tombstone/deleted record filtering, as well as any possible
-         * per-record processing that is required by the shard being built.
-         */
-        auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
-                                                                   buffer.get_record_count(),
-                                                                   sizeof(Wrapped<R>));
-        buffer.copy_to_buffer((byte *) temp_buffer);
-
-        auto base = temp_buffer;
-        auto stop = base + buffer.get_record_count();
-        std::sort(base, stop, std::less<Wrapped<R>>());
-
-        auto tmp_min_key = temp_buffer[0].rec.key;
-        auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key;
-        auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
-
-        merge_info info = {0, 0};
-
-        m_min_key = tmp_min_key;
-        m_max_key = tmp_max_key;
-
-        /*
-         * Iterate over the temporary buffer to process the records, copying
-         * them into buffer as needed
-         */
-        while (base < stop) {
-            if (!base->is_tombstone() && (base + 1 < stop)
-                && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()
-                && base->rec.key != m_max_key && base->rec.key != m_min_key) {
-                base += 2;
-                continue;
-            } else if (base->is_deleted() && base->rec.key != m_max_key && base->rec.key != m_min_key) {
-                base += 1;
-                continue;
-            }
-
-            base->header &= 3;
-            bldr.AddKey(base->rec.key);
-            m_data[info.record_count++] = *base;
-
-            if (base->is_tombstone()) {
-                info.tombstone_count++;
-                if (m_bf){
-                    m_bf->insert(base->rec);
-                }
-            }
-
-            base++;
-        }
-
-        free(temp_buffer);
-
-        m_reccnt = info.record_count;
-        m_tombstone_cnt = info.tombstone_count;
-
-        if (m_reccnt > 50) {
-            m_ts = bldr.Finalize();
-        }
-    }
+  TrieSpline(BufferView<R> buffer)
+      : m_reccnt(0), m_tombstone_cnt(0), m_alloc_size(0), m_max_key(0),
+        m_min_key(0), m_bf(nullptr) {
+    m_alloc_size = psudb::sf_aligned_alloc(
+        CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>),
+        (byte **)&m_data);
+
+    /*
+     * Copy the contents of the buffer view into a temporary buffer, and
+     * sort them. We still need to iterate over these temporary records to
+     * apply tombstone/deleted record filtering, as well as any possible
+     * per-record processing that is required by the shard being built.
+     */
+    auto temp_buffer = (Wrapped<R> *)psudb::sf_aligned_calloc(
+        CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>));
+    buffer.copy_to_buffer((byte *)temp_buffer);
+
+    auto base = temp_buffer;
+    auto stop = base + buffer.get_record_count();
+    std::sort(base, stop, std::less<Wrapped<R>>());
+
+    auto tmp_min_key = temp_buffer[0].rec.key;
+    auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key;
+    auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+
+    merge_info info = {0, 0};
+
+    m_min_key = tmp_min_key;
+    m_max_key = tmp_max_key;
+
+    /*
+     * Iterate over the temporary buffer to process the records, copying
+     * them into buffer as needed
+     */
+    while (base < stop) {
+      if (!base->is_tombstone() && (base + 1 < stop) &&
+          base->rec == (base + 1)->rec && (base + 1)->is_tombstone() &&
+          base->rec.key != m_max_key && base->rec.key != m_min_key) {
+        base += 2;
+        continue;
+      } else if (base->is_deleted() && base->rec.key != m_max_key &&
+                 base->rec.key != m_min_key) {
+        base += 1;
+        continue;
+      }
+
+      base->header &= 3;
+      bldr.AddKey(base->rec.key);
+      m_data[info.record_count++] = *base;
+
+      if (base->is_tombstone()) {
+        info.tombstone_count++;
+        if (m_bf) {
+          m_bf->insert(base->rec);
+        }
+      }
+
+      base++;
+    }
+
+    free(temp_buffer);
+
+    m_reccnt = info.record_count;
+    m_tombstone_cnt = info.tombstone_count;
+
+    if (m_reccnt > 50) {
+      m_ts = bldr.Finalize();
+    }
+  }

-    TrieSpline(std::vector<TrieSpline *> const &shards)
-        : m_reccnt(0)
-        , m_tombstone_cnt(0)
-        , m_alloc_size(0)
-        , m_max_key(0)
-        , m_min_key(0)
-        , m_bf(nullptr)
-    {
-        size_t attemp_reccnt = 0;
-        size_t tombstone_count = 0;
-        auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt, &tombstone_count);
-
-        m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
-                                               attemp_reccnt * sizeof(Wrapped<R>),
-                                               (byte **) &m_data);
-
-        // FIXME: For smaller cursor arrays, it may be more efficient to skip
-        // the priority queue and just do a scan.
-        PriorityQueue<Wrapped<R>> pq(cursors.size());
-        for (size_t i=0; i<cursors.size(); i++) {
-            pq.push(cursors[i].ptr, i);
-        }
-
-        auto tmp_max_key = shards[0]->m_max_key;
-        auto tmp_min_key = shards[0]->m_min_key;
-
-        for (size_t i=0; i<shards.size(); i++) {
-            if (shards[i]->m_max_key > tmp_max_key) {
-                tmp_max_key = shards[i]->m_max_key;
-            }
-
-            if (shards[i]->m_min_key < tmp_min_key) {
-                tmp_min_key = shards[i]->m_min_key;
-            }
-        }
-
-        auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
-
-        m_max_key = tmp_max_key;
-        m_min_key = tmp_min_key;
-
-        merge_info info = {0, 0};
-        while (pq.size()) {
-            auto now = pq.peek();
-            auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
-            /*
-             * if the current record is not a tombstone, and the next record is
-             * a tombstone that matches the current one, then the current one
-             * has been deleted, and both it and its tombstone can be skipped
-             * over. Unless the tombstone would remove the maximum or
-             * minimum valued key, which cannot be removed at this point
-             * without breaking triespline
-             */
-            if (!now.data->is_tombstone() && next.data != nullptr &&
-                now.data->rec == next.data->rec && next.data->is_tombstone()
-                && now.data->rec.key != tmp_max_key && now.data->rec.key != tmp_min_key) {
-
-                pq.pop(); pq.pop();
-                auto& cursor1 = cursors[now.version];
-                auto& cursor2 = cursors[next.version];
-                if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
-                if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
-            } else {
-                auto& cursor = cursors[now.version];
-                /*
-                 * skip over records that have been deleted via tagging,
-                 * unless they are the max or min keys, which cannot be
-                 * removed without breaking triespline
-                 */
-                if (!cursor.ptr->is_deleted() ||
-                    cursor.ptr->rec.key == tmp_max_key || cursor.ptr->rec.key == tmp_min_key) {
-                    bldr.AddKey(cursor.ptr->rec.key);
-                    m_data[info.record_count++] = *cursor.ptr;
-
-                    /*
-                     * if the record is a tombstone, increment the ts count and
-                     * insert it into the bloom filter if one has been
-                     * provided.
-                     */
-                    if (cursor.ptr->is_tombstone()) {
-                        info.tombstone_count++;
-                        if (m_bf) {
-                            m_bf->insert(cursor.ptr->rec);
-                        }
-                    }
-                }
-                pq.pop();
-
-                if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
-            }
-        }
-
-        m_reccnt = info.record_count;
-        m_tombstone_cnt = info.tombstone_count;
-
-        if (m_reccnt > 50) {
-            m_ts = bldr.Finalize();
-        }
-    }
+  TrieSpline(std::vector<TrieSpline *> const &shards)
+      : m_reccnt(0), m_tombstone_cnt(0), m_alloc_size(0), m_max_key(0),
+        m_min_key(0), m_bf(nullptr) {
+    size_t attemp_reccnt = 0;
+    size_t tombstone_count = 0;
+    auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt,
+                                                   &tombstone_count);
+
+    m_alloc_size = psudb::sf_aligned_alloc(
+        CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **)&m_data);
+
+    // FIXME: For smaller cursor arrays, it may be more efficient to skip
+    // the priority queue and just do a scan.
+    PriorityQueue<Wrapped<R>> pq(cursors.size());
+    for (size_t i = 0; i < cursors.size(); i++) {
+      pq.push(cursors[i].ptr, i);
+    }
+
+    auto tmp_max_key = shards[0]->m_max_key;
+    auto tmp_min_key = shards[0]->m_min_key;
+
+    for (size_t i = 0; i < shards.size(); i++) {
+      if (shards[i]->m_max_key > tmp_max_key) {
+        tmp_max_key = shards[i]->m_max_key;
+      }
+
+      if (shards[i]->m_min_key < tmp_min_key) {
+        tmp_min_key = shards[i]->m_min_key;
+      }
+    }
+
+    auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+
+    m_max_key = tmp_max_key;
+    m_min_key = tmp_min_key;
+
+    merge_info info = {0, 0};
+    while (pq.size()) {
+      auto now = pq.peek();
+      auto next =
+          pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+      /*
+       * if the current record is not a tombstone, and the next record is
+       * a tombstone that matches the current one, then the current one
+       * has been deleted, and both it and its tombstone can be skipped
+       * over. Unless the tombstone would remove the maximum or
+       * minimum valued key, which cannot be removed at this point
+       * without breaking triespline
+       */
+      if (!now.data->is_tombstone() && next.data != nullptr &&
+          now.data->rec == next.data->rec && next.data->is_tombstone() &&
+          now.data->rec.key != tmp_max_key &&
+          now.data->rec.key != tmp_min_key) {
+
+        pq.pop();
+        pq.pop();
+        auto &cursor1 = cursors[now.version];
+        auto &cursor2 = cursors[next.version];
+        if (advance_cursor(cursor1))
+          pq.push(cursor1.ptr, now.version);
+        if (advance_cursor(cursor2))
+          pq.push(cursor2.ptr, next.version);
+      } else {
+        auto &cursor = cursors[now.version];
+        /*
+         * skip over records that have been deleted via tagging,
+         * unless they are the max or min keys, which cannot be
+         * removed without breaking triespline
+         */
+        if (!cursor.ptr->is_deleted() || cursor.ptr->rec.key == tmp_max_key ||
+            cursor.ptr->rec.key == tmp_min_key) {
+          bldr.AddKey(cursor.ptr->rec.key);
+          m_data[info.record_count++] = *cursor.ptr;
+
+          /*
+           * if the record is a tombstone, increment the ts count and
+           * insert it into the bloom filter if one has been
+           * provided.
+           */
+          if (cursor.ptr->is_tombstone()) {
+            info.tombstone_count++;
+            if (m_bf) {
+              m_bf->insert(cursor.ptr->rec);
+            }
+          }
+        }
+        pq.pop();
+
+        if (advance_cursor(cursor))
+          pq.push(cursor.ptr, now.version);
+      }
+    }
+
+    m_reccnt = info.record_count;
+    m_tombstone_cnt = info.tombstone_count;
+
+    if (m_reccnt > 50) {
+      m_ts = bldr.Finalize();
+    }
+  }

-    ~TrieSpline() {
-        free(m_data);
-        delete m_bf;
-    }
+  ~TrieSpline() {
+    free(m_data);
+    delete m_bf;
+  }

-    Wrapped<R> *point_lookup(const R &rec, bool filter=false) const {
-        if (filter && m_bf && !m_bf->lookup(rec)) {
-            return nullptr;
-        }
-
-        size_t idx = get_lower_bound(rec.key);
-        if (idx >= m_reccnt) {
-            return nullptr;
-        }
-
-        while (idx < m_reccnt && m_data[idx].rec < rec) ++idx;
-
-        if (m_data[idx].rec == rec) {
-            return m_data + idx;
-        }
-
-        return nullptr;
-    }
+  Wrapped<R> *point_lookup(const R &rec, bool filter = false) const {
+    if (filter && m_bf && !m_bf->lookup(rec)) {
+      return nullptr;
+    }
+
+    size_t idx = get_lower_bound(rec.key);
+    if (idx >= m_reccnt) {
+      return nullptr;
+    }
+
+    while (idx < m_reccnt && m_data[idx].rec < rec)
+      ++idx;
+
+    if (m_data[idx].rec == rec) {
+      return m_data + idx;
+    }
+
+    return nullptr;
+  }

-    Wrapped<R>* get_data() const {
-        return m_data;
-    }
-
-    size_t get_record_count() const {
-        return m_reccnt;
-    }
-
-    size_t get_tombstone_count() const {
-        return m_tombstone_cnt;
-    }
-
-    const Wrapped<R>* get_record_at(size_t idx) const {
-        if (idx >= m_reccnt) return nullptr;
-        return m_data + idx;
-    }
-
-    size_t get_memory_usage() const {
-        return m_ts.GetSize();
-    }
-
-    size_t get_aux_memory_usage() const {
-        return (m_bf) ? m_bf->memory_usage() : 0;
-    }
+  Wrapped<R> *get_data() const { return m_data; }
+
+  size_t get_record_count() const { return m_reccnt; }
+
+  size_t get_tombstone_count() const { return m_tombstone_cnt; }
+
+  const Wrapped<R> *get_record_at(size_t idx) const {
+    if (idx >= m_reccnt)
+      return nullptr;
+    return m_data + idx;
+  }
+
+  size_t get_memory_usage() const { return m_ts.GetSize(); }
+
+  size_t get_aux_memory_usage() const {
+    return (m_bf) ? m_bf->memory_usage() : 0;
+  }

-    size_t get_lower_bound(const K& key) const {
-        if (m_reccnt < 50) {
-            size_t bd = m_reccnt;
-            for (size_t i=0; i<m_reccnt; i++) {
-                if (m_data[i].rec.key >= key) {
-                    bd = i;
-                    break;
-                }
-            }
-
-            return bd;
-        }
-
-        auto bound = m_ts.GetSearchBound(key);
-        size_t idx = bound.begin;
-
-        if (idx >= m_reccnt) {
-            return m_reccnt;
-        }
-
-        // If the region to search is less than some pre-specified
-        // amount, perform a linear scan to locate the record.
-        if (bound.end - bound.begin < 256) {
-            while (idx < bound.end && m_data[idx].rec.key < key) {
-                idx++;
-            }
-        } else {
-            // Otherwise, perform a binary search
-            idx = bound.begin;
-            size_t max = bound.end;
-
-            while (idx < max) {
-                size_t mid = (idx + max) / 2;
-                if (key > m_data[mid].rec.key) {
-                    idx = mid + 1;
-                } else {
-                    max = mid;
-                }
-            }
-        }
-
-        if (idx == m_reccnt) {
-            return m_reccnt;
-        }
-
-        if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) {
-            return idx-1;
-        }
-
-        return idx;
-    }
+  size_t get_lower_bound(const K &key) const {
+    if (m_reccnt < 50) {
+      size_t bd = m_reccnt;
+      for (size_t i = 0; i < m_reccnt; i++) {
+        if (m_data[i].rec.key >= key) {
+          bd = i;
+          break;
+        }
+      }
+
+      return bd;
+    }
+
+    auto bound = m_ts.GetSearchBound(key);
+    size_t idx = bound.begin;
+
+    if (idx >= m_reccnt) {
+      return m_reccnt;
+    }
+
+    // If the region to search is less than some pre-specified
+    // amount, perform a linear scan to locate the record.
+    if (bound.end - bound.begin < 256) {
+      while (idx < bound.end && m_data[idx].rec.key < key) {
+        idx++;
+      }
+    } else {
+      // Otherwise, perform a binary search
+      idx = bound.begin;
+      size_t max = bound.end;
+
+      while (idx < max) {
+        size_t mid = (idx + max) / 2;
+        if (key > m_data[mid].rec.key) {
+          idx = mid + 1;
+        } else {
+          max = mid;
+        }
+      }
+    }
+
+    if (idx == m_reccnt) {
+      return m_reccnt;
+    }
+
+    if (m_data[idx].rec.key > key && idx > 0 &&
+        m_data[idx - 1].rec.key <= key) {
+      return idx - 1;
+    }
+
+    return idx;
+  }

-private:
-
-    Wrapped<R>* m_data;
-    size_t m_reccnt;
-    size_t m_tombstone_cnt;
-    size_t m_alloc_size;
-    K m_max_key;
-    K m_min_key;
-    ts::TrieSpline<K> m_ts;
-    BloomFilter<R> *m_bf;
+private:
+  Wrapped<R> *m_data;
+  size_t m_reccnt;
+  size_t m_tombstone_cnt;
+  size_t m_alloc_size;
+  K m_max_key;
+  K m_min_key;
+  ts::TrieSpline<K> m_ts;
+  BloomFilter<R> *m_bf;
 };
-}
+} // namespace de
--
cgit v1.2.3
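
The most broadly reusable idea in the code above is the last-mile search in get_lower_bound(): the spline predicts a coarse [begin, end) window for a key, and the shard finishes the lookup with a linear scan when the window is small or a binary search when it is large. The sketch below restates that pattern over a plain sorted key array under stated assumptions: SearchBound, lower_bound_in_window(), the uint64_t key type, and the example values in main() are illustrative stand-ins and are not the framework's or the ts library's actual types; only the 256-entry scan cutoff is taken from the code above.

// Standalone sketch (not part of the patch): bounded "last-mile" lower-bound
// search, as used after a learned-index prediction. Hypothetical names.
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <vector>

struct SearchBound {
  std::size_t begin; // inclusive start of the window suggested by the model
  std::size_t end;   // exclusive end of the window
};

// Returns the index of the first key in [bound.begin, bound.end) that is not
// less than `key`, or bound.end if every key in the window is smaller.
std::size_t lower_bound_in_window(const std::vector<uint64_t> &keys,
                                  SearchBound bound, uint64_t key) {
  if (bound.end - bound.begin < 256) {
    // Small window: a sequential scan over contiguous memory is cheap and
    // easy for the branch predictor.
    std::size_t idx = bound.begin;
    while (idx < bound.end && keys[idx] < key) {
      idx++;
    }
    return idx;
  }

  // Large window: classic binary search for the first element >= key.
  std::size_t lo = bound.begin;
  std::size_t hi = bound.end;
  while (lo < hi) {
    std::size_t mid = lo + (hi - lo) / 2;
    if (keys[mid] < key) {
      lo = mid + 1;
    } else {
      hi = mid;
    }
  }
  return lo;
}

int main() {
  std::vector<uint64_t> keys = {2, 3, 5, 7, 11, 13, 17, 19, 23, 29};
  // Pretend the spline bounded the position of key 13 to indices [2, 8).
  SearchBound bound{2, 8};
  std::printf("lower bound index: %zu\n",
              lower_bound_in_window(keys, bound, 13)); // prints 5
  return 0;
}

The two-branch design mirrors the shard code's reasoning: for windows this small, a linear pass is usually faster in practice than the scattered probes of a binary search, while the binary branch keeps worst-case cost logarithmic when the spline's error bound is loose.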