From 51a85013236f4b2bd596caf179d90e67c848963c Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 30 Jan 2024 15:31:34 -0500 Subject: TrieSpline + tests --- include/shard/TrieSpline.h | 151 +++++++++++++++++++++++++++------------------ 1 file changed, 90 insertions(+), 61 deletions(-) (limited to 'include/shard/TrieSpline.h') diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 56ec357..8142a67 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -12,10 +12,6 @@ #include -#include -#include -#include -#include #include "framework/ShardRequirements.h" #include "ts/builder.h" @@ -23,62 +19,64 @@ #include "util/Cursor.h" #include "psu-ds/BloomFilter.h" #include "util/bf_config.h" +#include "psu-util/timer.h" using psudb::CACHELINE_SIZE; using psudb::BloomFilter; using psudb::PriorityQueue; using psudb::queue_record; -using psudb::Alias; namespace de { -template +template class TrieSpline { private: typedef decltype(R::key) K; typedef decltype(R::value) V; public: - TrieSpline(MutableBuffer* buffer) - : m_reccnt(0), m_tombstone_cnt(0) { - - m_alloc_size = (buffer->get_record_count() * sizeof(Wrapped)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped)) % CACHELINE_SIZE); - assert(m_alloc_size % CACHELINE_SIZE == 0); - m_data = (Wrapped*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size); - - m_bf = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS); - - size_t offset = 0; - m_reccnt = 0; - auto base = buffer->get_data(); - auto stop = base + buffer->get_record_count(); - + TrieSpline(BufferView buffer) + : m_data(nullptr) + , m_reccnt(0) + , m_tombstone_cnt(0) + , m_alloc_size(0) + , m_max_key(0) + , m_min_key(0) + , m_bf(new BloomFilter(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) + { + TIMER_INIT(); + + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + buffer.get_record_count() * + sizeof(Wrapped), + (byte**) &m_data); + + TIMER_START(); + auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped)); + buffer.copy_to_buffer((byte *) temp_buffer); + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); std::sort(base, stop, std::less>()); K min_key = base->rec.key; - K max_key = (stop - 1)->rec.key; + K max_key = (stop-1)->rec.key; + TIMER_STOP(); - auto bldr = ts::Builder(min_key, max_key, E); + auto sort_time = TIMER_RESULT(); + TIMER_START(); + auto bldr = ts::Builder(min_key, max_key, E); while (base < stop) { - if (!(base->is_tombstone()) && (base + 1) < stop) { - if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - continue; - } + if (!base->is_tombstone() && (base + 1 < stop) + && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; } else if (base->is_deleted()) { base += 1; continue; } - if (m_reccnt == 0) { - m_max_key = m_min_key = base->rec.key; - } else if (base->rec.key > m_max_key) { - m_max_key = base->rec.key; - } else if (base->rec.key < m_min_key) { - m_min_key = base->rec.key; - } - // FIXME: this shouldn't be necessary, but the tagged record // bypass doesn't seem to be working on this code-path, so this // ensures that tagged records from the buffer are able to be @@ -86,37 +84,67 @@ public: base->header &= 3; m_data[m_reccnt++] = *base; bldr.AddKey(base->rec.key); - if (m_bf && base->is_tombstone()) { - m_tombstone_cnt++; + ++m_tombstone_cnt; m_bf->insert(base->rec); } - + + /* + * determine the "true" min/max keys based on the scan. This is + * to avoid situations where the min/max in the input array + * are deleted and don't survive into the structure itself. + */ + if (m_reccnt == 0) { + m_max_key = m_min_key = base->rec.key; + } else if (base->rec.key > m_max_key) { + m_max_key = base->rec.key; + } else if (base->rec.key < m_min_key) { + m_min_key = base->rec.key; + } + base++; } + TIMER_STOP(); + auto copy_time = TIMER_RESULT(); + + TIMER_START(); if (m_reccnt > 0) { m_ts = bldr.Finalize(); } + TIMER_STOP(); + auto level_time = TIMER_RESULT(); + + free(temp_buffer); } - TrieSpline(TrieSpline** shards, size_t len) - : m_reccnt(0), m_tombstone_cnt(0) { + TrieSpline(std::vector &shards) + : m_data(nullptr) + , m_reccnt(0) + , m_tombstone_cnt(0) + , m_alloc_size(0) + , m_max_key(0) + , m_min_key(0) + , m_bf(nullptr) + { + std::vector>> cursors; - cursors.reserve(len); + cursors.reserve(shards.size()); - PriorityQueue> pq(len); + PriorityQueue> pq(shards.size()); size_t attemp_reccnt = 0; size_t tombstone_count = 0; - // initialize m_max_key and m_min_key using the values from the - // first shard. These will later be updated when building - // the initial priority queue to their true values. + /* + * Initialize m_max_key and m_min_key using the values from the + * first shard. These will later be updated when building + * the initial priority queue to their true values. + */ m_max_key = shards[0]->m_max_key; m_min_key = shards[0]->m_min_key; - for (size_t i = 0; i < len; ++i) { + for (size_t i = 0; i < shards.size(); ++i) { if (shards[i]) { auto base = shards[i]->get_data(); cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); @@ -137,12 +165,11 @@ public: } m_bf = new BloomFilter(BF_FPR, tombstone_count, BF_HASH_FUNCS); - auto bldr = ts::Builder(m_min_key, m_max_key, E); - - m_alloc_size = (attemp_reccnt * sizeof(Wrapped)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped)) % CACHELINE_SIZE); - assert(m_alloc_size % CACHELINE_SIZE == 0); - m_data = (Wrapped*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size); + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + attemp_reccnt * sizeof(Wrapped), + (byte **) &m_data); + auto bldr = ts::Builder(m_min_key, m_max_key, E); while (pq.size()) { auto now = pq.peek(); auto next = pq.size() > 1 ? pq.peek(1) : queue_record>{nullptr, 0}; @@ -152,33 +179,32 @@ public: pq.pop(); pq.pop(); auto& cursor1 = cursors[now.version]; auto& cursor2 = cursors[next.version]; - if (advance_cursor>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor>(cursor2)) pq.push(cursor2.ptr, next.version); + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); } else { auto& cursor = cursors[now.version]; if (!cursor.ptr->is_deleted()) { m_data[m_reccnt++] = *cursor.ptr; bldr.AddKey(cursor.ptr->rec.key); - if (m_bf && cursor.ptr->is_tombstone()) { + if (cursor.ptr->is_tombstone()) { ++m_tombstone_cnt; - if (m_bf) m_bf->insert(cursor.ptr->rec); + m_bf->insert(cursor.ptr->rec); } } pq.pop(); - if (advance_cursor>(cursor)) pq.push(cursor.ptr, now.version); + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); } } if (m_reccnt > 0) { m_ts = bldr.Finalize(); } - } + } ~TrieSpline() { - if (m_data) free(m_data); - if (m_bf) delete m_bf; - + free(m_data); + delete m_bf; } Wrapped *point_lookup(const R &rec, bool filter=false) { @@ -253,14 +279,17 @@ public: max = mid; } } + } + if (idx == m_reccnt) { + return m_reccnt; } if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) { return idx-1; } - return (m_data[idx].rec.key <= key) ? idx : m_reccnt; + return idx; } private: -- cgit v1.2.3