| field | value | date |
|---|---|---|
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-09-25 14:42:44 -0400 |
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-09-25 14:42:44 -0400 |
| commit | cf5f3bbb0cb58430ed68ad3ebfcefc009e553d71 (patch) | |
| tree | 4c17bc3169ee195c236cea9c9efda0aef7488e3c /include/shard | |
| parent | 826c1fff5accbaa6b415acc176a5acbeb5f691b6 (diff) | |
| download | dynamic-extension-cf5f3bbb0cb58430ed68ad3ebfcefc009e553d71.tar.gz | |
Code reformatting
Diffstat (limited to 'include/shard')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | include/shard/Alias.h | 259 |
| -rw-r--r-- | include/shard/FSTrie.h | 282 |
| -rw-r--r-- | include/shard/ISAMTree.h | 4 |
| -rw-r--r-- | include/shard/LoudsPatricia.h | 278 |
| -rw-r--r-- | include/shard/PGM.h | 451 |
| -rw-r--r-- | include/shard/TrieSpline.h | 503 |
| -rw-r--r-- | include/shard/VPTree.h | 605 |
7 files changed, 1155 insertions, 1227 deletions
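Several of the reformatted shard constructors in the diff below (FSTrie, LoudsPatricia, PGM, TrieSpline) share the same multi-shard merge pattern: records from sorted shards are walked via cursors and a priority queue, and a live record that is immediately followed by a matching tombstone is dropped together with that tombstone. The following is a minimal, self-contained C++ sketch of that cancellation rule only; the `Rec` type, the function name, and the flatten-and-sort stand-in for the cursor/priority-queue walk are hypothetical and not part of this repository.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

/* hypothetical stand-in for Wrapped<R>: just a key plus a tombstone flag */
struct Rec {
  uint64_t key;
  bool tombstone;
};

/*
 * Merge pre-sorted runs, dropping any record that is immediately followed
 * by a tombstone carrying the same key (the tombstone is dropped as well).
 * The real shards do this incrementally with cursors and a PriorityQueue;
 * flattening and re-sorting here just keeps the sketch short.
 */
std::vector<Rec> merge_with_tombstone_cancellation(
    const std::vector<std::vector<Rec>> &runs) {
  std::vector<Rec> all;
  for (const auto &r : runs)
    all.insert(all.end(), r.begin(), r.end());

  /* order by key, with a live record sorting ahead of its tombstone */
  std::sort(all.begin(), all.end(), [](const Rec &a, const Rec &b) {
    return a.key < b.key || (a.key == b.key && !a.tombstone && b.tombstone);
  });

  std::vector<Rec> out;
  for (size_t i = 0; i < all.size(); i++) {
    if (!all[i].tombstone && i + 1 < all.size() &&
        all[i].key == all[i + 1].key && all[i + 1].tombstone) {
      i++; /* skip both the record and the tombstone that deletes it */
      continue;
    }
    out.push_back(all[i]);
  }
  return out;
}

int main() {
  auto out = merge_with_tombstone_cancellation(
      {{{1, false}, {3, false}}, {{3, true}, {5, false}}});
  for (const auto &r : out)
    std::cout << r.key << "\n"; /* prints 1 and 5; key 3 was cancelled */
  return 0;
}
```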
diff --git a/include/shard/Alias.h b/include/shard/Alias.h index 15b0884..c176fa2 100644 --- a/include/shard/Alias.h +++ b/include/shard/Alias.h @@ -20,186 +20,163 @@ #include "psu-ds/Alias.h" #include "psu-ds/BloomFilter.h" -#include "util/bf_config.h" #include "util/SortedMerge.h" +#include "util/bf_config.h" -using psudb::CACHELINE_SIZE; using psudb::BloomFilter; using psudb::byte; +using psudb::CACHELINE_SIZE; namespace de { -template <WeightedRecordInterface R> -class Alias { +template <WeightedRecordInterface R> class Alias { public: - typedef R RECORD; -private: - typedef decltype(R::key) K; - typedef decltype(R::value) V; - typedef decltype(R::weight) W; + typedef R RECORD; +private: + typedef decltype(R::key) K; + typedef decltype(R::value) V; + typedef decltype(R::weight) W; public: - Alias(BufferView<R> buffer) - : m_data(nullptr) - , m_alias(nullptr) - , m_total_weight(0) - , m_reccnt(0) - , m_tombstone_cnt(0) - , m_alloc_size(0) - , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) { - - - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - buffer.get_record_count() * - sizeof(Wrapped<R>), - (byte**) &m_data); - - auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; - - if (m_reccnt > 0) { - std::vector<W> weights; - for (size_t i=0; i<m_reccnt; i++) { - weights.emplace_back(m_data[i].rec.weight); - m_total_weight += m_data[i].rec.weight; - } - - build_alias_structure(weights); - } + Alias(BufferView<R> buffer) + : m_data(nullptr), m_alias(nullptr), m_total_weight(0), m_reccnt(0), + m_tombstone_cnt(0), m_alloc_size(0), + m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), + BF_HASH_FUNCS)) { + + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>), + (byte **)&m_data); + + auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; + + if (m_reccnt > 0) { + std::vector<W> weights; + for (size_t i = 0; i < m_reccnt; i++) { + weights.emplace_back(m_data[i].rec.weight); + m_total_weight += m_data[i].rec.weight; + } + + build_alias_structure(weights); } + } - Alias(std::vector<const Alias*> const &shards) - : m_data(nullptr) - , m_alias(nullptr) - , m_total_weight(0) - , m_reccnt(0) - , m_tombstone_cnt(0) - , m_alloc_size(0) - , m_bf(nullptr) { - - size_t attemp_reccnt = 0; - size_t tombstone_count = 0; - auto cursors = build_cursor_vec<R, Alias>(shards, &attemp_reccnt, &tombstone_count); - - m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS); - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - attemp_reccnt * sizeof(Wrapped<R>), - (byte **) &m_data); - - auto res = sorted_array_merge<R>(cursors, m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; - - if (m_reccnt > 0) { - std::vector<W> weights; - for (size_t i=0; i<m_reccnt; i++) { - weights.emplace_back(m_data[i].rec.weight); - m_total_weight += m_data[i].rec.weight; - } - - build_alias_structure(weights); - } - } - - ~Alias() { - free(m_data); - delete m_alias; - delete m_bf; - } + Alias(std::vector<const Alias *> const &shards) + : m_data(nullptr), m_alias(nullptr), m_total_weight(0), m_reccnt(0), + m_tombstone_cnt(0), m_alloc_size(0), m_bf(nullptr) { - Wrapped<R> *point_lookup(const R &rec, bool filter=false) { - if (filter && !m_bf->lookup(rec)) { - return nullptr; - } + size_t attemp_reccnt = 
0; + size_t tombstone_count = 0; + auto cursors = + build_cursor_vec<R, Alias>(shards, &attemp_reccnt, &tombstone_count); - size_t idx = get_lower_bound(rec.key); - if (idx >= m_reccnt) { - return nullptr; - } + m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS); + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **)&m_data); - while (idx < (m_reccnt-1) && m_data[idx].rec < rec) ++idx; + auto res = sorted_array_merge<R>(cursors, m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; - if (m_data[idx].rec == rec) { - return m_data + idx; - } + if (m_reccnt > 0) { + std::vector<W> weights; + for (size_t i = 0; i < m_reccnt; i++) { + weights.emplace_back(m_data[i].rec.weight); + m_total_weight += m_data[i].rec.weight; + } - return nullptr; + build_alias_structure(weights); } + } - Wrapped<R>* get_data() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; - } + ~Alias() { + free(m_data); + delete m_alias; + delete m_bf; + } - size_t get_tombstone_count() const { - return m_tombstone_cnt; + Wrapped<R> *point_lookup(const R &rec, bool filter = false) { + if (filter && !m_bf->lookup(rec)) { + return nullptr; } - const Wrapped<R>* get_record_at(size_t idx) const { - if (idx >= m_reccnt) return nullptr; - return m_data + idx; + size_t idx = get_lower_bound(rec.key); + if (idx >= m_reccnt) { + return nullptr; } + while (idx < (m_reccnt - 1) && m_data[idx].rec < rec) + ++idx; - size_t get_memory_usage() const { - return 0; + if (m_data[idx].rec == rec) { + return m_data + idx; } - size_t get_aux_memory_usage() const { - return (m_bf) ? m_bf->memory_usage() : 0; - } + return nullptr; + } - W get_total_weight() const { - return m_total_weight; - } + Wrapped<R> *get_data() const { return m_data; } - size_t get_weighted_sample(gsl_rng *rng) const { - return m_alias->get(rng); - } + size_t get_record_count() const { return m_reccnt; } - size_t get_lower_bound(const K& key) const { - size_t min = 0; - size_t max = m_reccnt - 1; + size_t get_tombstone_count() const { return m_tombstone_cnt; } - while (min < max) { - size_t mid = (min + max) / 2; + const Wrapped<R> *get_record_at(size_t idx) const { + if (idx >= m_reccnt) + return nullptr; + return m_data + idx; + } - if (key > m_data[mid].rec.key) { - min = mid + 1; - } else { - max = mid; - } - } + size_t get_memory_usage() const { return 0; } - return min; - } + size_t get_aux_memory_usage() const { + return (m_bf) ? 
m_bf->memory_usage() : 0; + } -private: + W get_total_weight() const { return m_total_weight; } + + size_t get_weighted_sample(gsl_rng *rng) const { return m_alias->get(rng); } - void build_alias_structure(std::vector<W> &weights) { + size_t get_lower_bound(const K &key) const { + size_t min = 0; + size_t max = m_reccnt - 1; - // normalize the weights vector - std::vector<double> norm_weights(weights.size()); + while (min < max) { + size_t mid = (min + max) / 2; + + if (key > m_data[mid].rec.key) { + min = mid + 1; + } else { + max = mid; + } + } + + return min; + } + +private: + void build_alias_structure(std::vector<W> &weights) { - for (size_t i=0; i<weights.size(); i++) { - norm_weights[i] = (double) weights[i] / (double) m_total_weight; - } + // normalize the weights vector + std::vector<double> norm_weights(weights.size()); - // build the alias structure - m_alias = new psudb::Alias(norm_weights); + for (size_t i = 0; i < weights.size(); i++) { + norm_weights[i] = (double)weights[i] / (double)m_total_weight; } - Wrapped<R>* m_data; - psudb::Alias *m_alias; - W m_total_weight; - size_t m_reccnt; - size_t m_tombstone_cnt; - size_t m_alloc_size; - BloomFilter<R> *m_bf; + // build the alias structure + m_alias = new psudb::Alias(norm_weights); + } + + Wrapped<R> *m_data; + psudb::Alias *m_alias; + W m_total_weight; + size_t m_reccnt; + size_t m_tombstone_cnt; + size_t m_alloc_size; + BloomFilter<R> *m_bf; }; -} +} // namespace de diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index 59ff116..31db40b 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -9,194 +9,182 @@ */ #pragma once - #include <vector> #include "framework/ShardRequirements.h" #include "fst.hpp" #include "util/SortedMerge.h" -using psudb::CACHELINE_SIZE; using psudb::BloomFilter; +using psudb::byte; +using psudb::CACHELINE_SIZE; using psudb::PriorityQueue; using psudb::queue_record; -using psudb::byte; namespace de { -template <KVPInterface R> -class FSTrie { +template <KVPInterface R> class FSTrie { public: - typedef R RECORD; -private: + typedef R RECORD; - typedef decltype(R::key) K; - typedef decltype(R::value) V; - static_assert(std::is_same_v<K, const char*>, "FST requires const char* keys."); +private: + typedef decltype(R::key) K; + typedef decltype(R::value) V; + static_assert(std::is_same_v<K, const char *>, + "FST requires const char* keys."); public: - FSTrie(BufferView<R> buffer) - : m_data(nullptr) - , m_reccnt(0) - , m_alloc_size(0) - { - m_data = new Wrapped<R>[buffer.get_record_count()](); - m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count(); - - size_t cnt = 0; - std::vector<std::string> keys; - keys.reserve(buffer.get_record_count()); - - /* - * Copy the contents of the buffer view into a temporary buffer, and - * sort them. We still need to iterate over these temporary records to - * apply tombstone/deleted record filtering, as well as any possible - * per-record processing that is required by the shard being built. 
- */ - auto temp_buffer = new Wrapped<R>[buffer.get_record_count()](); - for (size_t i=0; i<buffer.get_record_count(); i++) { - temp_buffer[i] = *(buffer.get(i)); - } - - auto base = temp_buffer; - auto stop = base + buffer.get_record_count(); - std::sort(base, stop, std::less<Wrapped<R>>()); + FSTrie(BufferView<R> buffer) : m_data(nullptr), m_reccnt(0), m_alloc_size(0) { + m_data = new Wrapped<R>[buffer.get_record_count()](); + m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count(); + + size_t cnt = 0; + std::vector<std::string> keys; + keys.reserve(buffer.get_record_count()); + + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = new Wrapped<R>[buffer.get_record_count()](); + for (size_t i = 0; i < buffer.get_record_count(); i++) { + temp_buffer[i] = *(buffer.get(i)); + } - for (size_t i=0; i<buffer.get_record_count(); i++) { - if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || temp_buffer[i].rec.key[0] != '\0') { - continue; - } + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less<Wrapped<R>>()); - m_data[cnt] = temp_buffer[i]; - m_data[cnt].clear_timestamp(); + for (size_t i = 0; i < buffer.get_record_count(); i++) { + if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || + temp_buffer[i].rec.key[0] != '\0') { + continue; + } - keys.push_back(std::string(m_data[cnt].rec.key)); - cnt++; - } + m_data[cnt] = temp_buffer[i]; + m_data[cnt].clear_timestamp(); - m_reccnt = cnt; - if (m_reccnt > 0) { - m_fst = new fst::Trie(keys, true, 1); - } + keys.push_back(std::string(m_data[cnt].rec.key)); + cnt++; + } - delete[] temp_buffer; + m_reccnt = cnt; + if (m_reccnt > 0) { + m_fst = new fst::Trie(keys, true, 1); } - FSTrie(std::vector<const FSTrie*> const &shards) - : m_data(nullptr) - , m_reccnt(0) - , m_alloc_size(0) - { - size_t attemp_reccnt = 0; - size_t tombstone_count = 0; - auto cursors = build_cursor_vec<R, FSTrie>(shards, &attemp_reccnt, &tombstone_count); - - m_data = new Wrapped<R>[attemp_reccnt](); - m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>); - - std::vector<std::string> keys; - keys.reserve(attemp_reccnt); - - // FIXME: For smaller cursor arrays, it may be more efficient to skip - // the priority queue and just do a scan. - PriorityQueue<Wrapped<R>> pq(cursors.size()); - for (size_t i=0; i<cursors.size(); i++) { - pq.push(cursors[i].ptr, i); - } + delete[] temp_buffer; + } - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; - /* - * if the current record is not a tombstone, and the next record is - * a tombstone that matches the current one, then the current one - * has been deleted, and both it and its tombstone can be skipped - * over. 
- */ - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - /* skip over records that have been deleted via tagging */ - if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key[0] != '\0') { - m_data[m_reccnt] = *cursor.ptr; - keys.push_back(std::string(m_data[m_reccnt].rec.key)); - - m_reccnt++; - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); - } - } + FSTrie(std::vector<const FSTrie *> const &shards) + : m_data(nullptr), m_reccnt(0), m_alloc_size(0) { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = + build_cursor_vec<R, FSTrie>(shards, &attemp_reccnt, &tombstone_count); - if (m_reccnt > 0) { - m_fst = new fst::Trie(keys, true, 1); - } + m_data = new Wrapped<R>[attemp_reccnt](); + m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>); + + std::vector<std::string> keys; + keys.reserve(attemp_reccnt); + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue<Wrapped<R>> pq(cursors.size()); + for (size_t i = 0; i < cursors.size(); i++) { + pq.push(cursors[i].ptr, i); } - ~FSTrie() { - delete[] m_data; - delete m_fst; + while (pq.size()) { + auto now = pq.peek(); + auto next = + pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); + pq.pop(); + auto &cursor1 = cursors[now.version]; + auto &cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) + pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) + pq.push(cursor2.ptr, next.version); + } else { + auto &cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key[0] != '\0') { + m_data[m_reccnt] = *cursor.ptr; + keys.push_back(std::string(m_data[m_reccnt].rec.key)); + + m_reccnt++; + } + pq.pop(); + + if (advance_cursor(cursor)) + pq.push(cursor.ptr, now.version); + } } - Wrapped<R> *point_lookup(const R &rec, bool filter=false) { + if (m_reccnt > 0) { + m_fst = new fst::Trie(keys, true, 1); + } + } - auto idx = m_fst->exactSearch(rec.key); + ~FSTrie() { + delete[] m_data; + delete m_fst; + } - if (idx == fst::kNotFound) { - return nullptr; - } + Wrapped<R> *point_lookup(const R &rec, bool filter = false) { - // FIXME: for convenience, I'm treating this Trie as a unique index - // for now, so no need to scan forward and/or check values. This - // also makes the point lookup query class a lot easier to make. - // Ultimately, though, we can support non-unique indexes with some - // extra work. 
+ auto idx = m_fst->exactSearch(rec.key); - return m_data + idx; + if (idx == fst::kNotFound) { + return nullptr; } - Wrapped<R>* get_data() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; - } + // FIXME: for convenience, I'm treating this Trie as a unique index + // for now, so no need to scan forward and/or check values. This + // also makes the point lookup query class a lot easier to make. + // Ultimately, though, we can support non-unique indexes with some + // extra work. - size_t get_tombstone_count() const { - return 0; - } + return m_data + idx; + } - const Wrapped<R>* get_record_at(size_t idx) const { - if (idx >= m_reccnt) return nullptr; - return m_data + idx; - } + Wrapped<R> *get_data() const { return m_data; } + size_t get_record_count() const { return m_reccnt; } - size_t get_memory_usage() const { - return m_fst->getMemoryUsage(); - } + size_t get_tombstone_count() const { return 0; } - size_t get_aux_memory_usage() const { - return m_alloc_size; - } + const Wrapped<R> *get_record_at(size_t idx) const { + if (idx >= m_reccnt) + return nullptr; + return m_data + idx; + } - size_t get_lower_bound(R &rec) {return 0;} - size_t get_upper_bound(R &rec) {return 0;} + size_t get_memory_usage() const { return m_fst->getMemoryUsage(); } -private: + size_t get_aux_memory_usage() const { return m_alloc_size; } + + size_t get_lower_bound(R &rec) { return 0; } + size_t get_upper_bound(R &rec) { return 0; } - Wrapped<R>* m_data; - size_t m_reccnt; - size_t m_alloc_size; - fst::Trie *m_fst; +private: + Wrapped<R> *m_data; + size_t m_reccnt; + size_t m_alloc_size; + fst::Trie *m_fst; }; -} +} // namespace de diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index f6b525f..6722aaf 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -121,7 +121,9 @@ public: size_t get_memory_usage() const { return m_internal_node_cnt * NODE_SZ; } - size_t get_aux_memory_usage() const { return (m_bf) ? m_bf->memory_usage() : 0; } + size_t get_aux_memory_usage() const { + return (m_bf) ? 
m_bf->memory_usage() : 0; + } /* SortedShardInterface methods */ size_t get_lower_bound(const K &key) const { diff --git a/include/shard/LoudsPatricia.h b/include/shard/LoudsPatricia.h index fe0c30e..0b7c74c 100644 --- a/include/shard/LoudsPatricia.h +++ b/include/shard/LoudsPatricia.h @@ -9,191 +9,179 @@ */ #pragma once - #include <vector> #include "framework/ShardRequirements.h" #include "louds-patricia.hpp" #include "util/SortedMerge.h" -using psudb::CACHELINE_SIZE; using psudb::BloomFilter; +using psudb::byte; +using psudb::CACHELINE_SIZE; using psudb::PriorityQueue; using psudb::queue_record; -using psudb::byte; namespace de { -template <KVPInterface R> -class LoudsPatricia { +template <KVPInterface R> class LoudsPatricia { private: - - typedef decltype(R::key) K; - typedef decltype(R::value) V; - static_assert(std::is_same_v<K, const char*>, "FST requires const char* keys."); + typedef decltype(R::key) K; + typedef decltype(R::value) V; + static_assert(std::is_same_v<K, const char *>, + "FST requires const char* keys."); public: - LoudsPatricia(BufferView<R> buffer) - : m_data(nullptr) - , m_reccnt(0) - , m_alloc_size(0) - { - m_data = new Wrapped<R>[buffer.get_record_count()](); - m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count(); - - m_louds = new louds::Patricia(); - - size_t cnt = 0; - std::vector<std::string> keys; - keys.reserve(buffer.get_record_count()); - - /* - * Copy the contents of the buffer view into a temporary buffer, and - * sort them. We still need to iterate over these temporary records to - * apply tombstone/deleted record filtering, as well as any possible - * per-record processing that is required by the shard being built. - */ - auto temp_buffer = new Wrapped<R>[buffer.get_record_count()](); - for (size_t i=0; i<buffer.get_record_count(); i++) { - temp_buffer[i] = *(buffer.get(i)); - } - - auto base = temp_buffer; - auto stop = base + buffer.get_record_count(); - std::sort(base, stop, std::less<Wrapped<R>>()); + LoudsPatricia(BufferView<R> buffer) + : m_data(nullptr), m_reccnt(0), m_alloc_size(0) { + m_data = new Wrapped<R>[buffer.get_record_count()](); + m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count(); + + m_louds = new louds::Patricia(); + + size_t cnt = 0; + std::vector<std::string> keys; + keys.reserve(buffer.get_record_count()); + + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. 
+ */ + auto temp_buffer = new Wrapped<R>[buffer.get_record_count()](); + for (size_t i = 0; i < buffer.get_record_count(); i++) { + temp_buffer[i] = *(buffer.get(i)); + } - for (size_t i=0; i<buffer.get_record_count(); i++) { - if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || temp_buffer[i].rec.key == "") { - continue; - } + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less<Wrapped<R>>()); - m_data[cnt] = temp_buffer[i]; - m_data[cnt].clear_timestamp(); + for (size_t i = 0; i < buffer.get_record_count(); i++) { + if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || + temp_buffer[i].rec.key == "") { + continue; + } - m_louds->add(std::string(m_data[cnt].rec.key)); - cnt++; - } + m_data[cnt] = temp_buffer[i]; + m_data[cnt].clear_timestamp(); - m_reccnt = cnt; - if (m_reccnt > 0) { - m_louds->build(); - } + m_louds->add(std::string(m_data[cnt].rec.key)); + cnt++; + } - delete[] temp_buffer; + m_reccnt = cnt; + if (m_reccnt > 0) { + m_louds->build(); } - LoudsPatricia(std::vector<const LoudsPatricia*> &shards) - : m_data(nullptr) - , m_reccnt(0) - , m_alloc_size(0) - { - size_t attemp_reccnt = 0; - size_t tombstone_count = 0; - auto cursors = build_cursor_vec<R, LoudsPatricia>(shards, &attemp_reccnt, &tombstone_count); - - m_data = new Wrapped<R>[attemp_reccnt](); - m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>); - - m_louds = new louds::Patricia(); - - // FIXME: For smaller cursor arrays, it may be more efficient to skip - // the priority queue and just do a scan. - PriorityQueue<Wrapped<R>> pq(cursors.size()); - for (size_t i=0; i<cursors.size(); i++) { - pq.push(cursors[i].ptr, i); - } + delete[] temp_buffer; + } - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; - /* - * if the current record is not a tombstone, and the next record is - * a tombstone that matches the current one, then the current one - * has been deleted, and both it and its tombstone can be skipped - * over. - */ - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - /* skip over records that have been deleted via tagging */ - if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") { - m_data[m_reccnt] = *cursor.ptr; - m_louds->add(std::string(m_data[m_reccnt].rec.key)); - m_reccnt++; - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); - } - } + LoudsPatricia(std::vector<const LoudsPatricia *> &shards) + : m_data(nullptr), m_reccnt(0), m_alloc_size(0) { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = build_cursor_vec<R, LoudsPatricia>(shards, &attemp_reccnt, + &tombstone_count); - if (m_reccnt > 0) { - m_louds->build(); + m_data = new Wrapped<R>[attemp_reccnt](); + m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>); + + m_louds = new louds::Patricia(); + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. 
+ PriorityQueue<Wrapped<R>> pq(cursors.size()); + for (size_t i = 0; i < cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } + + while (pq.size()) { + auto now = pq.peek(); + auto next = + pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); + pq.pop(); + auto &cursor1 = cursors[now.version]; + auto &cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) + pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) + pq.push(cursor2.ptr, next.version); + } else { + auto &cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") { + m_data[m_reccnt] = *cursor.ptr; + m_louds->add(std::string(m_data[m_reccnt].rec.key)); + m_reccnt++; } + pq.pop(); + + if (advance_cursor(cursor)) + pq.push(cursor.ptr, now.version); + } } - ~LoudsPatricia() { - delete[] m_data; - delete m_louds; + if (m_reccnt > 0) { + m_louds->build(); } + } - Wrapped<R> *point_lookup(const R &rec, bool filter=false) { + ~LoudsPatricia() { + delete[] m_data; + delete m_louds; + } - auto idx = m_louds->lookup(std::string(rec.key)); + Wrapped<R> *point_lookup(const R &rec, bool filter = false) { - if (idx == -1) { - return nullptr; - } + auto idx = m_louds->lookup(std::string(rec.key)); - // FIXME: for convenience, I'm treating this Trie as a unique index - // for now, so no need to scan forward and/or check values. This - // also makes the point lookup query class a lot easier to make. - // Ultimately, though, we can support non-unique indexes with some - // extra work. - return m_data + idx; + if (idx == -1) { + return nullptr; } - Wrapped<R>* get_data() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; - } + // FIXME: for convenience, I'm treating this Trie as a unique index + // for now, so no need to scan forward and/or check values. This + // also makes the point lookup query class a lot easier to make. + // Ultimately, though, we can support non-unique indexes with some + // extra work. 
+ return m_data + idx; + } - size_t get_tombstone_count() const { - return 0; - } + Wrapped<R> *get_data() const { return m_data; } - const Wrapped<R>* get_record_at(size_t idx) const { - if (idx >= m_reccnt) return nullptr; - return m_data + idx; - } + size_t get_record_count() const { return m_reccnt; } + size_t get_tombstone_count() const { return 0; } - size_t get_memory_usage() const { - return m_louds->size(); - } + const Wrapped<R> *get_record_at(size_t idx) const { + if (idx >= m_reccnt) + return nullptr; + return m_data + idx; + } - size_t get_aux_memory_usage() const { - return m_alloc_size; - } + size_t get_memory_usage() const { return m_louds->size(); } - size_t get_lower_bound(R &rec) {return 0;} - size_t get_upper_bound(R &rec) {return 0;} + size_t get_aux_memory_usage() const { return m_alloc_size; } -private: + size_t get_lower_bound(R &rec) { return 0; } + size_t get_upper_bound(R &rec) { return 0; } - Wrapped<R>* m_data; - size_t m_reccnt; - size_t m_alloc_size; - louds::Patricia *m_louds; +private: + Wrapped<R> *m_data; + size_t m_reccnt; + size_t m_alloc_size; + louds::Patricia *m_louds; }; -} +} // namespace de diff --git a/include/shard/PGM.h b/include/shard/PGM.h index 5b39ab4..40c9141 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -13,7 +13,6 @@ */ #pragma once - #include <vector> #include "framework/ShardRequirements.h" @@ -23,278 +22,268 @@ #include "util/SortedMerge.h" #include "util/bf_config.h" -using psudb::CACHELINE_SIZE; using psudb::BloomFilter; +using psudb::byte; +using psudb::CACHELINE_SIZE; using psudb::PriorityQueue; using psudb::queue_record; -using psudb::byte; namespace de { -template <RecordInterface R, size_t epsilon=128> -class PGM { +template <RecordInterface R, size_t epsilon = 128> class PGM { public: - typedef R RECORD; + typedef R RECORD; + private: - typedef decltype(R::key) K; - typedef decltype(R::value) V; + typedef decltype(R::key) K; + typedef decltype(R::value) V; public: - PGM(BufferView<R> buffer) - : m_bf(nullptr) - , m_reccnt(0) - , m_tombstone_cnt(0) - , m_alloc_size(0) { - - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - buffer.get_record_count() * - sizeof(Wrapped<R>), - (byte**) &m_data); - - std::vector<K> keys; - /* - * Copy the contents of the buffer view into a temporary buffer, and - * sort them. We still need to iterate over these temporary records to - * apply tombstone/deleted record filtering, as well as any possible - * per-record processing that is required by the shard being built. - */ - auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, - buffer.get_record_count(), - sizeof(Wrapped<R>)); - buffer.copy_to_buffer((byte *) temp_buffer); - - auto base = temp_buffer; - auto stop = base + buffer.get_record_count(); - std::sort(base, stop, std::less<Wrapped<R>>()); - - merge_info info = {0, 0}; - - /* - * Iterate over the temporary buffer to process the records, copying - * them into buffer as needed - */ - while (base < stop) { - if (!base->is_tombstone() && (base + 1 < stop) - && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - continue; - } else if (base->is_deleted()) { - base += 1; - continue; - } - - // FIXME: this shouldn't be necessary, but the tagged record - // bypass doesn't seem to be working on this code-path, so this - // ensures that tagged records from the buffer are able to be - // dropped, eventually. 
It should only need to be &= 1 - base->header &= 3; - keys.emplace_back(base->rec.key); - m_data[info.record_count++] = *base; - - if (base->is_tombstone()) { - info.tombstone_count++; - if (m_bf){ - m_bf->insert(base->rec); - } - } - - base++; + PGM(BufferView<R> buffer) + : m_bf(nullptr), m_reccnt(0), m_tombstone_cnt(0), m_alloc_size(0) { + + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>), + (byte **)&m_data); + + std::vector<K> keys; + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped<R> *)psudb::sf_aligned_calloc( + CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>)); + buffer.copy_to_buffer((byte *)temp_buffer); + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less<Wrapped<R>>()); + + merge_info info = {0, 0}; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) && + base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; + } + + // FIXME: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. It should only need to be &= 1 + base->header &= 3; + keys.emplace_back(base->rec.key); + m_data[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(base->rec); } + } - free(temp_buffer); - - m_reccnt = info.record_count; - m_tombstone_cnt = info.tombstone_count; - - if (m_reccnt > 0) { - m_pgm = pgm::PGMIndex<K, epsilon>(keys); - } + base++; } - PGM(std::vector<const PGM*> const &shards) - : m_data(nullptr) - , m_bf(nullptr) - , m_reccnt(0) - , m_tombstone_cnt(0) - , m_alloc_size(0) { - - size_t attemp_reccnt = 0; - size_t tombstone_count = 0; - auto cursors = build_cursor_vec<R, PGM>(shards, &attemp_reccnt, &tombstone_count); - - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - attemp_reccnt * sizeof(Wrapped<R>), - (byte **) &m_data); - std::vector<K> keys; - - // FIXME: For smaller cursor arrays, it may be more efficient to skip - // the priority queue and just do a scan. - PriorityQueue<Wrapped<R>> pq(cursors.size()); - for (size_t i=0; i<cursors.size(); i++) { - pq.push(cursors[i].ptr, i); - } + free(temp_buffer); - merge_info info = {0, 0}; - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; - /* - * if the current record is not a tombstone, and the next record is - * a tombstone that matches the current one, then the current one - * has been deleted, and both it and its tombstone can be skipped - * over. 
- */ - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - /* skip over records that have been deleted via tagging */ - if (!cursor.ptr->is_deleted()) { - keys.emplace_back(cursor.ptr->rec.key); - m_data[info.record_count++] = *cursor.ptr; - - /* - * if the record is a tombstone, increment the ts count and - * insert it into the bloom filter if one has been - * provided. - */ - if (cursor.ptr->is_tombstone()) { - info.tombstone_count++; - if (m_bf) { - m_bf->insert(cursor.ptr->rec); - } - } - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); - } - } + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; - m_reccnt = info.record_count; - m_tombstone_cnt = info.tombstone_count; + if (m_reccnt > 0) { + m_pgm = pgm::PGMIndex<K, epsilon>(keys); + } + } + + PGM(std::vector<const PGM *> const &shards) + : m_data(nullptr), m_bf(nullptr), m_reccnt(0), m_tombstone_cnt(0), + m_alloc_size(0) { + + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = + build_cursor_vec<R, PGM>(shards, &attemp_reccnt, &tombstone_count); + + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **)&m_data); + std::vector<K> keys; + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue<Wrapped<R>> pq(cursors.size()); + for (size_t i = 0; i < cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } - if (m_reccnt > 0) { - m_pgm = pgm::PGMIndex<K, epsilon>(keys); + merge_info info = {0, 0}; + while (pq.size()) { + auto now = pq.peek(); + auto next = + pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); + pq.pop(); + auto &cursor1 = cursors[now.version]; + auto &cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) + pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) + pq.push(cursor2.ptr, next.version); + } else { + auto &cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + keys.emplace_back(cursor.ptr->rec.key); + m_data[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. 
+ */ + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(cursor.ptr->rec); + } + } } - } + pq.pop(); - ~PGM() { - free(m_data); - delete m_bf; + if (advance_cursor(cursor)) + pq.push(cursor.ptr, now.version); + } } - Wrapped<R> *point_lookup(const R &rec, bool filter=false) const { - size_t idx = get_lower_bound(rec.key); - if (idx >= m_reccnt) { - return nullptr; - } + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; - while (idx < m_reccnt && m_data[idx].rec < rec) ++idx; + if (m_reccnt > 0) { + m_pgm = pgm::PGMIndex<K, epsilon>(keys); + } + } - if (m_data[idx].rec == rec) { - return m_data + idx; - } + ~PGM() { + free(m_data); + delete m_bf; + } - return nullptr; + Wrapped<R> *point_lookup(const R &rec, bool filter = false) const { + size_t idx = get_lower_bound(rec.key); + if (idx >= m_reccnt) { + return nullptr; } - Wrapped<R>* get_data() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; - } + while (idx < m_reccnt && m_data[idx].rec < rec) + ++idx; - size_t get_tombstone_count() const { - return m_tombstone_cnt; + if (m_data[idx].rec == rec) { + return m_data + idx; } - const Wrapped<R>* get_record_at(size_t idx) const { - if (idx >= m_reccnt) return nullptr; - return m_data + idx; - } + return nullptr; + } + Wrapped<R> *get_data() const { return m_data; } - size_t get_memory_usage() const { - return m_pgm.size_in_bytes(); - } + size_t get_record_count() const { return m_reccnt; } - size_t get_aux_memory_usage() const { - return (m_bf) ? m_bf->memory_usage() : 0; - } + size_t get_tombstone_count() const { return m_tombstone_cnt; } - size_t get_lower_bound(const K& key) const { - auto bound = m_pgm.search(key); - size_t idx = bound.lo; + const Wrapped<R> *get_record_at(size_t idx) const { + if (idx >= m_reccnt) + return nullptr; + return m_data + idx; + } - if (idx >= m_reccnt) { - return m_reccnt; - } + size_t get_memory_usage() const { return m_pgm.size_in_bytes(); } - /* - * If the region to search is less than some pre-specified - * amount, perform a linear scan to locate the record. - */ - if (bound.hi - bound.lo < 256) { - while (idx < bound.hi && m_data[idx].rec.key < key) { - idx++; - } - } else { - /* Otherwise, perform a binary search */ - idx = bound.lo; - size_t max = bound.hi; - - while (idx < max) { - size_t mid = (idx + max) / 2; - if (key > m_data[mid].rec.key) { - idx = mid + 1; - } else { - max = mid; - } - } + size_t get_aux_memory_usage() const { + return (m_bf) ? m_bf->memory_usage() : 0; + } - } + size_t get_lower_bound(const K &key) const { + auto bound = m_pgm.search(key); + size_t idx = bound.lo; - /* - * the upper bound returned by PGM is one passed the end of the - * array. If we are at that point, we should just return "not found" - */ - if (idx == m_reccnt) { - return idx; - } + if (idx >= m_reccnt) { + return m_reccnt; + } - /* - * We may have walked one passed the actual lower bound, so check - * the index before the current one to see if it is the actual bound - */ - if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) { - return idx-1; + /* + * If the region to search is less than some pre-specified + * amount, perform a linear scan to locate the record. 
+ */ + if (bound.hi - bound.lo < 256) { + while (idx < bound.hi && m_data[idx].rec.key < key) { + idx++; + } + } else { + /* Otherwise, perform a binary search */ + idx = bound.lo; + size_t max = bound.hi; + + while (idx < max) { + size_t mid = (idx + max) / 2; + if (key > m_data[mid].rec.key) { + idx = mid + 1; + } else { + max = mid; } + } + } + + /* + * the upper bound returned by PGM is one passed the end of the + * array. If we are at that point, we should just return "not found" + */ + if (idx == m_reccnt) { + return idx; + } - /* - * Otherwise, check idx. If it is a valid bound, then return it, - * otherwise return "not found". - */ - return (m_data[idx].rec.key >= key) ? idx : m_reccnt; + /* + * We may have walked one passed the actual lower bound, so check + * the index before the current one to see if it is the actual bound + */ + if (m_data[idx].rec.key > key && idx > 0 && + m_data[idx - 1].rec.key <= key) { + return idx - 1; } + /* + * Otherwise, check idx. If it is a valid bound, then return it, + * otherwise return "not found". + */ + return (m_data[idx].rec.key >= key) ? idx : m_reccnt; + } + private: - Wrapped<R>* m_data; - BloomFilter<R> *m_bf; - size_t m_reccnt; - size_t m_tombstone_cnt; - size_t m_alloc_size; - K m_max_key; - K m_min_key; - pgm::PGMIndex<K, epsilon> m_pgm; + Wrapped<R> *m_data; + BloomFilter<R> *m_bf; + size_t m_reccnt; + size_t m_tombstone_cnt; + size_t m_alloc_size; + K m_max_key; + K m_min_key; + pgm::PGMIndex<K, epsilon> m_pgm; }; -} +} // namespace de diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 1a04afc..2cd514d 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -11,319 +11,306 @@ */ #pragma once - #include <vector> #include "framework/ShardRequirements.h" -#include "ts/builder.h" #include "psu-ds/BloomFilter.h" -#include "util/bf_config.h" +#include "ts/builder.h" #include "util/SortedMerge.h" +#include "util/bf_config.h" -using psudb::CACHELINE_SIZE; using psudb::BloomFilter; +using psudb::byte; +using psudb::CACHELINE_SIZE; using psudb::PriorityQueue; using psudb::queue_record; -using psudb::byte; namespace de { -template <KVPInterface R, size_t E=1024> -class TrieSpline { +template <KVPInterface R, size_t E = 1024> class TrieSpline { public: - typedef R RECORD; + typedef R RECORD; + private: - typedef decltype(R::key) K; - typedef decltype(R::value) V; + typedef decltype(R::key) K; + typedef decltype(R::value) V; public: - TrieSpline(BufferView<R> buffer) - : m_reccnt(0) - , m_tombstone_cnt(0) - , m_alloc_size(0) - , m_max_key(0) - , m_min_key(0) - , m_bf(nullptr) - { - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - buffer.get_record_count() * - sizeof(Wrapped<R>), - (byte**) &m_data); + TrieSpline(BufferView<R> buffer) + : m_reccnt(0), m_tombstone_cnt(0), m_alloc_size(0), m_max_key(0), + m_min_key(0), m_bf(nullptr) { + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>), + (byte **)&m_data); + + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. 
+ */ + auto temp_buffer = (Wrapped<R> *)psudb::sf_aligned_calloc( + CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>)); + buffer.copy_to_buffer((byte *)temp_buffer); + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less<Wrapped<R>>()); + + auto tmp_min_key = temp_buffer[0].rec.key; + auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key; + auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E); + + merge_info info = {0, 0}; + + m_min_key = tmp_min_key; + m_max_key = tmp_max_key; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) && + base->rec == (base + 1)->rec && (base + 1)->is_tombstone() && + base->rec.key != m_max_key && base->rec.key != m_min_key) { + base += 2; + continue; + } else if (base->is_deleted() && base->rec.key != m_max_key && + base->rec.key != m_min_key) { + base += 1; + continue; + } + + base->header &= 3; + bldr.AddKey(base->rec.key); + m_data[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(base->rec); + } + } - /* - * Copy the contents of the buffer view into a temporary buffer, and - * sort them. We still need to iterate over these temporary records to - * apply tombstone/deleted record filtering, as well as any possible - * per-record processing that is required by the shard being built. - */ - auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, - buffer.get_record_count(), - sizeof(Wrapped<R>)); - buffer.copy_to_buffer((byte *) temp_buffer); + base++; + } - auto base = temp_buffer; - auto stop = base + buffer.get_record_count(); - std::sort(base, stop, std::less<Wrapped<R>>()); + free(temp_buffer); - auto tmp_min_key = temp_buffer[0].rec.key; - auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key; - auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E); + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; - merge_info info = {0, 0}; + if (m_reccnt > 50) { + m_ts = bldr.Finalize(); + } + } + + TrieSpline(std::vector<const TrieSpline *> const &shards) + : m_reccnt(0), m_tombstone_cnt(0), m_alloc_size(0), m_max_key(0), + m_min_key(0), m_bf(nullptr) { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt, + &tombstone_count); + + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **)&m_data); + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. 
+ PriorityQueue<Wrapped<R>> pq(cursors.size()); + for (size_t i = 0; i < cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } - m_min_key = tmp_min_key; - m_max_key = tmp_max_key; + auto tmp_max_key = shards[0]->m_max_key; + auto tmp_min_key = shards[0]->m_min_key; - /* - * Iterate over the temporary buffer to process the records, copying - * them into buffer as needed - */ - while (base < stop) { - if (!base->is_tombstone() && (base + 1 < stop) - && base->rec == (base + 1)->rec && (base + 1)->is_tombstone() - && base->rec.key != m_max_key && base->rec.key != m_min_key) { - base += 2; - continue; - } else if (base->is_deleted() && base->rec.key != m_max_key && base->rec.key != m_min_key) { - base += 1; - continue; - } + for (size_t i = 0; i < shards.size(); i++) { + if (shards[i]->m_max_key > tmp_max_key) { + tmp_max_key = shards[i]->m_max_key; + } - base->header &= 3; - bldr.AddKey(base->rec.key); - m_data[info.record_count++] = *base; + if (shards[i]->m_min_key < tmp_min_key) { + tmp_min_key = shards[i]->m_min_key; + } + } - if (base->is_tombstone()) { - info.tombstone_count++; - if (m_bf){ - m_bf->insert(base->rec); - } + auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E); + + m_max_key = tmp_max_key; + m_min_key = tmp_min_key; + + merge_info info = {0, 0}; + while (pq.size()) { + auto now = pq.peek(); + auto next = + pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. Unless the tombstone would remove the maximum or + * minimum valued key, which cannot be removed at this point + * without breaking triespline + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone() && + now.data->rec.key != tmp_max_key && + now.data->rec.key != tmp_min_key) { + + pq.pop(); + pq.pop(); + auto &cursor1 = cursors[now.version]; + auto &cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) + pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) + pq.push(cursor2.ptr, next.version); + } else { + auto &cursor = cursors[now.version]; + /* + * skip over records that have been deleted via tagging, + * unless they are the max or min keys, which cannot be + * removed without breaking triespline + */ + if (!cursor.ptr->is_deleted() || cursor.ptr->rec.key == tmp_max_key || + cursor.ptr->rec.key == tmp_min_key) { + bldr.AddKey(cursor.ptr->rec.key); + m_data[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. 
+ */ + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(cursor.ptr->rec); } - - base++; + } } + pq.pop(); - free(temp_buffer); - - m_reccnt = info.record_count; - m_tombstone_cnt = info.tombstone_count; - - if (m_reccnt > 50) { - m_ts = bldr.Finalize(); - } + if (advance_cursor(cursor)) + pq.push(cursor.ptr, now.version); + } } - TrieSpline(std::vector<const TrieSpline*> const &shards) - : m_reccnt(0) - , m_tombstone_cnt(0) - , m_alloc_size(0) - , m_max_key(0) - , m_min_key(0) - , m_bf(nullptr) - { - size_t attemp_reccnt = 0; - size_t tombstone_count = 0; - auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt, &tombstone_count); - - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - attemp_reccnt * sizeof(Wrapped<R>), - (byte **) &m_data); - - // FIXME: For smaller cursor arrays, it may be more efficient to skip - // the priority queue and just do a scan. - PriorityQueue<Wrapped<R>> pq(cursors.size()); - for (size_t i=0; i<cursors.size(); i++) { - pq.push(cursors[i].ptr, i); - } - - auto tmp_max_key = shards[0]->m_max_key; - auto tmp_min_key = shards[0]->m_min_key; - - for (size_t i=0; i<shards.size(); i++) { - if (shards[i]->m_max_key > tmp_max_key) { - tmp_max_key = shards[i]->m_max_key; - } - - if (shards[i]->m_min_key < tmp_min_key) { - tmp_min_key = shards[i]->m_min_key; - } - } + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; - auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E); - - m_max_key = tmp_max_key; - m_min_key = tmp_min_key; - - merge_info info = {0, 0}; - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; - /* - * if the current record is not a tombstone, and the next record is - * a tombstone that matches the current one, then the current one - * has been deleted, and both it and its tombstone can be skipped - * over. Unless the tombstone would remove the maximum or - * minimum valued key, which cannot be removed at this point - * without breaking triespline - */ - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone() - && now.data->rec.key != tmp_max_key && now.data->rec.key != tmp_min_key) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - /* - * skip over records that have been deleted via tagging, - * unless they are the max or min keys, which cannot be - * removed without breaking triespline - */ - if (!cursor.ptr->is_deleted() || - cursor.ptr->rec.key == tmp_max_key || cursor.ptr->rec.key == tmp_min_key) { - bldr.AddKey(cursor.ptr->rec.key); - m_data[info.record_count++] = *cursor.ptr; - - /* - * if the record is a tombstone, increment the ts count and - * insert it into the bloom filter if one has been - * provided. 
- */ - if (cursor.ptr->is_tombstone()) { - info.tombstone_count++; - if (m_bf) { - m_bf->insert(cursor.ptr->rec); - } - } - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); - } - } + if (m_reccnt > 50) { + m_ts = bldr.Finalize(); + } + } - m_reccnt = info.record_count; - m_tombstone_cnt = info.tombstone_count; + ~TrieSpline() { + free(m_data); + delete m_bf; + } - if (m_reccnt > 50) { - m_ts = bldr.Finalize(); - } + Wrapped<R> *point_lookup(const R &rec, bool filter = false) const { + if (filter && m_bf && !m_bf->lookup(rec)) { + return nullptr; } - ~TrieSpline() { - free(m_data); - delete m_bf; + size_t idx = get_lower_bound(rec.key); + if (idx >= m_reccnt) { + return nullptr; } - Wrapped<R> *point_lookup(const R &rec, bool filter=false) const { - if (filter && m_bf && !m_bf->lookup(rec)) { - return nullptr; - } - - size_t idx = get_lower_bound(rec.key); - if (idx >= m_reccnt) { - return nullptr; - } - - while (idx < m_reccnt && m_data[idx].rec < rec) ++idx; - - if (m_data[idx].rec == rec) { - return m_data + idx; - } + while (idx < m_reccnt && m_data[idx].rec < rec) + ++idx; - return nullptr; + if (m_data[idx].rec == rec) { + return m_data + idx; } - Wrapped<R>* get_data() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; - } + return nullptr; + } - size_t get_tombstone_count() const { - return m_tombstone_cnt; - } + Wrapped<R> *get_data() const { return m_data; } - const Wrapped<R>* get_record_at(size_t idx) const { - if (idx >= m_reccnt) return nullptr; - return m_data + idx; - } + size_t get_record_count() const { return m_reccnt; } + size_t get_tombstone_count() const { return m_tombstone_cnt; } - size_t get_memory_usage() const { - return m_ts.GetSize(); - } + const Wrapped<R> *get_record_at(size_t idx) const { + if (idx >= m_reccnt) + return nullptr; + return m_data + idx; + } - size_t get_aux_memory_usage() const { - return (m_bf) ? m_bf->memory_usage() : 0; - } + size_t get_memory_usage() const { return m_ts.GetSize(); } - size_t get_lower_bound(const K& key) const { - if (m_reccnt < 50) { - size_t bd = m_reccnt; - for (size_t i=0; i<m_reccnt; i++) { - if (m_data[i].rec.key >= key) { - bd = i; - break; - } - } + size_t get_aux_memory_usage() const { + return (m_bf) ? m_bf->memory_usage() : 0; + } - return bd; + size_t get_lower_bound(const K &key) const { + if (m_reccnt < 50) { + size_t bd = m_reccnt; + for (size_t i = 0; i < m_reccnt; i++) { + if (m_data[i].rec.key >= key) { + bd = i; + break; } + } - auto bound = m_ts.GetSearchBound(key); - size_t idx = bound.begin; + return bd; + } - if (idx >= m_reccnt) { - return m_reccnt; - } + auto bound = m_ts.GetSearchBound(key); + size_t idx = bound.begin; - // If the region to search is less than some pre-specified - // amount, perform a linear scan to locate the record. - if (bound.end - bound.begin < 256) { - while (idx < bound.end && m_data[idx].rec.key < key) { - idx++; - } - } else { - // Otherwise, perform a binary search - idx = bound.begin; - size_t max = bound.end; - - while (idx < max) { - size_t mid = (idx + max) / 2; - if (key > m_data[mid].rec.key) { - idx = mid + 1; - } else { - max = mid; - } - } - } + if (idx >= m_reccnt) { + return m_reccnt; + } - if (idx == m_reccnt) { - return m_reccnt; + // If the region to search is less than some pre-specified + // amount, perform a linear scan to locate the record. 
+ if (bound.end - bound.begin < 256) { + while (idx < bound.end && m_data[idx].rec.key < key) { + idx++; + } + } else { + // Otherwise, perform a binary search + idx = bound.begin; + size_t max = bound.end; + + while (idx < max) { + size_t mid = (idx + max) / 2; + if (key > m_data[mid].rec.key) { + idx = mid + 1; + } else { + max = mid; } + } + } - if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) { - return idx-1; - } + if (idx == m_reccnt) { + return m_reccnt; + } - return idx; + if (m_data[idx].rec.key > key && idx > 0 && + m_data[idx - 1].rec.key <= key) { + return idx - 1; } -private: + return idx; + } - Wrapped<R>* m_data; - size_t m_reccnt; - size_t m_tombstone_cnt; - size_t m_alloc_size; - K m_max_key; - K m_min_key; - ts::TrieSpline<K> m_ts; - BloomFilter<R> *m_bf; +private: + Wrapped<R> *m_data; + size_t m_reccnt; + size_t m_tombstone_cnt; + size_t m_alloc_size; + K m_max_key; + K m_min_key; + ts::TrieSpline<K> m_ts; + BloomFilter<R> *m_bf; }; -} +} // namespace de diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h index 7130efe..33ce9b9 100644 --- a/include/shard/VPTree.h +++ b/include/shard/VPTree.h @@ -15,379 +15,376 @@ #include <vector> -#include <unordered_map> #include "framework/ShardRequirements.h" #include "psu-ds/PriorityQueue.h" +#include <unordered_map> +using psudb::byte; using psudb::CACHELINE_SIZE; using psudb::PriorityQueue; -using psudb::byte; namespace de { -template <NDRecordInterface R, size_t LEAFSZ=100, bool HMAP=false> +template <NDRecordInterface R, size_t LEAFSZ = 100, bool HMAP = false> class VPTree { public: - typedef R RECORD; + typedef R RECORD; private: - struct vpnode { - size_t start; - size_t stop; - bool leaf; + struct vpnode { + size_t start; + size_t stop; + bool leaf; + + double radius; + vpnode *inside; + vpnode *outside; + + vpnode() + : start(0), stop(0), leaf(false), radius(0.0), inside(nullptr), + outside(nullptr) {} + + ~vpnode() { + delete inside; + delete outside; + } + }; - double radius; - vpnode *inside; - vpnode *outside; +public: + VPTree(BufferView<R> buffer) + : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) { + + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>), + (byte **)&m_data); + + m_ptrs = new vp_ptr[buffer.get_record_count()]; + m_reccnt = 0; + + // FIXME: will eventually need to figure out tombstones + // this one will likely require the multi-pass + // approach, as otherwise we'll need to sort the + // records repeatedly on each reconstruction. 
+ for (size_t i = 0; i < buffer.get_record_count(); i++) { + auto rec = buffer.get(i); + + if (rec->is_deleted()) { + continue; + } + + rec->header &= 3; + m_data[m_reccnt] = *rec; + m_ptrs[m_reccnt].ptr = &m_data[m_reccnt]; + m_reccnt++; + } - vpnode() : start(0), stop(0), leaf(false), radius(0.0), inside(nullptr), outside(nullptr) {} + if (m_reccnt > 0) { + m_root = build_vptree(); + build_map(); + } + } - ~vpnode() { - delete inside; - delete outside; - } - }; + VPTree(std::vector<const VPTree *> shards) + : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) { + size_t attemp_reccnt = 0; + for (size_t i = 0; i < shards.size(); i++) { + attemp_reccnt += shards[i]->get_record_count(); + } + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **)&m_data); + m_ptrs = new vp_ptr[attemp_reccnt]; + + // FIXME: will eventually need to figure out tombstones + // this one will likely require the multi-pass + // approach, as otherwise we'll need to sort the + // records repeatedly on each reconstruction. + for (size_t i = 0; i < shards.size(); i++) { + for (size_t j = 0; j < shards[i]->get_record_count(); j++) { + if (shards[i]->get_record_at(j)->is_deleted()) { + continue; + } -public: - VPTree(BufferView<R> buffer) - : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) { + m_data[m_reccnt] = *shards[i]->get_record_at(j); + m_ptrs[m_reccnt].ptr = &m_data[m_reccnt]; + m_reccnt++; + } + } + if (m_reccnt > 0) { + m_root = build_vptree(); + build_map(); + } + } - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - buffer.get_record_count() * - sizeof(Wrapped<R>), - (byte**) &m_data); + ~VPTree() { + free(m_data); + delete m_root; + delete[] m_ptrs; + } - m_ptrs = new vp_ptr[buffer.get_record_count()]; - m_reccnt = 0; + Wrapped<R> *point_lookup(const R &rec, bool filter = false) { + if constexpr (HMAP) { + auto idx = m_lookup_map.find(rec); - // FIXME: will eventually need to figure out tombstones - // this one will likely require the multi-pass - // approach, as otherwise we'll need to sort the - // records repeatedly on each reconstruction. 
- for (size_t i=0; i<buffer.get_record_count(); i++) { - auto rec = buffer.get(i); + if (idx == m_lookup_map.end()) { + return nullptr; + } - if (rec->is_deleted()) { - continue; - } + return m_data + idx->second; + } else { + vpnode *node = m_root; - rec->header &= 3; - m_data[m_reccnt] = *rec; - m_ptrs[m_reccnt].ptr = &m_data[m_reccnt]; - m_reccnt++; + while (!node->leaf && m_ptrs[node->start].ptr->rec != rec) { + if (rec.calc_distance((m_ptrs[node->start].ptr->rec)) >= node->radius) { + node = node->outside; + } else { + node = node->inside; } + } - if (m_reccnt > 0) { - m_root = build_vptree(); - build_map(); + for (size_t i = node->start; i <= node->stop; i++) { + if (m_ptrs[i].ptr->rec == rec) { + return m_ptrs[i].ptr; } + } + + return nullptr; } + } - VPTree(std::vector<const VPTree*> shards) - : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) { + Wrapped<R> *get_data() const { return m_data; } - size_t attemp_reccnt = 0; - for (size_t i=0; i<shards.size(); i++) { - attemp_reccnt += shards[i]->get_record_count(); - } + size_t get_record_count() const { return m_reccnt; } - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - attemp_reccnt * sizeof(Wrapped<R>), - (byte **) &m_data); - m_ptrs = new vp_ptr[attemp_reccnt]; - - // FIXME: will eventually need to figure out tombstones - // this one will likely require the multi-pass - // approach, as otherwise we'll need to sort the - // records repeatedly on each reconstruction. - for (size_t i=0; i<shards.size(); i++) { - for (size_t j=0; j<shards[i]->get_record_count(); j++) { - if (shards[i]->get_record_at(j)->is_deleted()) { - continue; - } - - m_data[m_reccnt] = *shards[i]->get_record_at(j); - m_ptrs[m_reccnt].ptr = &m_data[m_reccnt]; - m_reccnt++; - } - } + size_t get_tombstone_count() const { return m_tombstone_cnt; } - if (m_reccnt > 0) { - m_root = build_vptree(); - build_map(); - } - } + const Wrapped<R> *get_record_at(size_t idx) const { + if (idx >= m_reccnt) + return nullptr; + return m_data + idx; + } - ~VPTree() { - free(m_data); - delete m_root; - delete[] m_ptrs; - } + size_t get_memory_usage() const { + return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R *); + } - Wrapped<R> *point_lookup(const R &rec, bool filter=false) { - if constexpr (HMAP) { - auto idx = m_lookup_map.find(rec); + size_t get_aux_memory_usage() const { + // FIXME: need to return the size of the unordered_map + return 0; + } - if (idx == m_lookup_map.end()) { - return nullptr; - } + void search(const R &point, size_t k, + PriorityQueue<Wrapped<R>, DistCmpMax<Wrapped<R>>> &pq) { + double farthest = std::numeric_limits<double>::max(); - return m_data + idx->second; - } else { - vpnode *node = m_root; - - while (!node->leaf && m_ptrs[node->start].ptr->rec != rec) { - if (rec.calc_distance((m_ptrs[node->start].ptr->rec)) >= node->radius) { - node = node->outside; - } else { - node = node->inside; - } - } - - for (size_t i=node->start; i<=node->stop; i++) { - if (m_ptrs[i].ptr->rec == rec) { - return m_ptrs[i].ptr; - } - } - - return nullptr; - } - } + internal_search(m_root, point, k, pq, &farthest); + } - Wrapped<R>* get_data() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; +private: + struct vp_ptr { + Wrapped<R> *ptr; + double dist; + }; + Wrapped<R> *m_data; + vp_ptr *m_ptrs; + std::unordered_map<R, size_t, RecordHash<R>> m_lookup_map; + size_t m_reccnt; + size_t m_tombstone_cnt; + size_t m_node_cnt; + size_t m_alloc_size; + + vpnode *m_root; + + vpnode *build_vptree() { + if (m_reccnt == 
0) { + return nullptr; } - size_t get_tombstone_count() const { - return m_tombstone_cnt; - } + size_t lower = 0; + size_t upper = m_reccnt - 1; - const Wrapped<R>* get_record_at(size_t idx) const { - if (idx >= m_reccnt) return nullptr; - return m_data + idx; - } + auto rng = gsl_rng_alloc(gsl_rng_mt19937); + auto root = build_subtree(lower, upper, rng); + gsl_rng_free(rng); + return root; + } - size_t get_memory_usage() const { - return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*); + void build_map() { + // Skip constructing the hashmap if disabled in the + // template parameters. + if constexpr (!HMAP) { + return; } - size_t get_aux_memory_usage() const { - // FIXME: need to return the size of the unordered_map - return 0; + for (size_t i = 0; i < m_reccnt; i++) { + // FIXME: Will need to account for tombstones here too. Under + // tombstones, it is technically possible for two otherwise identical + // instances of the same record to exist within the same shard, so + // long as one of them is a tombstone. Because the table is currently + // using the unwrapped records for the key, it isn't possible for it + // to handle this case right now. + m_lookup_map.insert({m_data[i].rec, i}); } - - void search(const R &point, size_t k, PriorityQueue<Wrapped<R>, - DistCmpMax<Wrapped<R>>> &pq) { - double farthest = std::numeric_limits<double>::max(); - - internal_search(m_root, point, k, pq, &farthest); + } + + vpnode *build_subtree(size_t start, size_t stop, gsl_rng *rng) { + /* + * base-case: sometimes happens (probably because of the +1 and -1 + * in the first recursive call) + */ + if (start > stop) { + return nullptr; } -private: - struct vp_ptr { - Wrapped<R> *ptr; - double dist; - }; - Wrapped<R>* m_data; - vp_ptr* m_ptrs; - std::unordered_map<R, size_t, RecordHash<R>> m_lookup_map; - size_t m_reccnt; - size_t m_tombstone_cnt; - size_t m_node_cnt; - size_t m_alloc_size; - - vpnode *m_root; - - vpnode *build_vptree() { - if (m_reccnt == 0) { - return nullptr; - } - - size_t lower = 0; - size_t upper = m_reccnt - 1; + /* base-case: create a leaf node */ + if (stop - start <= LEAFSZ) { + vpnode *node = new vpnode(); + node->start = start; + node->stop = stop; + node->leaf = true; - auto rng = gsl_rng_alloc(gsl_rng_mt19937); - auto root = build_subtree(lower, upper, rng); - gsl_rng_free(rng); - return root; + m_node_cnt++; + return node; } - void build_map() { - // Skip constructing the hashmap if disabled in the - // template parameters. - if constexpr (!HMAP) { - return; - } - - for (size_t i=0; i<m_reccnt; i++) { - // FIXME: Will need to account for tombstones here too. Under - // tombstones, it is technically possible for two otherwise identical - // instances of the same record to exist within the same shard, so - // long as one of them is a tombstone. Because the table is currently - // using the unwrapped records for the key, it isn't possible for it - // to handle this case right now. 
- m_lookup_map.insert({m_data[i].rec, i}); - } + /* + * select a random element to be the root of the + * subtree + */ + auto i = start + gsl_rng_uniform_int(rng, stop - start + 1); + swap(start, i); + + /* for efficiency, we'll pre-calculate the distances between each point and + * the root */ + for (size_t i = start + 1; i <= stop; i++) { + m_ptrs[i].dist = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[i].ptr->rec); } - vpnode *build_subtree(size_t start, size_t stop, gsl_rng *rng) { - /* - * base-case: sometimes happens (probably because of the +1 and -1 - * in the first recursive call) - */ - if (start > stop) { - return nullptr; - } - - /* base-case: create a leaf node */ - if (stop - start <= LEAFSZ) { - vpnode *node = new vpnode(); - node->start = start; - node->stop = stop; - node->leaf = true; - - m_node_cnt++; - return node; - } - - /* - * select a random element to be the root of the - * subtree - */ - auto i = start + gsl_rng_uniform_int(rng, stop - start + 1); - swap(start, i); - - /* for efficiency, we'll pre-calculate the distances between each point and the root */ - for (size_t i=start+1; i<=stop; i++) { - m_ptrs[i].dist = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[i].ptr->rec); - } - - /* - * partition elements based on their distance from the start, - * with those elements with distance falling below the median - * distance going into the left sub-array and those above - * the median in the right. This is easily done using QuickSelect. - */ - auto mid = (start + 1 + stop) / 2; - quickselect(start + 1, stop, mid, m_ptrs[start].ptr, rng); - - /* Create a new node based on this partitioning */ - vpnode *node = new vpnode(); - node->start = start; + /* + * partition elements based on their distance from the start, + * with those elements with distance falling below the median + * distance going into the left sub-array and those above + * the median in the right. This is easily done using QuickSelect. + */ + auto mid = (start + 1 + stop) / 2; + quickselect(start + 1, stop, mid, m_ptrs[start].ptr, rng); - /* store the radius of the circle used for partitioning the node. */ - node->radius = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[mid].ptr->rec); - m_ptrs[start].dist = node->radius; + /* Create a new node based on this partitioning */ + vpnode *node = new vpnode(); + node->start = start; - /* recursively construct the left and right subtrees */ - node->inside = build_subtree(start + 1, mid-1, rng); - node->outside = build_subtree(mid, stop, rng); + /* store the radius of the circle used for partitioning the node. */ + node->radius = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[mid].ptr->rec); + m_ptrs[start].dist = node->radius; - m_node_cnt++; + /* recursively construct the left and right subtrees */ + node->inside = build_subtree(start + 1, mid - 1, rng); + node->outside = build_subtree(mid, stop, rng); - return node; - } + m_node_cnt++; - void quickselect(size_t start, size_t stop, size_t k, Wrapped<R> *p, gsl_rng *rng) { - if (start == stop) return; + return node; + } - auto pivot = partition(start, stop, p, rng); + void quickselect(size_t start, size_t stop, size_t k, Wrapped<R> *p, + gsl_rng *rng) { + if (start == stop) + return; - if (k < pivot) { - quickselect(start, pivot - 1, k, p, rng); - } else if (k > pivot) { - quickselect(pivot + 1, stop, k, p, rng); - } - } + auto pivot = partition(start, stop, p, rng); - // TODO: The quickselect code can probably be generalized and moved out - // to psudb-common instead. 
- size_t partition(size_t start, size_t stop, Wrapped<R> *p, gsl_rng *rng) { - auto pivot = start + gsl_rng_uniform_int(rng, stop - start); - //double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec); - - swap(pivot, stop); - - size_t j = start; - for (size_t i=start; i<stop; i++) { - if (m_ptrs[i].dist < m_ptrs[stop].dist) { - //assert(distances[i - start] == p->rec.calc_distance(m_ptrs[i]->rec)); - //if (distances[i - start] < distances[stop - start]) { - //if (p->rec .calc_distance(m_ptrs[i]->rec) < pivot_dist) { - swap(j, i); - j++; - } - } - - swap(j, stop); - return j; + if (k < pivot) { + quickselect(start, pivot - 1, k, p, rng); + } else if (k > pivot) { + quickselect(pivot + 1, stop, k, p, rng); } - - void swap(size_t idx1, size_t idx2) { - auto tmp = m_ptrs[idx1]; - m_ptrs[idx1] = m_ptrs[idx2]; - m_ptrs[idx2] = tmp; + } + + // TODO: The quickselect code can probably be generalized and moved out + // to psudb-common instead. + size_t partition(size_t start, size_t stop, Wrapped<R> *p, gsl_rng *rng) { + auto pivot = start + gsl_rng_uniform_int(rng, stop - start); + // double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec); + + swap(pivot, stop); + + size_t j = start; + for (size_t i = start; i < stop; i++) { + if (m_ptrs[i].dist < m_ptrs[stop].dist) { + // assert(distances[i - start] == p->rec.calc_distance(m_ptrs[i]->rec)); + // if (distances[i - start] < distances[stop - start]) { + // if (p->rec .calc_distance(m_ptrs[i]->rec) < pivot_dist) { + swap(j, i); + j++; + } } - void internal_search(vpnode *node, const R &point, size_t k, PriorityQueue<Wrapped<R>, - DistCmpMax<Wrapped<R>>> &pq, double *farthest) { - - if (node == nullptr) return; + swap(j, stop); + return j; + } - if (node->leaf) { - for (size_t i=node->start; i<=node->stop; i++) { - double d = point.calc_distance(m_ptrs[i].ptr->rec); - if (d < *farthest) { - if (pq.size() == k) { - pq.pop(); - } + void swap(size_t idx1, size_t idx2) { + auto tmp = m_ptrs[idx1]; + m_ptrs[idx1] = m_ptrs[idx2]; + m_ptrs[idx2] = tmp; + } - pq.push(m_ptrs[i].ptr); - if (pq.size() == k) { - *farthest = point.calc_distance(pq.peek().data->rec); - } - } - } + void internal_search(vpnode *node, const R &point, size_t k, + PriorityQueue<Wrapped<R>, DistCmpMax<Wrapped<R>>> &pq, + double *farthest) { - return; - } - - double d = point.calc_distance(m_ptrs[node->start].ptr->rec); + if (node == nullptr) + return; + if (node->leaf) { + for (size_t i = node->start; i <= node->stop; i++) { + double d = point.calc_distance(m_ptrs[i].ptr->rec); if (d < *farthest) { - if (pq.size() == k) { - pq.pop(); - } - pq.push(m_ptrs[node->start].ptr); - if (pq.size() == k) { - *farthest = point.calc_distance(pq.peek().data->rec); - } + if (pq.size() == k) { + pq.pop(); + } + + pq.push(m_ptrs[i].ptr); + if (pq.size() == k) { + *farthest = point.calc_distance(pq.peek().data->rec); + } } + } - if (d < node->radius) { - if (d - (*farthest) <= node->radius) { - internal_search(node->inside, point, k, pq, farthest); - } + return; + } - if (d + (*farthest) >= node->radius) { - internal_search(node->outside, point, k, pq, farthest); - } - } else { - if (d + (*farthest) >= node->radius) { - internal_search(node->outside, point, k, pq, farthest); - } + double d = point.calc_distance(m_ptrs[node->start].ptr->rec); - if (d - (*farthest) <= node->radius) { - internal_search(node->inside, point, k, pq, farthest); - } - } + if (d < *farthest) { + if (pq.size() == k) { + pq.pop(); + } + pq.push(m_ptrs[node->start].ptr); + if (pq.size() == k) { + *farthest = 
point.calc_distance(pq.peek().data->rec); + } + } + + if (d < node->radius) { + if (d - (*farthest) <= node->radius) { + internal_search(node->inside, point, k, pq, farthest); + } + + if (d + (*farthest) >= node->radius) { + internal_search(node->outside, point, k, pq, farthest); + } + } else { + if (d + (*farthest) >= node->radius) { + internal_search(node->outside, point, k, pq, farthest); + } + + if (d - (*farthest) <= node->radius) { + internal_search(node->inside, point, k, pq, farthest); + } } - }; -} + } +}; +} // namespace de |
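The `get_lower_bound` body reformatted above narrows the search to the `[begin, end)` range returned by the spline and then chooses between a linear scan (for bounds narrower than 256 slots) and a binary search. Below is a minimal, standalone sketch of that hybrid strategy; the `SearchBound` struct and `lower_bound_in_range` name are stand-ins invented for illustration, and a plain sorted `std::vector<uint64_t>` replaces the shard's `Wrapped<R>` array. It assumes, as the spline's contract implies, that the true lower bound lies inside the returned range.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the approximate [begin, end) range that a learned
// index such as ts::TrieSpline returns for a key.
struct SearchBound {
  size_t begin;
  size_t end; // one past the last candidate slot
};

// Return the index of the first key >= target, or keys.size() if none exists,
// mirroring the linear-vs-binary split used in the shard.
size_t lower_bound_in_range(const std::vector<uint64_t> &keys,
                            SearchBound bound, uint64_t target) {
  size_t idx = bound.begin;
  if (idx >= keys.size()) {
    return keys.size();
  }

  if (bound.end - bound.begin < 256) {
    // Narrow bound: a simple scan is typically faster than binary search here.
    while (idx < bound.end && keys[idx] < target) {
      idx++;
    }
  } else {
    // Wide bound: classic binary search over [begin, end).
    size_t max = bound.end;
    while (idx < max) {
      size_t mid = (idx + max) / 2;
      if (target > keys[mid]) {
        idx = mid + 1;
      } else {
        max = mid;
      }
    }
  }

  return idx;
}
```

The shard's version additionally handles very small shards (where no spline is finalized) with a plain scan over all records, and corrects for the case where the spline's bound lands one slot past a key that is less than or equal to the target.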
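The `build_subtree` routine in the VPTree diff picks a random vantage point, measures every other record's distance to it, and splits the range at the median distance: closer records go into the `inside` child, the rest into `outside`, with the median distance stored as the node's radius. A compact sketch of that construction follows, using a toy 2-D `Point` type and `std::nth_element` in place of the shard's hand-rolled quickselect; all names here are illustrative, not part of the shard's API.

```cpp
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <memory>
#include <random>
#include <utility>
#include <vector>

// Toy record type; the real shard works on Wrapped<R> with a user-supplied
// calc_distance().
struct Point {
  double x, y;
};

static double dist(const Point &a, const Point &b) {
  return std::hypot(a.x - b.x, a.y - b.y);
}

struct VpNode {
  size_t start = 0, stop = 0; // inclusive index range into the point array
  bool leaf = false;
  double radius = 0.0;        // median distance from the vantage point
  std::unique_ptr<VpNode> inside, outside;
};

constexpr size_t kLeafSize = 100;

// Recursively partition pts[start..stop] around a randomly chosen vantage
// point, following the structure of the diffed build_subtree.
std::unique_ptr<VpNode> build_subtree(std::vector<Point> &pts, size_t start,
                                      size_t stop, std::mt19937 &rng) {
  if (start > stop) {
    return nullptr;
  }

  auto node = std::make_unique<VpNode>();
  node->start = start;
  node->stop = stop;

  if (stop - start <= kLeafSize) {
    node->leaf = true;
    return node;
  }

  // Select a random vantage point and move it to the front of the range.
  std::uniform_int_distribution<size_t> pick(start, stop);
  std::swap(pts[start], pts[pick(rng)]);
  const Point vp = pts[start];

  // Median split by distance from the vantage point: everything closer than
  // the median ends up "inside" the radius, everything else "outside".
  size_t mid = (start + 1 + stop) / 2;
  std::nth_element(pts.begin() + start + 1, pts.begin() + mid,
                   pts.begin() + stop + 1,
                   [&](const Point &a, const Point &b) {
                     return dist(vp, a) < dist(vp, b);
                   });

  node->radius = dist(vp, pts[mid]);
  node->inside = build_subtree(pts, start + 1, mid - 1, rng);
  node->outside = build_subtree(pts, mid, stop, rng);
  return node;
}
```

A caller would build the whole tree with `build_subtree(points, 0, points.size() - 1, rng)` on a non-empty vector. The shard instead precomputes each record's distance into `m_ptrs[i].dist` once per level and selects the median with its own quickselect, which avoids recomputing distances inside the comparator as this sketch does.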
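The TODO in the VPTree diff notes that the quickselect code could be generalized and moved into psudb-common. In the shard it runs over the precomputed `m_ptrs[i].dist` values; a generic, self-contained version over a plain `std::vector<double>` might look like the sketch below (the double-keyed signature and placement are assumptions for illustration, not an existing psudb-common API).

```cpp
#include <cstddef>
#include <random>
#include <utility>
#include <vector>

// Lomuto-style partition around a randomly chosen pivot. Returns the pivot's
// final position; only meaningful for start < stop.
size_t partition(std::vector<double> &vals, size_t start, size_t stop,
                 std::mt19937 &rng) {
  std::uniform_int_distribution<size_t> pick(start, stop - 1);
  std::swap(vals[pick(rng)], vals[stop]); // park the pivot at the end

  size_t j = start;
  for (size_t i = start; i < stop; i++) {
    if (vals[i] < vals[stop]) {
      std::swap(vals[j], vals[i]);
      j++;
    }
  }

  std::swap(vals[j], vals[stop]); // pivot lands at its final sorted position
  return j;
}

// After this returns, vals[k] holds the element that would occupy position k
// in a fully sorted array, with everything smaller to its left.
void quickselect(std::vector<double> &vals, size_t start, size_t stop, size_t k,
                 std::mt19937 &rng) {
  if (start >= stop) {
    return;
  }

  size_t pivot = partition(vals, start, stop, rng);
  if (k < pivot) {
    quickselect(vals, start, pivot - 1, k, rng);
  } else if (k > pivot) {
    quickselect(vals, pivot + 1, stop, k, rng);
  }
}
```

In practice `std::nth_element` gives the same guarantee; a hand-rolled version only earns its keep if it has to reorder the shard's `vp_ptr` entries and their cached distances together, as the diffed code does.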
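The `internal_search` logic above implements k-nearest-neighbour search with a bounded max-heap: the heap holds the best `k` candidates seen so far, its top is the current worst of those, and that distance (`farthest`) doubles as the pruning radius for deciding whether the query ball can cross a node's partition boundary. A condensed sketch of the same traversal, restating simplified `Point`/`VpNode` types so it stands alone and substituting `std::priority_queue` for `psudb::PriorityQueue` with `DistCmpMax` (all names here are illustrative):

```cpp
#include <cmath>
#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

struct Point {
  double x, y;
};

static double dist(const Point &a, const Point &b) {
  return std::hypot(a.x - b.x, a.y - b.y);
}

// Plain-pointer analogue of the shard's vpnode.
struct VpNode {
  size_t start, stop; // inclusive index range; start is the vantage point
  bool leaf;
  double radius;
  VpNode *inside, *outside;
};

// Max-heap of (distance, index): the top is the current k-th nearest.
using Heap = std::priority_queue<std::pair<double, size_t>>;

void knn_search(const VpNode *node, const std::vector<Point> &pts,
                const Point &query, size_t k, Heap &heap, double &farthest) {
  if (node == nullptr) {
    return;
  }

  // Try to add pts[idx] to the result set; tighten the pruning radius once
  // the heap is full. Returns the computed distance for reuse.
  auto consider = [&](size_t idx) {
    double d = dist(query, pts[idx]);
    if (d < farthest) {
      if (heap.size() == k) {
        heap.pop(); // evict the current worst candidate
      }
      heap.push({d, idx});
      if (heap.size() == k) {
        farthest = heap.top().first;
      }
    }
    return d;
  };

  if (node->leaf) {
    for (size_t i = node->start; i <= node->stop; i++) {
      consider(i);
    }
    return;
  }

  double d = consider(node->start); // the vantage point itself

  // Descend into the half containing the query first; only cross the
  // partition boundary if a ball of radius `farthest` around the query
  // could overlap the other side.
  if (d < node->radius) {
    if (d - farthest <= node->radius) {
      knn_search(node->inside, pts, query, k, heap, farthest);
    }
    if (d + farthest >= node->radius) {
      knn_search(node->outside, pts, query, k, heap, farthest);
    }
  } else {
    if (d + farthest >= node->radius) {
      knn_search(node->outside, pts, query, k, heap, farthest);
    }
    if (d - farthest <= node->radius) {
      knn_search(node->inside, pts, query, k, heap, farthest);
    }
  }
}
```

As in the shard's `search()`, the caller initializes `farthest` to `std::numeric_limits<double>::max()` so that nothing is pruned until `k` candidates have been collected.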