Diffstat (limited to 'include/shard')
-rw-r--r--   include/shard/Alias.h           2
-rw-r--r--   include/shard/AugBTree.h        2
-rw-r--r--   include/shard/FSTrie.h        200
-rw-r--r--   include/shard/ISAMTree.h       14
-rw-r--r--   include/shard/LoudsPatricia.h 199
-rw-r--r--   include/shard/PGM.h           150
-rw-r--r--   include/shard/TrieSpline.h    185
-rw-r--r--   include/shard/VPTree.h         51
8 files changed, 723 insertions, 80 deletions
diff --git a/include/shard/Alias.h b/include/shard/Alias.h
index 9275952..72147d7 100644
--- a/include/shard/Alias.h
+++ b/include/shard/Alias.h
@@ -148,7 +148,7 @@ public:
 
     size_t get_memory_usage() {
-        return m_alloc_size;
+        return 0;
     }
 
     size_t get_aux_memory_usage() {
diff --git a/include/shard/AugBTree.h b/include/shard/AugBTree.h
index 54931bd..c60cbcd 100644
--- a/include/shard/AugBTree.h
+++ b/include/shard/AugBTree.h
@@ -148,7 +148,7 @@ public:
     }
 
     size_t get_memory_usage() {
-        return m_alloc_size + m_node_cnt * sizeof(AugBTreeNode<Wrapped<R>>);
+        return m_node_cnt * sizeof(AugBTreeNode<Wrapped<R>>);
     }
 
     size_t get_aux_memory_usage() {
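These two changes split the accounting: get_memory_usage() now reports only the structure's own overhead, while the backing record storage is reported separately through get_aux_memory_usage(). A minimal sketch of how a caller can still recover the combined figure; total_footprint and its template parameter are illustrative, not part of this patch:

    #include <cstddef>

    /* hypothetical helper: works for any shard type S that exposes
     * the two accessors used in the diffs above */
    template <typename S>
    size_t total_footprint(S &shard) {
        /* structural overhead plus backing record storage */
        return shard.get_memory_usage() + shard.get_aux_memory_usage();
    }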
diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h
new file mode 100644
index 0000000..3783b38
--- /dev/null
+++ b/include/shard/FSTrie.h
@@ -0,0 +1,200 @@
+/*
+ * include/shard/FSTrie.h
+ *
+ * Copyright (C) 2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * A shard shim around the FST (Fast Succinct Trie) index.
+ */
+#pragma once
+
+#include <vector>
+
+#include "framework/ShardRequirements.h"
+#include "fst.hpp"
+#include "util/SortedMerge.h"
+
+using psudb::CACHELINE_SIZE;
+using psudb::BloomFilter;
+using psudb::PriorityQueue;
+using psudb::queue_record;
+using psudb::byte;
+
+namespace de {
+
+template <KVPInterface R>
+class FSTrie {
+private:
+
+    typedef decltype(R::key) K;
+    typedef decltype(R::value) V;
+    static_assert(std::is_same_v<K, const char*>, "FST requires const char* keys.");
+
+public:
+    FSTrie(BufferView<R> buffer)
+        : m_data(nullptr)
+        , m_reccnt(0)
+        , m_alloc_size(0)
+    {
+        m_data = new Wrapped<R>[buffer.get_record_count()]();
+        m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
+
+        size_t cnt = 0;
+        std::vector<std::string> keys;
+        keys.reserve(buffer.get_record_count());
+
+        /*
+         * Copy the contents of the buffer view into a temporary buffer, and
+         * sort them. We still need to iterate over these temporary records to
+         * apply tombstone/deleted record filtering, as well as any possible
+         * per-record processing that is required by the shard being built.
+         */
+        auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
+        for (size_t i=0; i<buffer.get_record_count(); i++) {
+            temp_buffer[i] = *(buffer.get(i));
+        }
+
+        auto base = temp_buffer;
+        auto stop = base + buffer.get_record_count();
+        std::sort(base, stop, std::less<Wrapped<R>>());
+
+        for (size_t i=0; i<buffer.get_record_count(); i++) {
+            if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || temp_buffer[i].rec.key[0] == '\0') {
+                continue;
+            }
+
+            m_data[cnt] = temp_buffer[i];
+            m_data[cnt].clear_timestamp();
+
+            keys.push_back(std::string(m_data[cnt].rec.key));
+            cnt++;
+        }
+
+        m_reccnt = cnt;
+        if (m_reccnt > 0) {
+            m_fst = new fst::Trie(keys, true, 1);
+        }
+
+        delete[] temp_buffer;
+    }
+
+    FSTrie(std::vector<FSTrie*> &shards)
+        : m_data(nullptr)
+        , m_reccnt(0)
+        , m_alloc_size(0)
+    {
+        size_t attemp_reccnt = 0;
+        size_t tombstone_count = 0;
+        auto cursors = build_cursor_vec<R, FSTrie>(shards, &attemp_reccnt, &tombstone_count);
+
+        m_data = new Wrapped<R>[attemp_reccnt]();
+        m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>);
+
+        std::vector<std::string> keys;
+        keys.reserve(attemp_reccnt);
+
+        // FIXME: For smaller cursor arrays, it may be more efficient to skip
+        // the priority queue and just do a scan.
+        PriorityQueue<Wrapped<R>> pq(cursors.size());
+        for (size_t i=0; i<cursors.size(); i++) {
+            pq.push(cursors[i].ptr, i);
+        }
+
+        while (pq.size()) {
+            auto now = pq.peek();
+            auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+            /*
+             * If the current record is not a tombstone, and the next record is
+             * a tombstone that matches the current one, then the current one
+             * has been deleted, and both it and its tombstone can be skipped
+             * over.
+             */
+            if (!now.data->is_tombstone() && next.data != nullptr &&
+                now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+                pq.pop(); pq.pop();
+                auto& cursor1 = cursors[now.version];
+                auto& cursor2 = cursors[next.version];
+                if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+                if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+            } else {
+                auto& cursor = cursors[now.version];
+                /* skip over records that have been deleted via tagging */
+                if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key[0] != '\0') {
+                    m_data[m_reccnt] = *cursor.ptr;
+                    keys.push_back(std::string(m_data[m_reccnt].rec.key));
+
+                    m_reccnt++;
+                }
+                pq.pop();
+
+                if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
+            }
+        }
+
+        if (m_reccnt > 0) {
+            m_fst = new fst::Trie(keys, true, 1);
+        }
+    }
+
+    ~FSTrie() {
+        delete[] m_data;
+        delete m_fst;
+    }
+
+    Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
+
+        auto idx = m_fst->exactSearch(rec.key);
+
+        if (idx == fst::kNotFound) {
+            return nullptr;
+        }
+
+        // FIXME: for convenience, I'm treating this Trie as a unique index
+        // for now, so no need to scan forward and/or check values. This
+        // also makes the point lookup query class a lot easier to make.
+        // Ultimately, though, we can support non-unique indexes with some
+        // extra work.
+
+        return m_data + idx;
+    }
+
+    Wrapped<R>* get_data() const {
+        return m_data;
+    }
+
+    size_t get_record_count() const {
+        return m_reccnt;
+    }
+
+    size_t get_tombstone_count() const {
+        return 0;
+    }
+
+    const Wrapped<R>* get_record_at(size_t idx) const {
+        if (idx >= m_reccnt) return nullptr;
+        return m_data + idx;
+    }
+
+    size_t get_memory_usage() {
+        return m_fst->getMemoryUsage();
+    }
+
+    size_t get_aux_memory_usage() {
+        return m_alloc_size;
+    }
+
+    size_t get_lower_bound(R &rec) {return 0;}
+    size_t get_upper_bound(R &rec) {return 0;}
+
+private:
+
+    Wrapped<R>* m_data;
+    size_t m_reccnt;
+    size_t m_alloc_size;
+    fst::Trie *m_fst = nullptr;
+};
+}
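The merge constructor above interleaves the sorted runs of the input shards through a priority queue, cancelling a record against a matching tombstone whenever the two surface adjacently at the queue's head. The same cancellation rule is easier to see over a single pre-sorted run; in this standalone sketch, Rec is a stand-in mirroring the is_tombstone()/rec interface of Wrapped<R> above:

    #include <vector>
    #include <cstddef>

    /* illustrative only: elide any record immediately followed by a
     * matching tombstone in one sorted run */
    template <typename Rec>
    std::vector<Rec> cancel_tombstones(const std::vector<Rec> &sorted) {
        std::vector<Rec> out;
        for (size_t i = 0; i < sorted.size(); i++) {
            if (!sorted[i].is_tombstone() && i + 1 < sorted.size()
                && sorted[i].rec == sorted[i + 1].rec
                && sorted[i + 1].is_tombstone()) {
                i++;   /* skip both the record and its tombstone */
                continue;
            }
            out.push_back(sorted[i]);
        }
        return out;
    }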
diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h
index 3763271..1cca506 100644
--- a/include/shard/ISAMTree.h
+++ b/include/shard/ISAMTree.h
@@ -51,7 +51,7 @@ constexpr static size_t LEAF_FANOUT = NODE_SZ / sizeof(R);
 
 public:
     ISAMTree(BufferView<R> buffer)
-        : m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+        : m_bf(nullptr)
         , m_isam_nodes(nullptr)
         , m_root(nullptr)
         , m_reccnt(0)
@@ -59,19 +59,12 @@ public:
         , m_internal_node_cnt(0)
         , m_deleted_cnt(0)
         , m_alloc_size(0)
-        , m_data(nullptr)
     {
         m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
                                                buffer.get_record_count() * sizeof(Wrapped<R>),
                                                (byte**) &m_data);
 
-        /*
-         * without this, gcc seems to hoist the building of the array
-         * _above_ its allocation under -O3, resulting in memfaults.
-         */
-        asm volatile ("" ::: "memory");
-
         auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf);
         m_reccnt = res.record_count;
         m_tombstone_cnt = res.tombstone_count;
@@ -90,13 +83,12 @@ public:
         , m_internal_node_cnt(0)
         , m_deleted_cnt(0)
         , m_alloc_size(0)
-        , m_data(nullptr)
     {
         size_t attemp_reccnt = 0;
         size_t tombstone_count = 0;
        auto cursors = build_cursor_vec<R, ISAMTree>(shards, &attemp_reccnt, &tombstone_count);
 
-        m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+        m_bf = nullptr;
         m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
                                                attemp_reccnt * sizeof(Wrapped<R>),
                                                (byte **) &m_data);
@@ -149,7 +141,7 @@ public:
 
     size_t get_memory_usage() {
-        return m_alloc_size + m_internal_node_cnt * NODE_SZ;
+        return m_internal_node_cnt * NODE_SZ;
     }
 
     size_t get_aux_memory_usage() {
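With this change the ISAMTree no longer allocates a tombstone Bloom filter; m_bf is now passed through as nullptr, so every consumer of the filter must tolerate the null case (the TrieSpline point_lookup change later in this patch adds exactly such a guard). A minimal sketch of the convention; maybe_insert is a hypothetical stand-in for the checks inside the real helpers in util/SortedMerge.h:

    /* hypothetical stand-in: BF is any filter type with insert() */
    template <typename BF, typename Rec>
    void maybe_insert(BF *bf, const Rec &rec) {
        if (bf) {   /* the filter may legitimately be nullptr now */
            bf->insert(rec);
        }
    }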
diff --git a/include/shard/LoudsPatricia.h b/include/shard/LoudsPatricia.h
new file mode 100644
index 0000000..3452839
--- /dev/null
+++ b/include/shard/LoudsPatricia.h
@@ -0,0 +1,199 @@
+/*
+ * include/shard/LoudsPatricia.h
+ *
+ * Copyright (C) 2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * A shard shim around the LOUDS-encoded Patricia trie.
+ */
+#pragma once
+
+#include <vector>
+
+#include "framework/ShardRequirements.h"
+#include "louds-patricia.hpp"
+#include "util/SortedMerge.h"
+
+using psudb::CACHELINE_SIZE;
+using psudb::BloomFilter;
+using psudb::PriorityQueue;
+using psudb::queue_record;
+using psudb::byte;
+
+namespace de {
+
+template <KVPInterface R>
+class LoudsPatricia {
+private:
+
+    typedef decltype(R::key) K;
+    typedef decltype(R::value) V;
+    static_assert(std::is_same_v<K, const char*>, "LoudsPatricia requires const char* keys.");
+
+public:
+    LoudsPatricia(BufferView<R> buffer)
+        : m_data(nullptr)
+        , m_reccnt(0)
+        , m_alloc_size(0)
+    {
+        m_data = new Wrapped<R>[buffer.get_record_count()]();
+        m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
+
+        m_louds = new louds::Patricia();
+
+        size_t cnt = 0;
+        std::vector<std::string> keys;
+        keys.reserve(buffer.get_record_count());
+
+        /*
+         * Copy the contents of the buffer view into a temporary buffer, and
+         * sort them. We still need to iterate over these temporary records to
+         * apply tombstone/deleted record filtering, as well as any possible
+         * per-record processing that is required by the shard being built.
+         */
+        auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
+        for (size_t i=0; i<buffer.get_record_count(); i++) {
+            temp_buffer[i] = *(buffer.get(i));
+        }
+
+        auto base = temp_buffer;
+        auto stop = base + buffer.get_record_count();
+        std::sort(base, stop, std::less<Wrapped<R>>());
+
+        for (size_t i=0; i<buffer.get_record_count(); i++) {
+            if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || temp_buffer[i].rec.key[0] == '\0') {
+                continue;
+            }
+
+            m_data[cnt] = temp_buffer[i];
+            m_data[cnt].clear_timestamp();
+
+            m_louds->add(std::string(m_data[cnt].rec.key));
+            cnt++;
+        }
+
+        m_reccnt = cnt;
+        if (m_reccnt > 0) {
+            m_louds->build();
+        }
+
+        delete[] temp_buffer;
+    }
+
+    LoudsPatricia(std::vector<LoudsPatricia*> &shards)
+        : m_data(nullptr)
+        , m_reccnt(0)
+        , m_alloc_size(0)
+    {
+        size_t attemp_reccnt = 0;
+        size_t tombstone_count = 0;
+        auto cursors = build_cursor_vec<R, LoudsPatricia>(shards, &attemp_reccnt, &tombstone_count);
+
+        m_data = new Wrapped<R>[attemp_reccnt]();
+        m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>);
+
+        m_louds = new louds::Patricia();
+
+        // FIXME: For smaller cursor arrays, it may be more efficient to skip
+        // the priority queue and just do a scan.
+        PriorityQueue<Wrapped<R>> pq(cursors.size());
+        for (size_t i=0; i<cursors.size(); i++) {
+            pq.push(cursors[i].ptr, i);
+        }
+
+        while (pq.size()) {
+            auto now = pq.peek();
+            auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+            /*
+             * If the current record is not a tombstone, and the next record is
+             * a tombstone that matches the current one, then the current one
+             * has been deleted, and both it and its tombstone can be skipped
+             * over.
+             */
+            if (!now.data->is_tombstone() && next.data != nullptr &&
+                now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+                pq.pop(); pq.pop();
+                auto& cursor1 = cursors[now.version];
+                auto& cursor2 = cursors[next.version];
+                if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+                if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+            } else {
+                auto& cursor = cursors[now.version];
+                /* skip over records that have been deleted via tagging */
+                if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key[0] != '\0') {
+                    m_data[m_reccnt] = *cursor.ptr;
+                    m_louds->add(std::string(m_data[m_reccnt].rec.key));
+                    m_reccnt++;
+                }
+                pq.pop();
+
+                if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
+            }
+        }
+
+        if (m_reccnt > 0) {
+            m_louds->build();
+        }
+    }
+
+    ~LoudsPatricia() {
+        delete[] m_data;
+        delete m_louds;
+    }
+
+    Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
+
+        auto idx = m_louds->lookup(std::string(rec.key));
+
+        if (idx == -1) {
+            return nullptr;
+        }
+
+        // FIXME: for convenience, I'm treating this Trie as a unique index
+        // for now, so no need to scan forward and/or check values. This
+        // also makes the point lookup query class a lot easier to make.
+        // Ultimately, though, we can support non-unique indexes with some
+        // extra work.
+        return m_data + idx;
+    }
+
+    Wrapped<R>* get_data() const {
+        return m_data;
+    }
+
+    size_t get_record_count() const {
+        return m_reccnt;
+    }
+
+    size_t get_tombstone_count() const {
+        return 0;
+    }
+
+    const Wrapped<R>* get_record_at(size_t idx) const {
+        if (idx >= m_reccnt) return nullptr;
+        return m_data + idx;
+    }
+
+    size_t get_memory_usage() {
+        return m_louds->size();
+    }
+
+    size_t get_aux_memory_usage() {
+        return m_alloc_size;
+    }
+
+    size_t get_lower_bound(R &rec) {return 0;}
+    size_t get_upper_bound(R &rec) {return 0;}
+
+private:
+
+    Wrapped<R>* m_data;
+    size_t m_reccnt;
+    size_t m_alloc_size;
+    louds::Patricia *m_louds;
+};
+}
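This shard relies on louds::Patricia assigning each added key a stable ordinal, returned by lookup(), which is what makes the result usable as a direct index into m_data. A usage sketch under that assumption; keys are added in sorted order here, mirroring how both constructors above feed the trie from sorted input:

    #include <string>
    #include <vector>
    #include "louds-patricia.hpp"

    int lookup_demo() {
        louds::Patricia trie;
        std::vector<std::string> keys = {"alpha", "beta", "gamma"};
        for (const auto &k : keys) {
            trie.add(k);            /* added in sorted order, as above */
        }
        trie.build();               /* finalize before querying */
        return trie.lookup("beta"); /* ordinal of "beta", or -1 if absent */
    }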
It should only need to be &= 1 + base->header &= 3; + keys.emplace_back(base->rec.key); + m_data[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (m_bf){ + m_bf->insert(base->rec); + } } + base++; + } + + free(temp_buffer); + + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; + + if (m_reccnt > 0) { m_pgm = pgm::PGMIndex<K, epsilon>(keys); } } @@ -74,21 +120,65 @@ public: size_t tombstone_count = 0; auto cursors = build_cursor_vec<R, PGM>(shards, &attemp_reccnt, &tombstone_count); - m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS); m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **) &m_data); + std::vector<K> keys; - auto res = sorted_array_merge<R>(cursors, m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue<Wrapped<R>> pq(cursors.size()); + for (size_t i=0; i<cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } - if (m_reccnt > 0) { - std::vector<K> keys; - for (size_t i=0; i<m_reccnt; i++) { - keys.emplace_back(m_data[i].rec.key); + merge_info info = {0, 0}; + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + keys.emplace_back(cursor.ptr->rec.key); + m_data[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. + */ + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(cursor.ptr->rec); + } + } + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); } + } + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; + + if (m_reccnt > 0) { m_pgm = pgm::PGMIndex<K, epsilon>(keys); } } @@ -132,7 +222,7 @@ public: size_t get_memory_usage() { - return m_pgm.size_in_bytes() + m_alloc_size; + return m_pgm.size_in_bytes(); } size_t get_aux_memory_usage() { @@ -147,14 +237,16 @@ public: return m_reccnt; } - // If the region to search is less than some pre-specified - // amount, perform a linear scan to locate the record. + /* + * If the region to search is less than some pre-specified + * amount, perform a linear scan to locate the record. + */ if (bound.hi - bound.lo < 256) { while (idx < bound.hi && m_data[idx].rec.key < key) { idx++; } } else { - // Otherwise, perform a binary search + /* Otherwise, perform a binary search */ idx = bound.lo; size_t max = bound.hi; @@ -169,10 +261,26 @@ public: } + /* + * the upper bound returned by PGM is one passed the end of the + * array. 
If we are at that point, we should just return "not found" + */ + if (idx == m_reccnt) { + return idx; + } + + /* + * We may have walked one passed the actual lower bound, so check + * the index before the current one to see if it is the actual bound + */ if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) { return idx-1; } + /* + * Otherwise, check idx. If it is a valid bound, then return it, + * otherwise return "not found". + */ return (m_data[idx].rec.key >= key) ? idx : m_reccnt; } diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 2a432e8..581277e 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -36,39 +36,94 @@ private: public: TrieSpline(BufferView<R> buffer) - : m_data(nullptr) - , m_reccnt(0) + : m_reccnt(0) , m_tombstone_cnt(0) , m_alloc_size(0) , m_max_key(0) , m_min_key(0) - , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) + , m_bf(nullptr) { m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>), (byte**) &m_data); - auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, + buffer.get_record_count(), + sizeof(Wrapped<R>)); + buffer.copy_to_buffer((byte *) temp_buffer); + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less<Wrapped<R>>()); + + auto tmp_min_key = temp_buffer[0].rec.key; + auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key; + auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E); + + merge_info info = {0, 0}; + + m_min_key = tmp_max_key; + m_max_key = tmp_min_key; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) + && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; + } - if (m_reccnt > 0) { - m_min_key = m_data[0].rec.key; - m_max_key = m_data[m_reccnt-1].rec.key; + // FIXME: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. 
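get_lower_bound() narrows PGM's approximate search bound with a linear scan for small ranges and a binary search otherwise, then patches up the boundary cases described in the comments. The same logic restated over a plain sorted array; refine_lower_bound and the long key type are illustrative stand-ins:

    #include <cstddef>

    /* find the first element >= key within an approximate [lo, hi)
     * range over a sorted array of n keys; returns n if none exists */
    size_t refine_lower_bound(const long *data, size_t n, long key,
                              size_t lo, size_t hi) {
        size_t idx = lo;
        if (hi - lo < 256) {
            /* small ranges: a linear scan is cache-friendly enough */
            while (idx < hi && data[idx] < key) idx++;
        } else {
            /* otherwise, binary search within the range */
            size_t max = hi;
            while (idx < max) {
                size_t mid = (idx + max) / 2;
                if (key > data[mid]) idx = mid + 1;
                else max = mid;
            }
        }
        return (idx < n && data[idx] >= key) ? idx : n;
    }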
diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h
index 2a432e8..581277e 100644
--- a/include/shard/TrieSpline.h
+++ b/include/shard/TrieSpline.h
@@ -36,39 +36,94 @@ private:
 
 public:
     TrieSpline(BufferView<R> buffer)
-        : m_data(nullptr)
-        , m_reccnt(0)
+        : m_reccnt(0)
         , m_tombstone_cnt(0)
         , m_alloc_size(0)
         , m_max_key(0)
         , m_min_key(0)
-        , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+        , m_bf(nullptr)
     {
         m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
                                                buffer.get_record_count() * sizeof(Wrapped<R>),
                                                (byte**) &m_data);
 
-        auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf);
-        m_reccnt = res.record_count;
-        m_tombstone_cnt = res.tombstone_count;
+        /*
+         * Copy the contents of the buffer view into a temporary buffer, and
+         * sort them. We still need to iterate over these temporary records to
+         * apply tombstone/deleted record filtering, as well as any possible
+         * per-record processing that is required by the shard being built.
+         */
+        auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
+                                                                   buffer.get_record_count(),
+                                                                   sizeof(Wrapped<R>));
+        buffer.copy_to_buffer((byte *) temp_buffer);
+
+        auto base = temp_buffer;
+        auto stop = base + buffer.get_record_count();
+        std::sort(base, stop, std::less<Wrapped<R>>());
+
+        auto tmp_min_key = temp_buffer[0].rec.key;
+        auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key;
+        auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+
+        merge_info info = {0, 0};
+
+        m_min_key = tmp_max_key;
+        m_max_key = tmp_min_key;
+
+        /*
+         * Iterate over the temporary buffer to process the records, copying
+         * them into the shard's array as needed
+         */
+        while (base < stop) {
+            if (!base->is_tombstone() && (base + 1 < stop)
+                && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
+                base += 2;
+                continue;
+            } else if (base->is_deleted()) {
+                base += 1;
+                continue;
+            }
 
-        if (m_reccnt > 0) {
-            m_min_key = m_data[0].rec.key;
-            m_max_key = m_data[m_reccnt-1].rec.key;
+            // FIXME: this shouldn't be necessary, but the tagged record
+            // bypass doesn't seem to be working on this code-path, so this
+            // ensures that tagged records from the buffer are able to be
+            // dropped, eventually. It should only need to be &= 1
+            base->header &= 3;
+            bldr.AddKey(base->rec.key);
+            m_data[info.record_count++] = *base;
+
+            if (base->is_tombstone()) {
+                info.tombstone_count++;
+                if (m_bf) {
+                    m_bf->insert(base->rec);
+                }
+            }
 
-            auto bldr = ts::Builder<K>(m_min_key, m_max_key, E);
-            for (size_t i=0; i<m_reccnt; i++) {
-                bldr.AddKey(m_data[i].rec.key);
+            if (base->rec.key < m_min_key) {
+                m_min_key = base->rec.key;
+            }
+
+            if (base->rec.key > m_max_key) {
+                m_max_key = base->rec.key;
             }
+
+            base++;
+        }
+
+        free(temp_buffer);
+
+        m_reccnt = info.record_count;
+        m_tombstone_cnt = info.tombstone_count;
+
+        if (m_reccnt > 50) {
             m_ts = bldr.Finalize();
         }
     }
 
     TrieSpline(std::vector<TrieSpline*> &shards)
-        : m_data(nullptr)
-        , m_reccnt(0)
+        : m_reccnt(0)
         , m_tombstone_cnt(0)
         , m_alloc_size(0)
         , m_max_key(0)
@@ -79,24 +134,90 @@ public:
         size_t tombstone_count = 0;
         auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt, &tombstone_count);
 
-        m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
         m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
                                                attemp_reccnt * sizeof(Wrapped<R>),
                                                (byte **) &m_data);
 
-        auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
-        m_reccnt = res.record_count;
-        m_tombstone_cnt = res.tombstone_count;
+        // FIXME: For smaller cursor arrays, it may be more efficient to skip
+        // the priority queue and just do a scan.
+        PriorityQueue<Wrapped<R>> pq(cursors.size());
+        for (size_t i=0; i<cursors.size(); i++) {
+            pq.push(cursors[i].ptr, i);
+        }
 
-        if (m_reccnt > 0) {
-            m_min_key = m_data[0].rec.key;
-            m_max_key = m_data[m_reccnt-1].rec.key;
+        auto tmp_max_key = shards[0]->m_max_key;
+        auto tmp_min_key = shards[0]->m_min_key;
 
-            auto bldr = ts::Builder<K>(m_min_key, m_max_key, E);
-            for (size_t i=0; i<m_reccnt; i++) {
-                bldr.AddKey(m_data[i].rec.key);
+        for (size_t i=0; i<shards.size(); i++) {
+            if (shards[i]->m_max_key > tmp_max_key) {
+                tmp_max_key = shards[i]->m_max_key;
+            }
+
+            if (shards[i]->m_min_key < tmp_min_key) {
+                tmp_min_key = shards[i]->m_min_key;
             }
+        }
+
+        auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+
+        m_max_key = tmp_min_key;
+        m_min_key = tmp_max_key;
+
+        merge_info info = {0, 0};
+        while (pq.size()) {
+            auto now = pq.peek();
+            auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+            /*
+             * If the current record is not a tombstone, and the next record is
+             * a tombstone that matches the current one, then the current one
+             * has been deleted, and both it and its tombstone can be skipped
+             * over.
+             */
+            if (!now.data->is_tombstone() && next.data != nullptr &&
+                now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+                pq.pop(); pq.pop();
+                auto& cursor1 = cursors[now.version];
+                auto& cursor2 = cursors[next.version];
+                if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+                if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+            } else {
+                auto& cursor = cursors[now.version];
+                /* skip over records that have been deleted via tagging */
+                if (!cursor.ptr->is_deleted()) {
+                    bldr.AddKey(cursor.ptr->rec.key);
+                    m_data[info.record_count++] = *cursor.ptr;
+
+                    /*
+                     * if the record is a tombstone, increment the ts count and
+                     * insert it into the bloom filter if one has been
+                     * provided.
+                     */
+                    if (cursor.ptr->is_tombstone()) {
+                        info.tombstone_count++;
+                        if (m_bf) {
+                            m_bf->insert(cursor.ptr->rec);
+                        }
+                    }
+
+                    if (cursor.ptr->rec.key < m_min_key) {
+                        m_min_key = cursor.ptr->rec.key;
+                    }
+
+                    if (cursor.ptr->rec.key > m_max_key) {
+                        m_max_key = cursor.ptr->rec.key;
+                    }
+                }
+                pq.pop();
+
+                if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
            }
+        }
+
+        m_reccnt = info.record_count;
+        m_tombstone_cnt = info.tombstone_count;
+
+        if (m_reccnt > 50) {
             m_ts = bldr.Finalize();
         }
     }
@@ -107,7 +228,7 @@ public:
     }
 
     Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
-        if (filter && !m_bf->lookup(rec)) {
+        if (filter && m_bf && !m_bf->lookup(rec)) {
             return nullptr;
         }
 
@@ -144,7 +265,7 @@ public:
     }
 
     size_t get_memory_usage() {
-        return m_ts.GetSize() + m_alloc_size;
+        return m_ts.GetSize();
     }
 
     size_t get_aux_memory_usage() {
@@ -152,6 +273,18 @@ public:
     }
 
     size_t get_lower_bound(const K& key) const {
+        if (m_reccnt <= 50) {
+            size_t bd = m_reccnt;
+            for (size_t i=0; i<m_reccnt; i++) {
+                if (m_data[i].rec.key >= key) {
+                    bd = i;
+                    break;
+                }
+            }
+
+            return bd;
+        }
+
         auto bound = m_ts.GetSearchBound(key);
         size_t idx = bound.begin;
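Note the deliberate inversion when seeding the running extrema above: m_min_key starts at the largest candidate key and m_max_key at the smallest, so the first surviving record is guaranteed to overwrite both, even if most records are filtered out. The pattern in isolation; track_extrema and the uint64_t key type are illustrative:

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>

    /* seed min with the largest known key and max with the smallest,
     * then let the scan tighten both toward the true extrema */
    void track_extrema(const uint64_t *keys, size_t n,
                       uint64_t seed_min, uint64_t seed_max,
                       uint64_t &min_out, uint64_t &max_out) {
        min_out = seed_max;   /* deliberately inverted seeds */
        max_out = seed_min;
        for (size_t i = 0; i < n; i++) {
            min_out = std::min(min_out, keys[i]);
            max_out = std::max(max_out, keys[i]);
        }
    }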
diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h
index b342fe6..d5a2393 100644
--- a/include/shard/VPTree.h
+++ b/include/shard/VPTree.h
@@ -58,7 +58,7 @@ public:
                                                sizeof(Wrapped<R>),
                                                (byte**) &m_data);
 
-        m_ptrs = new Wrapped<R>*[buffer.get_record_count()];
+        m_ptrs = new vp_ptr[buffer.get_record_count()];
         size_t offset = 0;
         m_reccnt = 0;
 
@@ -76,7 +76,7 @@ public:
             rec->header &= 3;
             m_data[m_reccnt] = *rec;
 
-            m_ptrs[m_reccnt] = &m_data[m_reccnt];
+            m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
             m_reccnt++;
         }
 
@@ -97,7 +97,7 @@ public:
         m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
                                                attemp_reccnt * sizeof(Wrapped<R>),
                                                (byte **) &m_data);
-        m_ptrs = new Wrapped<R>*[attemp_reccnt];
+        m_ptrs = new vp_ptr[attemp_reccnt];
 
         // FIXME: will eventually need to figure out tombstones
         // this one will likely require the multi-pass
@@ -110,7 +110,7 @@ public:
                 }
 
                 m_data[m_reccnt] = *shards[i]->get_record_at(j);
-                m_ptrs[m_reccnt] = &m_data[m_reccnt];
+                m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
                 m_reccnt++;
             }
         }
@@ -139,8 +139,8 @@ public:
         } else {
             vpnode *node = m_root;
 
-            while (!node->leaf && m_ptrs[node->start]->rec != rec) {
-                if (rec.calc_distance((m_ptrs[node->start]->rec)) >= node->radius) {
+            while (!node->leaf && m_ptrs[node->start].ptr->rec != rec) {
+                if (rec.calc_distance((m_ptrs[node->start].ptr->rec)) >= node->radius) {
                     node = node->outside;
                 } else {
                     node = node->inside;
@@ -148,8 +148,8 @@ public:
             }
 
             for (size_t i=node->start; i<=node->stop; i++) {
-                if (m_ptrs[i]->rec == rec) {
-                    return m_ptrs[i];
+                if (m_ptrs[i].ptr->rec == rec) {
+                    return m_ptrs[i].ptr;
                 }
             }
 
@@ -175,7 +175,7 @@ public:
     }
 
     size_t get_memory_usage() {
-        return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*) + m_alloc_size;
+        return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(vp_ptr);
     }
 
     size_t get_aux_memory_usage() {
@@ -191,8 +191,12 @@ public:
     }
 
 private:
+    struct vp_ptr {
+        Wrapped<R> *ptr;
+        double dist;
+    };
     Wrapped<R>* m_data;
-    Wrapped<R>** m_ptrs;
+    vp_ptr* m_ptrs;
     std::unordered_map<R, size_t, RecordHash<R>> m_lookup_map;
     size_t m_reccnt;
     size_t m_tombstone_cnt;
@@ -260,6 +264,11 @@ private:
         auto i = start + gsl_rng_uniform_int(rng, stop - start + 1);
         swap(start, i);
 
+        /* for efficiency, we'll pre-calculate the distances between each point and the root */
+        for (size_t i=start+1; i<=stop; i++) {
+            m_ptrs[i].dist = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[i].ptr->rec);
+        }
+
         /*
         * partition elements based on their distance from the start,
         * with those elements with distance falling below the median
@@ -267,14 +276,15 @@ private:
         * the median in the right. This is easily done using QuickSelect.
         */
         auto mid = (start + 1 + stop) / 2;
-        quickselect(start + 1, stop, mid, m_ptrs[start], rng);
+        quickselect(start + 1, stop, mid, m_ptrs[start].ptr, rng);
 
         /* Create a new node based on this partitioning */
         vpnode *node = new vpnode();
         node->start = start;
 
         /* store the radius of the circle used for partitioning the node. */
-        node->radius = m_ptrs[start]->rec.calc_distance(m_ptrs[mid]->rec);
+        node->radius = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[mid].ptr->rec);
+        m_ptrs[start].dist = node->radius;
 
         /* recursively construct the left and right subtrees */
         node->inside = build_subtree(start + 1, mid-1, rng);
@@ -285,8 +295,6 @@ private:
         return node;
     }
 
-    // TODO: The quickselect code can probably be generalized and moved out
-    // to psudb-common instead.
     void quickselect(size_t start, size_t stop, size_t k, Wrapped<R> *p, gsl_rng *rng) {
         if (start == stop) return;
 
@@ -303,13 +311,16 @@ private:
     // to psudb-common instead.
     size_t partition(size_t start, size_t stop, Wrapped<R> *p, gsl_rng *rng) {
         auto pivot = start + gsl_rng_uniform_int(rng, stop - start);
-        double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec);
         swap(pivot, stop);
 
         size_t j = start;
         for (size_t i=start; i<stop; i++) {
-            if (p->rec.calc_distance(m_ptrs[i]->rec) < pivot_dist) {
+            /* compare the cached distances rather than re-deriving them */
+            if (m_ptrs[i].dist < m_ptrs[stop].dist) {
                 swap(j, i);
                 j++;
             }
@@ -332,13 +343,13 @@ private:
 
         if (node->leaf) {
             for (size_t i=node->start; i<=node->stop; i++) {
-                double d = point.calc_distance(m_ptrs[i]->rec);
+                double d = point.calc_distance(m_ptrs[i].ptr->rec);
                 if (d < *farthest) {
                     if (pq.size() == k) {
                         pq.pop();
                     }
-                    pq.push(m_ptrs[i]);
+                    pq.push(m_ptrs[i].ptr);
                     if (pq.size() == k) {
                         *farthest = point.calc_distance(pq.peek().data->rec);
                     }
@@ -348,14 +359,14 @@ private:
             return;
         }
 
-        double d = point.calc_distance(m_ptrs[node->start]->rec);
+        double d = point.calc_distance(m_ptrs[node->start].ptr->rec);
         if (d < *farthest) {
             if (pq.size() == k) {
                 auto t = pq.peek().data->rec;
                 pq.pop();
             }
-            pq.push(m_ptrs[node->start]);
+            pq.push(m_ptrs[node->start].ptr);
             if (pq.size() == k) {
                 *farthest = point.calc_distance(pq.peek().data->rec);
             }
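The vp_ptr change pairs each record pointer with a cached distance to the current vantage point, so partition() compares cached values instead of calling calc_distance() again for every comparison performed during quickselect. The shape of the optimization, reduced to plain doubles; cached_pt and cache_distances are illustrative stand-ins for vp_ptr and the pre-calculation loop in build_subtree():

    #include <cstddef>
    #include <cmath>

    struct cached_pt {
        double value;   /* stand-in for the record */
        double dist;    /* distance to the current vantage point */
    };

    /* compute every distance to the vantage point at pts[start] once;
     * a partition step can then compare pts[i].dist values directly */
    void cache_distances(cached_pt *pts, size_t start, size_t stop) {
        for (size_t i = start + 1; i <= stop; i++) {
            pts[i].dist = std::fabs(pts[start].value - pts[i].value);
        }
    }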