From 49bceabf90d114b89638659141d54083c1b7f395 Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Wed, 21 Feb 2024 17:03:08 -0500 Subject: VPTree: precalculate distances to make construction more efficient --- include/shard/VPTree.h | 49 ++++++++++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 19 deletions(-) (limited to 'include/shard') diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h index b342fe6..62857ce 100644 --- a/include/shard/VPTree.h +++ b/include/shard/VPTree.h @@ -58,7 +58,7 @@ public: sizeof(Wrapped), (byte**) &m_data); - m_ptrs = new Wrapped*[buffer.get_record_count()]; + m_ptrs = new vp_ptr[buffer.get_record_count()]; size_t offset = 0; m_reccnt = 0; @@ -76,7 +76,7 @@ public: rec->header &= 3; m_data[m_reccnt] = *rec; - m_ptrs[m_reccnt] = &m_data[m_reccnt]; + m_ptrs[m_reccnt].ptr = &m_data[m_reccnt]; m_reccnt++; } @@ -97,7 +97,7 @@ public: m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **) &m_data); - m_ptrs = new Wrapped*[attemp_reccnt]; + m_ptrs = new vp_ptr[attemp_reccnt]; // FIXME: will eventually need to figure out tombstones // this one will likely require the multi-pass @@ -110,7 +110,7 @@ public: } m_data[m_reccnt] = *shards[i]->get_record_at(j); - m_ptrs[m_reccnt] = &m_data[m_reccnt]; + m_ptrs[m_reccnt].ptr = &m_data[m_reccnt]; m_reccnt++; } } @@ -139,8 +139,8 @@ public: } else { vpnode *node = m_root; - while (!node->leaf && m_ptrs[node->start]->rec != rec) { - if (rec.calc_distance((m_ptrs[node->start]->rec)) >= node->radius) { + while (!node->leaf && m_ptrs[node->start].ptr->rec != rec) { + if (rec.calc_distance((m_ptrs[node->start].ptr->rec)) >= node->radius) { node = node->outside; } else { node = node->inside; @@ -148,8 +148,8 @@ public: } for (size_t i=node->start; i<=node->stop; i++) { - if (m_ptrs[i]->rec == rec) { - return m_ptrs[i]; + if (m_ptrs[i].ptr->rec == rec) { + return m_ptrs[i].ptr; } } @@ -191,8 +191,12 @@ public: } private: + struct vp_ptr { + Wrapped *ptr; + double dist; + }; Wrapped* m_data; - Wrapped** m_ptrs; + vp_ptr* m_ptrs; std::unordered_map> m_lookup_map; size_t m_reccnt; size_t m_tombstone_cnt; @@ -260,6 +264,11 @@ private: auto i = start + gsl_rng_uniform_int(rng, stop - start + 1); swap(start, i); + /* for efficiency, we'll pre-calculate the distances between each point and the root */ + for (size_t i=start+1; i<=stop; i++) { + m_ptrs[i].dist = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[i].ptr->rec); + } + /* * partition elements based on their distance from the start, * with those elements with distance falling below the median @@ -267,14 +276,15 @@ private: * the median in the right. This is easily done using QuickSelect. */ auto mid = (start + 1 + stop) / 2; - quickselect(start + 1, stop, mid, m_ptrs[start], rng); + quickselect(start + 1, stop, mid, m_ptrs[start].ptr, rng); /* Create a new node based on this partitioning */ vpnode *node = new vpnode(); node->start = start; /* store the radius of the circle used for partitioning the node. */ - node->radius = m_ptrs[start]->rec.calc_distance(m_ptrs[mid]->rec); + node->radius = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[mid].ptr->rec); + m_ptrs[start].dist = node->radius; /* recursively construct the left and right subtrees */ node->inside = build_subtree(start + 1, mid-1, rng); @@ -285,8 +295,6 @@ private: return node; } - // TODO: The quickselect code can probably be generalized and moved out - // to psudb-common instead. void quickselect(size_t start, size_t stop, size_t k, Wrapped *p, gsl_rng *rng) { if (start == stop) return; @@ -303,13 +311,16 @@ private: // to psudb-common instead. size_t partition(size_t start, size_t stop, Wrapped *p, gsl_rng *rng) { auto pivot = start + gsl_rng_uniform_int(rng, stop - start); - double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec); + //double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec); swap(pivot, stop); size_t j = start; for (size_t i=start; irec.calc_distance(m_ptrs[i]->rec) < pivot_dist) { + if (m_ptrs[i].dist < m_ptrs[stop].dist) { + //assert(distances[i - start] == p->rec.calc_distance(m_ptrs[i]->rec)); + //if (distances[i - start] < distances[stop - start]) { + //if (p->rec .calc_distance(m_ptrs[i]->rec) < pivot_dist) { swap(j, i); j++; } @@ -332,13 +343,13 @@ private: if (node->leaf) { for (size_t i=node->start; i<=node->stop; i++) { - double d = point.calc_distance(m_ptrs[i]->rec); + double d = point.calc_distance(m_ptrs[i].ptr->rec); if (d < *farthest) { if (pq.size() == k) { pq.pop(); } - pq.push(m_ptrs[i]); + pq.push(m_ptrs[i].ptr); if (pq.size() == k) { *farthest = point.calc_distance(pq.peek().data->rec); } @@ -348,14 +359,14 @@ private: return; } - double d = point.calc_distance(m_ptrs[node->start]->rec); + double d = point.calc_distance(m_ptrs[node->start].ptr->rec); if (d < *farthest) { if (pq.size() == k) { auto t = pq.peek().data->rec; pq.pop(); } - pq.push(m_ptrs[node->start]); + pq.push(m_ptrs[node->start].ptr); if (pq.size() == k) { *farthest = point.calc_distance(pq.peek().data->rec); } -- cgit v1.2.3 From 405bf4a20b4a22a6bb4b60b730b6a7e901fdccf6 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 19 Mar 2024 11:10:02 -0400 Subject: FST Shard w/ tests Needs some debugging--some methods currently fail within the library itself. The build system doesn't currently build the FST library. To compile, you'll first need to manually build it, and then place the libFST.so file in your LIBRARY_PATH and LD_LIBRARY_PATH. --- include/shard/FSTrie.h | 266 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 266 insertions(+) create mode 100644 include/shard/FSTrie.h (limited to 'include/shard') diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h new file mode 100644 index 0000000..11a232d --- /dev/null +++ b/include/shard/FSTrie.h @@ -0,0 +1,266 @@ +/* + * include/shard/FSTrie.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * + * Distributed under the Modified BSD License. + * + * A shard shim around the FSTrie learned index. + * + * TODO: The code in this file is very poorly commented. + */ +#pragma once + + +#include + +#include "framework/ShardRequirements.h" +#include "FST.hpp" +#include "psu-ds/BloomFilter.h" +#include "util/bf_config.h" +#include "util/SortedMerge.h" + +using psudb::CACHELINE_SIZE; +using psudb::BloomFilter; +using psudb::PriorityQueue; +using psudb::queue_record; +using psudb::byte; + +namespace de { + +template +class FSTrie { +private: + typedef decltype(R::key) K; + typedef decltype(R::value) V; + + static_assert(std::is_same_v || std::is_same_v, + "FST requires either string or uint64_t keys"); + +public: + FSTrie(BufferView buffer) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + buffer.get_record_count() * + sizeof(Wrapped), + (byte**) &m_data); + size_t cnt = 0; + std::vector keys; + keys.reserve(buffer.get_record_count()); + + std::vector values; + values.reserve(buffer.get_record_count()); + + size_t longest_key = 0; + + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, + buffer.get_record_count(), + sizeof(Wrapped)); + buffer.copy_to_buffer((byte *) temp_buffer); + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less>()); + + for (size_t i=0; i) { + if (m_data[cnt].rec.key.size() > longest_key) { + longest_key = m_data[cnt].rec.key.size(); + } + } + + cnt++; + } + + m_reccnt = cnt; + m_fst = FST(); + if constexpr (std::is_same_v) { + m_fst.load(keys, values, longest_key); + } else { + m_fst.load(keys, values); + } + + free(temp_buffer); + } + + FSTrie(std::vector &shards) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); + + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + attemp_reccnt * sizeof(Wrapped), + (byte **) &m_data); + + std::vector keys; + keys.reserve(attemp_reccnt); + + std::vector values; + values.reserve(attemp_reccnt); + + size_t longest_key = 0; + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue> pq(cursors.size()); + for (size_t i=0; i 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + m_data[m_reccnt] = *cursor.ptr; + keys.push_back(m_data[m_reccnt].rec.key); + values.push_back(m_data[m_reccnt].rec.value); + + if constexpr (std::is_same_v) { + if (m_data[m_reccnt].rec.key.size() > longest_key) { + longest_key = m_data[m_reccnt].rec.key.size(); + } + } + + m_reccnt++; + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + } + } + + if (m_reccnt > 0) { + m_fst = FST(); + if constexpr (std::is_same_v) { + m_fst.load(keys, values, longest_key); + } else { + m_fst.load(keys, values); + } + } + } + + ~FSTrie() { + free(m_data); + } + + Wrapped *point_lookup(const R &rec, bool filter=false) { + size_t idx; + bool res; + if constexpr (std::is_same_v) { + res = m_fst.lookup(rec.key.c_str(), rec.key.size(), idx); + } else { + res = m_fst.lookup(rec.key, idx); + } + + if (res) { + return m_data + idx; + } + + return nullptr; + } + + Wrapped* get_data() const { + return m_data; + } + + size_t get_record_count() const { + return m_reccnt; + } + + size_t get_tombstone_count() const { + return 0; + } + + const Wrapped* get_record_at(size_t idx) const { + if (idx >= m_reccnt) return nullptr; + return m_data + idx; + } + + + size_t get_memory_usage() { + return m_fst.mem() + m_alloc_size; + } + + size_t get_aux_memory_usage() { + return 0; + } + + size_t get_lower_bound(const K& key) { + auto itr = FSTIter(); + + const K temp_key = key; + + bool res; + if constexpr (std::is_same_v) { + res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr); + } else { + res = m_fst.lowerBound(temp_key, itr); + } + + return itr.value(); + } + + size_t get_upper_bound(const K& key) { + auto itr = FSTIter(); + + const K temp_key = key; + + bool res; + if constexpr (std::is_same_v) { + res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr); + } else { + res = m_fst.lowerBound(temp_key, itr); + } + + size_t idx = itr.value(); + while (idx < m_reccnt && m_data[idx].rec.key <= key) { + idx++; + } + + return idx; + } + +private: + + Wrapped* m_data; + size_t m_reccnt; + size_t m_alloc_size; + FST m_fst; +}; +} -- cgit v1.2.3 From 9fe190f5d500e22b0894095e7c917f9c652e0a64 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 20 Mar 2024 17:30:14 -0400 Subject: Updates/progress towards succinct trie support --- include/shard/FSTrie.h | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) (limited to 'include/shard') diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index 11a232d..62aa0b7 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -43,10 +43,9 @@ public: , m_reccnt(0) , m_alloc_size(0) { - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - buffer.get_record_count() * - sizeof(Wrapped), - (byte**) &m_data); + m_data = new Wrapped[buffer.get_record_count()](); + m_alloc_size = sizeof(Wrapped) * buffer.get_record_count(); + size_t cnt = 0; std::vector keys; keys.reserve(buffer.get_record_count()); @@ -62,10 +61,15 @@ public: * apply tombstone/deleted record filtering, as well as any possible * per-record processing that is required by the shard being built. */ + /* auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped)); - buffer.copy_to_buffer((byte *) temp_buffer); + */ + auto temp_buffer = new Wrapped[buffer.get_record_count()](); + for (size_t i=0; i) { @@ -88,6 +94,10 @@ public: cnt++; } + for (size_t i=0; i) { @@ -96,7 +106,7 @@ public: m_fst.load(keys, values); } - free(temp_buffer); + delete[] temp_buffer; } FSTrie(std::vector &shards) @@ -108,9 +118,8 @@ public: size_t tombstone_count = 0; auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - attemp_reccnt * sizeof(Wrapped), - (byte **) &m_data); + m_data = new Wrapped[attemp_reccnt](); + m_alloc_size = attemp_reccnt * sizeof(Wrapped); std::vector keys; keys.reserve(attemp_reccnt); @@ -165,6 +174,10 @@ public: } } + for (size_t i=0; i 0) { m_fst = FST(); if constexpr (std::is_same_v) { @@ -176,18 +189,22 @@ public: } ~FSTrie() { - free(m_data); + delete[] m_data; } Wrapped *point_lookup(const R &rec, bool filter=false) { size_t idx; bool res; if constexpr (std::is_same_v) { - res = m_fst.lookup(rec.key.c_str(), rec.key.size(), idx); + res = m_fst.lookup((uint8_t*)rec.key.c_str(), rec.key.size(), idx); } else { res = m_fst.lookup(rec.key, idx); } + if (res && m_data[idx].rec.key != rec.key) { + fprintf(stderr, "ERROR!\n"); + } + if (res) { return m_data + idx; } -- cgit v1.2.3 From 147f0df58e1ff4973bffb7e4628e6b2fdc20eb57 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 22 Mar 2024 14:04:40 -0400 Subject: FSTrie testing and debugging --- include/shard/FSTrie.h | 121 ++++++++----------------------------------------- 1 file changed, 20 insertions(+), 101 deletions(-) (limited to 'include/shard') diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index 62aa0b7..aa3c9f4 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -1,13 +1,11 @@ /* * include/shard/FSTrie.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * * A shard shim around the FSTrie learned index. - * - * TODO: The code in this file is very poorly commented. */ #pragma once @@ -15,9 +13,7 @@ #include #include "framework/ShardRequirements.h" -#include "FST.hpp" -#include "psu-ds/BloomFilter.h" -#include "util/bf_config.h" +#include "fst.hpp" #include "util/SortedMerge.h" using psudb::CACHELINE_SIZE; @@ -28,14 +24,13 @@ using psudb::byte; namespace de { -template +template class FSTrie { private: + typedef decltype(R::key) K; typedef decltype(R::value) V; - - static_assert(std::is_same_v || std::is_same_v, - "FST requires either string or uint64_t keys"); + static_assert(std::is_same_v, "FST requires std::string keys."); public: FSTrie(BufferView buffer) @@ -50,22 +45,12 @@ public: std::vector keys; keys.reserve(buffer.get_record_count()); - std::vector values; - values.reserve(buffer.get_record_count()); - - size_t longest_key = 0; - /* * Copy the contents of the buffer view into a temporary buffer, and * sort them. We still need to iterate over these temporary records to * apply tombstone/deleted record filtering, as well as any possible * per-record processing that is required by the shard being built. */ - /* - auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, - buffer.get_record_count(), - sizeof(Wrapped)); - */ auto temp_buffer = new Wrapped[buffer.get_record_count()](); for (size_t i=0; i>()); for (size_t i=0; i) { - if (m_data[cnt].rec.key.size() > longest_key) { - longest_key = m_data[cnt].rec.key.size(); - } - } - cnt++; } @@ -99,11 +77,8 @@ public: } m_reccnt = cnt; - m_fst = FST(); - if constexpr (std::is_same_v) { - m_fst.load(keys, values, longest_key); - } else { - m_fst.load(keys, values); + if (m_reccnt > 0) { + m_fst = new fst::Trie(keys); } delete[] temp_buffer; @@ -124,10 +99,6 @@ public: std::vector keys; keys.reserve(attemp_reccnt); - std::vector values; - values.reserve(attemp_reccnt); - - size_t longest_key = 0; // FIXME: For smaller cursor arrays, it may be more efficient to skip // the priority queue and just do a scan. PriorityQueue> pq(cursors.size()); @@ -155,16 +126,9 @@ public: } else { auto& cursor = cursors[now.version]; /* skip over records that have been deleted via tagging */ - if (!cursor.ptr->is_deleted()) { + if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") { m_data[m_reccnt] = *cursor.ptr; keys.push_back(m_data[m_reccnt].rec.key); - values.push_back(m_data[m_reccnt].rec.value); - - if constexpr (std::is_same_v) { - if (m_data[m_reccnt].rec.key.size() > longest_key) { - longest_key = m_data[m_reccnt].rec.key.size(); - } - } m_reccnt++; } @@ -179,37 +143,24 @@ public: } if (m_reccnt > 0) { - m_fst = FST(); - if constexpr (std::is_same_v) { - m_fst.load(keys, values, longest_key); - } else { - m_fst.load(keys, values); - } + m_fst = new fst::Trie(keys); } } ~FSTrie() { delete[] m_data; + delete m_fst; } Wrapped *point_lookup(const R &rec, bool filter=false) { - size_t idx; - bool res; - if constexpr (std::is_same_v) { - res = m_fst.lookup((uint8_t*)rec.key.c_str(), rec.key.size(), idx); - } else { - res = m_fst.lookup(rec.key, idx); - } - if (res && m_data[idx].rec.key != rec.key) { - fprintf(stderr, "ERROR!\n"); - } + auto idx = m_fst->exactSearch(rec.key); - if (res) { - return m_data + idx; + if (idx == fst::kNotFound) { + return nullptr; } - return nullptr; + return m_data + idx; } Wrapped* get_data() const { @@ -231,53 +182,21 @@ public: size_t get_memory_usage() { - return m_fst.mem() + m_alloc_size; + return m_fst->getMemoryUsage() + m_alloc_size; } size_t get_aux_memory_usage() { return 0; } - size_t get_lower_bound(const K& key) { - auto itr = FSTIter(); - - const K temp_key = key; - - bool res; - if constexpr (std::is_same_v) { - res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr); - } else { - res = m_fst.lowerBound(temp_key, itr); - } - - return itr.value(); - } - - size_t get_upper_bound(const K& key) { - auto itr = FSTIter(); - - const K temp_key = key; - - bool res; - if constexpr (std::is_same_v) { - res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr); - } else { - res = m_fst.lowerBound(temp_key, itr); - } - - size_t idx = itr.value(); - while (idx < m_reccnt && m_data[idx].rec.key <= key) { - idx++; - } - - return idx; - } + size_t get_lower_bound(R &rec) {return 0;} + size_t get_upper_bound(R &rec) {return 0;} private: Wrapped* m_data; size_t m_reccnt; size_t m_alloc_size; - FST m_fst; + fst::Trie *m_fst; }; } -- cgit v1.2.3 From d0aebc685b245e51bf47cff8e28f811e43073d5e Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 22 Mar 2024 14:15:00 -0400 Subject: PGM.h: fixed an out of bounds array access on point lookup misses. --- include/shard/PGM.h | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) (limited to 'include/shard') diff --git a/include/shard/PGM.h b/include/shard/PGM.h index e2752ef..ff9ce2d 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -147,14 +147,16 @@ public: return m_reccnt; } - // If the region to search is less than some pre-specified - // amount, perform a linear scan to locate the record. + /* + * If the region to search is less than some pre-specified + * amount, perform a linear scan to locate the record. + */ if (bound.hi - bound.lo < 256) { while (idx < bound.hi && m_data[idx].rec.key < key) { idx++; } } else { - // Otherwise, perform a binary search + /* Otherwise, perform a binary search */ idx = bound.lo; size_t max = bound.hi; @@ -169,10 +171,26 @@ public: } + /* + * the upper bound returned by PGM is one passed the end of the + * array. If we are at that point, we should just return "not found" + */ + if (idx == m_reccnt) { + return idx; + } + + /* + * We may have walked one passed the actual lower bound, so check + * the index before the current one to see if it is the actual bound + */ if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) { return idx-1; } + /* + * Otherwise, check idx. If it is a valid bound, then return it, + * otherwise return "not found". + */ return (m_data[idx].rec.key >= key) ? idx : m_reccnt; } -- cgit v1.2.3 From fb4312a883dd0e382ecbcfe1119479e6f44d32a6 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 22 Mar 2024 15:35:14 -0400 Subject: PointLookup: added a point lookup query for unique indexes, and some tests --- include/shard/FSTrie.h | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'include/shard') diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index aa3c9f4..50bf982 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -160,6 +160,12 @@ public: return nullptr; } + // FIXME: for convenience, I'm treating this Trie as a unique index + // for now, so no need to scan forward and/or check values. This + // also makes the point lookup query class a lot easier to make. + // Ultimately, though, we can support non-unique indexes with some + // extra work. + return m_data + idx; } -- cgit v1.2.3 From 7e7fd9f7339eee2f1ae974c662a447532dfb1b1a Mon Sep 17 00:00:00 2001 From: "Douglas B. Rumbaugh" Date: Tue, 26 Mar 2024 16:35:12 -0400 Subject: Updated FSTrie benchmark and some minor fixes --- include/shard/FSTrie.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include/shard') diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index 50bf982..95f396f 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -61,7 +61,7 @@ public: std::sort(base, stop, std::less>()); for (size_t i=0; i Date: Thu, 11 Apr 2024 12:23:21 -0400 Subject: stuff --- include/shard/ISAMTree.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include/shard') diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index 3763271..af62c92 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -70,7 +70,7 @@ public: * without this, gcc seems to hoist the building of the array * _above_ its allocation under -O3, resulting in memfaults. */ - asm volatile ("" ::: "memory"); + //asm volatile ("" ::: "memory"); auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); m_reccnt = res.record_count; -- cgit v1.2.3 From b25beb13773072c3b143842b45a7c32a1108f347 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 15 Apr 2024 14:00:27 -0400 Subject: Updated FSTrie to use const char * instead of std::string Note: this requires the caller to manage the memory of the strings --- include/shard/FSTrie.h | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) (limited to 'include/shard') diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index 95f396f..be678ff 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -30,7 +30,7 @@ private: typedef decltype(R::key) K; typedef decltype(R::value) V; - static_assert(std::is_same_v, "FST requires std::string keys."); + static_assert(std::is_same_v, "FST requires const char* keys."); public: FSTrie(BufferView buffer) @@ -42,7 +42,7 @@ public: m_alloc_size = sizeof(Wrapped) * buffer.get_record_count(); size_t cnt = 0; - std::vector keys; + std::vector keys; keys.reserve(buffer.get_record_count()); /* @@ -68,14 +68,10 @@ public: m_data[cnt] = temp_buffer[i]; m_data[cnt].clear_timestamp(); - keys.push_back(m_data[cnt].rec.key); + keys.push_back(std::string(m_data[cnt].rec.key)); cnt++; } - for (size_t i=0; i 0) { m_fst = new fst::Trie(keys); @@ -96,7 +92,7 @@ public: m_data = new Wrapped[attemp_reccnt](); m_alloc_size = attemp_reccnt * sizeof(Wrapped); - std::vector keys; + std::vector keys; keys.reserve(attemp_reccnt); // FIXME: For smaller cursor arrays, it may be more efficient to skip @@ -128,7 +124,7 @@ public: /* skip over records that have been deleted via tagging */ if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") { m_data[m_reccnt] = *cursor.ptr; - keys.push_back(m_data[m_reccnt].rec.key); + keys.push_back(std::string(m_data[m_reccnt].rec.key)); m_reccnt++; } @@ -138,10 +134,6 @@ public: } } - for (size_t i=0; i 0) { m_fst = new fst::Trie(keys); } -- cgit v1.2.3 From 7c2f43ff039795576bc0014c367b893fbbaceca4 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 19 Apr 2024 14:39:33 -0400 Subject: Benchmark updates --- include/shard/Alias.h | 2 +- include/shard/AugBTree.h | 2 +- include/shard/FSTrie.h | 8 ++++---- include/shard/ISAMTree.h | 14 +++----------- include/shard/PGM.h | 6 +++--- include/shard/TrieSpline.h | 10 ++++------ include/shard/VPTree.h | 2 +- 7 files changed, 17 insertions(+), 27 deletions(-) (limited to 'include/shard') diff --git a/include/shard/Alias.h b/include/shard/Alias.h index 9275952..72147d7 100644 --- a/include/shard/Alias.h +++ b/include/shard/Alias.h @@ -148,7 +148,7 @@ public: size_t get_memory_usage() { - return m_alloc_size; + return 0; } size_t get_aux_memory_usage() { diff --git a/include/shard/AugBTree.h b/include/shard/AugBTree.h index 54931bd..c60cbcd 100644 --- a/include/shard/AugBTree.h +++ b/include/shard/AugBTree.h @@ -148,7 +148,7 @@ public: } size_t get_memory_usage() { - return m_alloc_size + m_node_cnt * sizeof(AugBTreeNode>); + return m_node_cnt * sizeof(AugBTreeNode>); } size_t get_aux_memory_usage() { diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index be678ff..3783b38 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -74,7 +74,7 @@ public: m_reccnt = cnt; if (m_reccnt > 0) { - m_fst = new fst::Trie(keys); + m_fst = new fst::Trie(keys, true, 1); } delete[] temp_buffer; @@ -135,7 +135,7 @@ public: } if (m_reccnt > 0) { - m_fst = new fst::Trie(keys); + m_fst = new fst::Trie(keys, true, 1); } } @@ -180,11 +180,11 @@ public: size_t get_memory_usage() { - return m_fst->getMemoryUsage() + m_alloc_size; + return m_fst->getMemoryUsage(); } size_t get_aux_memory_usage() { - return 0; + return m_alloc_size; } size_t get_lower_bound(R &rec) {return 0;} diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index af62c92..1cca506 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -51,7 +51,7 @@ constexpr static size_t LEAF_FANOUT = NODE_SZ / sizeof(R); public: ISAMTree(BufferView buffer) - : m_bf(new BloomFilter(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) + : m_bf(nullptr) , m_isam_nodes(nullptr) , m_root(nullptr) , m_reccnt(0) @@ -59,19 +59,12 @@ public: , m_internal_node_cnt(0) , m_deleted_cnt(0) , m_alloc_size(0) - , m_data(nullptr) { m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped), (byte**) &m_data); - /* - * without this, gcc seems to hoist the building of the array - * _above_ its allocation under -O3, resulting in memfaults. - */ - //asm volatile ("" ::: "memory"); - auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); m_reccnt = res.record_count; m_tombstone_cnt = res.tombstone_count; @@ -90,13 +83,12 @@ public: , m_internal_node_cnt(0) , m_deleted_cnt(0) , m_alloc_size(0) - , m_data(nullptr) { size_t attemp_reccnt = 0; size_t tombstone_count = 0; auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - m_bf = new BloomFilter(BF_FPR, tombstone_count, BF_HASH_FUNCS); + m_bf = nullptr; m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **) &m_data); @@ -149,7 +141,7 @@ public: size_t get_memory_usage() { - return m_alloc_size + m_internal_node_cnt * NODE_SZ; + return m_internal_node_cnt * NODE_SZ; } size_t get_aux_memory_usage() { diff --git a/include/shard/PGM.h b/include/shard/PGM.h index ff9ce2d..a3f9749 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -39,8 +39,7 @@ private: public: PGM(BufferView buffer) - : m_data(nullptr) - , m_bf(new BloomFilter(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) + : m_bf(nullptr) , m_reccnt(0) , m_tombstone_cnt(0) , m_alloc_size(0) { @@ -49,6 +48,7 @@ public: buffer.get_record_count() * sizeof(Wrapped), (byte**) &m_data); + auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); m_reccnt = res.record_count; m_tombstone_cnt = res.tombstone_count; @@ -132,7 +132,7 @@ public: size_t get_memory_usage() { - return m_pgm.size_in_bytes() + m_alloc_size; + return m_pgm.size_in_bytes(); } size_t get_aux_memory_usage() { diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 2a432e8..023390e 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -36,13 +36,12 @@ private: public: TrieSpline(BufferView buffer) - : m_data(nullptr) - , m_reccnt(0) + : m_reccnt(0) , m_tombstone_cnt(0) , m_alloc_size(0) , m_max_key(0) , m_min_key(0) - , m_bf(new BloomFilter(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) + , m_bf(nullptr) { m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * @@ -79,7 +78,6 @@ public: size_t tombstone_count = 0; auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - m_bf = new BloomFilter(BF_FPR, tombstone_count, BF_HASH_FUNCS); m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **) &m_data); @@ -107,7 +105,7 @@ public: } Wrapped *point_lookup(const R &rec, bool filter=false) { - if (filter && !m_bf->lookup(rec)) { + if (filter && m_bf && !m_bf->lookup(rec)) { return nullptr; } @@ -144,7 +142,7 @@ public: size_t get_memory_usage() { - return m_ts.GetSize() + m_alloc_size; + return m_ts.GetSize(); } size_t get_aux_memory_usage() { diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h index 62857ce..d5a2393 100644 --- a/include/shard/VPTree.h +++ b/include/shard/VPTree.h @@ -175,7 +175,7 @@ public: } size_t get_memory_usage() { - return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*) + m_alloc_size; + return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*); } size_t get_aux_memory_usage() { -- cgit v1.2.3 From 34fd8ad935e6359d20a5d6c949e67495d0842f8f Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 19 Apr 2024 14:40:19 -0400 Subject: More trie baseline tests --- include/shard/LoudsPatricia.h | 199 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 199 insertions(+) create mode 100644 include/shard/LoudsPatricia.h (limited to 'include/shard') diff --git a/include/shard/LoudsPatricia.h b/include/shard/LoudsPatricia.h new file mode 100644 index 0000000..3452839 --- /dev/null +++ b/include/shard/LoudsPatricia.h @@ -0,0 +1,199 @@ +/* + * include/shard/LoudsPatricia.h + * + * Copyright (C) 2024 Douglas B. Rumbaugh + * + * Distributed under the Modified BSD License. + * + * A shard shim around the LoudsPatricia learned index. + */ +#pragma once + + +#include + +#include "framework/ShardRequirements.h" +#include "louds-patricia.hpp" +#include "util/SortedMerge.h" + +using psudb::CACHELINE_SIZE; +using psudb::BloomFilter; +using psudb::PriorityQueue; +using psudb::queue_record; +using psudb::byte; + +namespace de { + +template +class LoudsPatricia { +private: + + typedef decltype(R::key) K; + typedef decltype(R::value) V; + static_assert(std::is_same_v, "FST requires const char* keys."); + +public: + LoudsPatricia(BufferView buffer) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + m_data = new Wrapped[buffer.get_record_count()](); + m_alloc_size = sizeof(Wrapped) * buffer.get_record_count(); + + m_louds = new louds::Patricia(); + + size_t cnt = 0; + std::vector keys; + keys.reserve(buffer.get_record_count()); + + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = new Wrapped[buffer.get_record_count()](); + for (size_t i=0; i>()); + + for (size_t i=0; iadd(std::string(m_data[cnt].rec.key)); + cnt++; + } + + m_reccnt = cnt; + if (m_reccnt > 0) { + m_louds->build(); + } + + delete[] temp_buffer; + } + + LoudsPatricia(std::vector &shards) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); + + m_data = new Wrapped[attemp_reccnt](); + m_alloc_size = attemp_reccnt * sizeof(Wrapped); + + m_louds = new louds::Patricia(); + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue> pq(cursors.size()); + for (size_t i=0; i 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") { + m_data[m_reccnt] = *cursor.ptr; + m_louds->add(std::string(m_data[m_reccnt].rec.key)); + m_reccnt++; + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + } + } + + if (m_reccnt > 0) { + m_louds->build(); + } + } + + ~LoudsPatricia() { + delete[] m_data; + delete m_louds; + } + + Wrapped *point_lookup(const R &rec, bool filter=false) { + + auto idx = m_louds->lookup(std::string(rec.key)); + + if (idx == -1) { + return nullptr; + } + + // FIXME: for convenience, I'm treating this Trie as a unique index + // for now, so no need to scan forward and/or check values. This + // also makes the point lookup query class a lot easier to make. + // Ultimately, though, we can support non-unique indexes with some + // extra work. + return m_data + idx; + } + + Wrapped* get_data() const { + return m_data; + } + + size_t get_record_count() const { + return m_reccnt; + } + + size_t get_tombstone_count() const { + return 0; + } + + const Wrapped* get_record_at(size_t idx) const { + if (idx >= m_reccnt) return nullptr; + return m_data + idx; + } + + + size_t get_memory_usage() { + return m_louds->size(); + } + + size_t get_aux_memory_usage() { + return m_alloc_size; + } + + size_t get_lower_bound(R &rec) {return 0;} + size_t get_upper_bound(R &rec) {return 0;} + +private: + + Wrapped* m_data; + size_t m_reccnt; + size_t m_alloc_size; + louds::Patricia *m_louds; +}; +} -- cgit v1.2.3 From b8168b37a6dd91295903f52ee878a57f149b261d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 19 Apr 2024 15:02:44 -0400 Subject: PGM Shard: Fully disabled bloom filter --- include/shard/PGM.h | 1 - 1 file changed, 1 deletion(-) (limited to 'include/shard') diff --git a/include/shard/PGM.h b/include/shard/PGM.h index a3f9749..691385e 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -74,7 +74,6 @@ public: size_t tombstone_count = 0; auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - m_bf = new BloomFilter(BF_FPR, tombstone_count, BF_HASH_FUNCS); m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **) &m_data); -- cgit v1.2.3 From 735d397513bc0160ba9ecb17c32c4441ed125f52 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 22 Apr 2024 10:24:41 -0400 Subject: TS+PGM: Inlined manually the sorted array merge for performance reasons --- include/shard/PGM.h | 119 +++++++++++++++++++++++++++++---- include/shard/TrieSpline.h | 163 +++++++++++++++++++++++++++++++++++++++------ 2 files changed, 248 insertions(+), 34 deletions(-) (limited to 'include/shard') diff --git a/include/shard/PGM.h b/include/shard/PGM.h index 691385e..509796b 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -49,16 +49,62 @@ public: sizeof(Wrapped), (byte**) &m_data); - auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; + std::vector keys; + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, + buffer.get_record_count(), + sizeof(Wrapped)); + buffer.copy_to_buffer((byte *) temp_buffer); - if (m_reccnt > 0) { - std::vector keys; - for (size_t i=0; i>()); + + merge_info info = {0, 0}; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) + && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; } + // FIXME: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. It should only need to be &= 1 + base->header &= 3; + keys.emplace_back(base->rec.key); + m_data[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (m_bf){ + m_bf->insert(base->rec); + } + } + + base++; + } + + free(temp_buffer); + + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; + + if (m_reccnt > 0) { m_pgm = pgm::PGMIndex(keys); } } @@ -77,17 +123,62 @@ public: m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **) &m_data); + std::vector keys; - auto res = sorted_array_merge(cursors, m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue> pq(cursors.size()); + for (size_t i=0; i 0) { - std::vector keys; - for (size_t i=0; i 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + keys.emplace_back(cursor.ptr->rec.key); + m_data[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. + */ + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(cursor.ptr->rec); + } + } + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); } + } + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; + + if (m_reccnt > 0) { m_pgm = pgm::PGMIndex(keys); } } diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 023390e..3ae72f8 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -48,26 +48,82 @@ public: sizeof(Wrapped), (byte**) &m_data); - auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, + buffer.get_record_count(), + sizeof(Wrapped)); + buffer.copy_to_buffer((byte *) temp_buffer); + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less>()); + + auto tmp_min_key = temp_buffer[0].rec.key; + auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key; + auto bldr = ts::Builder(tmp_min_key, tmp_max_key, E); + + merge_info info = {0, 0}; + + m_min_key = tmp_max_key; + m_max_key = tmp_min_key; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) + && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; + } - if (m_reccnt > 0) { - m_min_key = m_data[0].rec.key; - m_max_key = m_data[m_reccnt-1].rec.key; + // FIXME: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. It should only need to be &= 1 + base->header &= 3; + bldr.AddKey(base->rec.key); + m_data[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (m_bf){ + m_bf->insert(base->rec); + } + } + + if (base->rec.key < m_min_key) { + m_min_key = base->rec.key; + } - auto bldr = ts::Builder(m_min_key, m_max_key, E); - for (size_t i=0; irec.key > m_max_key) { + m_max_key = base->rec.key; } + base++; + } + + free(temp_buffer); + + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; + + if (m_reccnt > 0) { m_ts = bldr.Finalize(); } } TrieSpline(std::vector &shards) - : m_data(nullptr) - , m_reccnt(0) + : m_reccnt(0) , m_tombstone_cnt(0) , m_alloc_size(0) , m_max_key(0) @@ -82,19 +138,86 @@ public: attemp_reccnt * sizeof(Wrapped), (byte **) &m_data); - auto res = sorted_array_merge(cursors, m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue> pq(cursors.size()); + for (size_t i=0; i 0) { - m_min_key = m_data[0].rec.key; - m_max_key = m_data[m_reccnt-1].rec.key; + auto tmp_max_key = shards[0]->m_max_key; + auto tmp_min_key = shards[0]->m_min_key; + + for (size_t i=0; im_max_key > tmp_max_key) { + tmp_max_key = shards[i]->m_max_key; + } - auto bldr = ts::Builder(m_min_key, m_max_key, E); - for (size_t i=0; im_min_key < tmp_min_key) { + tmp_min_key = shards[i]->m_min_key; } + } + auto bldr = ts::Builder(tmp_min_key, tmp_max_key, E); + + m_max_key = tmp_min_key; + m_min_key = tmp_max_key; + + merge_info info = {0, 0}; + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + bldr.AddKey(cursor.ptr->rec.key); + m_data[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. + */ + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(cursor.ptr->rec); + } + } + + if (cursor.ptr->rec.key < m_min_key) { + m_min_key = cursor.ptr->rec.key; + } + + if (cursor.ptr->rec.key > m_max_key) { + m_max_key = cursor.ptr->rec.key; + } + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + } + } + + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; + + if (m_reccnt > 0) { m_ts = bldr.Finalize(); } } -- cgit v1.2.3 From feeba1de8d9774e32007686627a6a5633dff2559 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 6 May 2024 09:40:11 -0400 Subject: Missing file from last commit --- include/shard/TrieSpline.h | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) (limited to 'include/shard') diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 3ae72f8..581277e 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -117,7 +117,7 @@ public: m_reccnt = info.record_count; m_tombstone_cnt = info.tombstone_count; - if (m_reccnt > 0) { + if (m_reccnt > 50) { m_ts = bldr.Finalize(); } } @@ -217,7 +217,7 @@ public: m_reccnt = info.record_count; m_tombstone_cnt = info.tombstone_count; - if (m_reccnt > 0) { + if (m_reccnt > 50) { m_ts = bldr.Finalize(); } } @@ -273,6 +273,18 @@ public: } size_t get_lower_bound(const K& key) const { + if (m_reccnt < 50) { + size_t bd = m_reccnt; + for (size_t i=0; i= key) { + bd = i; + break; + } + } + + return bd; + } + auto bound = m_ts.GetSearchBound(key); size_t idx = bound.begin; -- cgit v1.2.3