summaryrefslogtreecommitdiffstats
path: root/include/shard
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-09-25 14:42:44 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2025-09-25 14:42:44 -0400
commitcf5f3bbb0cb58430ed68ad3ebfcefc009e553d71 (patch)
tree4c17bc3169ee195c236cea9c9efda0aef7488e3c /include/shard
parent826c1fff5accbaa6b415acc176a5acbeb5f691b6 (diff)
downloaddynamic-extension-cf5f3bbb0cb58430ed68ad3ebfcefc009e553d71.tar.gz
Code reformatting
Diffstat (limited to 'include/shard')
-rw-r--r--include/shard/Alias.h259
-rw-r--r--include/shard/FSTrie.h282
-rw-r--r--include/shard/ISAMTree.h4
-rw-r--r--include/shard/LoudsPatricia.h278
-rw-r--r--include/shard/PGM.h451
-rw-r--r--include/shard/TrieSpline.h503
-rw-r--r--include/shard/VPTree.h605
7 files changed, 1155 insertions, 1227 deletions
diff --git a/include/shard/Alias.h b/include/shard/Alias.h
index 15b0884..c176fa2 100644
--- a/include/shard/Alias.h
+++ b/include/shard/Alias.h
@@ -20,186 +20,163 @@
#include "psu-ds/Alias.h"
#include "psu-ds/BloomFilter.h"
-#include "util/bf_config.h"
#include "util/SortedMerge.h"
+#include "util/bf_config.h"
-using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
using psudb::byte;
+using psudb::CACHELINE_SIZE;
namespace de {
-template <WeightedRecordInterface R>
-class Alias {
+template <WeightedRecordInterface R> class Alias {
public:
- typedef R RECORD;
-private:
- typedef decltype(R::key) K;
- typedef decltype(R::value) V;
- typedef decltype(R::weight) W;
+ typedef R RECORD;
+private:
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+ typedef decltype(R::weight) W;
public:
- Alias(BufferView<R> buffer)
- : m_data(nullptr)
- , m_alias(nullptr)
- , m_total_weight(0)
- , m_reccnt(0)
- , m_tombstone_cnt(0)
- , m_alloc_size(0)
- , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) {
-
-
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- buffer.get_record_count() *
- sizeof(Wrapped<R>),
- (byte**) &m_data);
-
- auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf);
- m_reccnt = res.record_count;
- m_tombstone_cnt = res.tombstone_count;
-
- if (m_reccnt > 0) {
- std::vector<W> weights;
- for (size_t i=0; i<m_reccnt; i++) {
- weights.emplace_back(m_data[i].rec.weight);
- m_total_weight += m_data[i].rec.weight;
- }
-
- build_alias_structure(weights);
- }
+ Alias(BufferView<R> buffer)
+ : m_data(nullptr), m_alias(nullptr), m_total_weight(0), m_reccnt(0),
+ m_tombstone_cnt(0), m_alloc_size(0),
+ m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(),
+ BF_HASH_FUNCS)) {
+
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>),
+ (byte **)&m_data);
+
+ auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
+
+ if (m_reccnt > 0) {
+ std::vector<W> weights;
+ for (size_t i = 0; i < m_reccnt; i++) {
+ weights.emplace_back(m_data[i].rec.weight);
+ m_total_weight += m_data[i].rec.weight;
+ }
+
+ build_alias_structure(weights);
}
+ }
- Alias(std::vector<const Alias*> const &shards)
- : m_data(nullptr)
- , m_alias(nullptr)
- , m_total_weight(0)
- , m_reccnt(0)
- , m_tombstone_cnt(0)
- , m_alloc_size(0)
- , m_bf(nullptr) {
-
- size_t attemp_reccnt = 0;
- size_t tombstone_count = 0;
- auto cursors = build_cursor_vec<R, Alias>(shards, &attemp_reccnt, &tombstone_count);
-
- m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- attemp_reccnt * sizeof(Wrapped<R>),
- (byte **) &m_data);
-
- auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
- m_reccnt = res.record_count;
- m_tombstone_cnt = res.tombstone_count;
-
- if (m_reccnt > 0) {
- std::vector<W> weights;
- for (size_t i=0; i<m_reccnt; i++) {
- weights.emplace_back(m_data[i].rec.weight);
- m_total_weight += m_data[i].rec.weight;
- }
-
- build_alias_structure(weights);
- }
- }
-
- ~Alias() {
- free(m_data);
- delete m_alias;
- delete m_bf;
- }
+ Alias(std::vector<const Alias *> const &shards)
+ : m_data(nullptr), m_alias(nullptr), m_total_weight(0), m_reccnt(0),
+ m_tombstone_cnt(0), m_alloc_size(0), m_bf(nullptr) {
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
- if (filter && !m_bf->lookup(rec)) {
- return nullptr;
- }
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors =
+ build_cursor_vec<R, Alias>(shards, &attemp_reccnt, &tombstone_count);
- size_t idx = get_lower_bound(rec.key);
- if (idx >= m_reccnt) {
- return nullptr;
- }
+ m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **)&m_data);
- while (idx < (m_reccnt-1) && m_data[idx].rec < rec) ++idx;
+ auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
- if (m_data[idx].rec == rec) {
- return m_data + idx;
- }
+ if (m_reccnt > 0) {
+ std::vector<W> weights;
+ for (size_t i = 0; i < m_reccnt; i++) {
+ weights.emplace_back(m_data[i].rec.weight);
+ m_total_weight += m_data[i].rec.weight;
+ }
- return nullptr;
+ build_alias_structure(weights);
}
+ }
- Wrapped<R>* get_data() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
- }
+ ~Alias() {
+ free(m_data);
+ delete m_alias;
+ delete m_bf;
+ }
- size_t get_tombstone_count() const {
- return m_tombstone_cnt;
+ Wrapped<R> *point_lookup(const R &rec, bool filter = false) {
+ if (filter && !m_bf->lookup(rec)) {
+ return nullptr;
}
- const Wrapped<R>* get_record_at(size_t idx) const {
- if (idx >= m_reccnt) return nullptr;
- return m_data + idx;
+ size_t idx = get_lower_bound(rec.key);
+ if (idx >= m_reccnt) {
+ return nullptr;
}
+ while (idx < (m_reccnt - 1) && m_data[idx].rec < rec)
+ ++idx;
- size_t get_memory_usage() const {
- return 0;
+ if (m_data[idx].rec == rec) {
+ return m_data + idx;
}
- size_t get_aux_memory_usage() const {
- return (m_bf) ? m_bf->memory_usage() : 0;
- }
+ return nullptr;
+ }
- W get_total_weight() const {
- return m_total_weight;
- }
+ Wrapped<R> *get_data() const { return m_data; }
- size_t get_weighted_sample(gsl_rng *rng) const {
- return m_alias->get(rng);
- }
+ size_t get_record_count() const { return m_reccnt; }
- size_t get_lower_bound(const K& key) const {
- size_t min = 0;
- size_t max = m_reccnt - 1;
+ size_t get_tombstone_count() const { return m_tombstone_cnt; }
- while (min < max) {
- size_t mid = (min + max) / 2;
+ const Wrapped<R> *get_record_at(size_t idx) const {
+ if (idx >= m_reccnt)
+ return nullptr;
+ return m_data + idx;
+ }
- if (key > m_data[mid].rec.key) {
- min = mid + 1;
- } else {
- max = mid;
- }
- }
+ size_t get_memory_usage() const { return 0; }
- return min;
- }
+ size_t get_aux_memory_usage() const {
+ return (m_bf) ? m_bf->memory_usage() : 0;
+ }
-private:
+ W get_total_weight() const { return m_total_weight; }
+
+ size_t get_weighted_sample(gsl_rng *rng) const { return m_alias->get(rng); }
- void build_alias_structure(std::vector<W> &weights) {
+ size_t get_lower_bound(const K &key) const {
+ size_t min = 0;
+ size_t max = m_reccnt - 1;
- // normalize the weights vector
- std::vector<double> norm_weights(weights.size());
+ while (min < max) {
+ size_t mid = (min + max) / 2;
+
+ if (key > m_data[mid].rec.key) {
+ min = mid + 1;
+ } else {
+ max = mid;
+ }
+ }
+
+ return min;
+ }
+
+private:
+ void build_alias_structure(std::vector<W> &weights) {
- for (size_t i=0; i<weights.size(); i++) {
- norm_weights[i] = (double) weights[i] / (double) m_total_weight;
- }
+ // normalize the weights vector
+ std::vector<double> norm_weights(weights.size());
- // build the alias structure
- m_alias = new psudb::Alias(norm_weights);
+ for (size_t i = 0; i < weights.size(); i++) {
+ norm_weights[i] = (double)weights[i] / (double)m_total_weight;
}
- Wrapped<R>* m_data;
- psudb::Alias *m_alias;
- W m_total_weight;
- size_t m_reccnt;
- size_t m_tombstone_cnt;
- size_t m_alloc_size;
- BloomFilter<R> *m_bf;
+ // build the alias structure
+ m_alias = new psudb::Alias(norm_weights);
+ }
+
+ Wrapped<R> *m_data;
+ psudb::Alias *m_alias;
+ W m_total_weight;
+ size_t m_reccnt;
+ size_t m_tombstone_cnt;
+ size_t m_alloc_size;
+ BloomFilter<R> *m_bf;
};
-}
+} // namespace de
diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h
index 59ff116..31db40b 100644
--- a/include/shard/FSTrie.h
+++ b/include/shard/FSTrie.h
@@ -9,194 +9,182 @@
*/
#pragma once
-
#include <vector>
#include "framework/ShardRequirements.h"
#include "fst.hpp"
#include "util/SortedMerge.h"
-using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
+using psudb::byte;
+using psudb::CACHELINE_SIZE;
using psudb::PriorityQueue;
using psudb::queue_record;
-using psudb::byte;
namespace de {
-template <KVPInterface R>
-class FSTrie {
+template <KVPInterface R> class FSTrie {
public:
- typedef R RECORD;
-private:
+ typedef R RECORD;
- typedef decltype(R::key) K;
- typedef decltype(R::value) V;
- static_assert(std::is_same_v<K, const char*>, "FST requires const char* keys.");
+private:
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+ static_assert(std::is_same_v<K, const char *>,
+ "FST requires const char* keys.");
public:
- FSTrie(BufferView<R> buffer)
- : m_data(nullptr)
- , m_reccnt(0)
- , m_alloc_size(0)
- {
- m_data = new Wrapped<R>[buffer.get_record_count()]();
- m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
-
- size_t cnt = 0;
- std::vector<std::string> keys;
- keys.reserve(buffer.get_record_count());
-
- /*
- * Copy the contents of the buffer view into a temporary buffer, and
- * sort them. We still need to iterate over these temporary records to
- * apply tombstone/deleted record filtering, as well as any possible
- * per-record processing that is required by the shard being built.
- */
- auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
- for (size_t i=0; i<buffer.get_record_count(); i++) {
- temp_buffer[i] = *(buffer.get(i));
- }
-
- auto base = temp_buffer;
- auto stop = base + buffer.get_record_count();
- std::sort(base, stop, std::less<Wrapped<R>>());
+ FSTrie(BufferView<R> buffer) : m_data(nullptr), m_reccnt(0), m_alloc_size(0) {
+ m_data = new Wrapped<R>[buffer.get_record_count()]();
+ m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
+
+ size_t cnt = 0;
+ std::vector<std::string> keys;
+ keys.reserve(buffer.get_record_count());
+
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
+ for (size_t i = 0; i < buffer.get_record_count(); i++) {
+ temp_buffer[i] = *(buffer.get(i));
+ }
- for (size_t i=0; i<buffer.get_record_count(); i++) {
- if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || temp_buffer[i].rec.key[0] != '\0') {
- continue;
- }
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
- m_data[cnt] = temp_buffer[i];
- m_data[cnt].clear_timestamp();
+ for (size_t i = 0; i < buffer.get_record_count(); i++) {
+ if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() ||
+ temp_buffer[i].rec.key[0] != '\0') {
+ continue;
+ }
- keys.push_back(std::string(m_data[cnt].rec.key));
- cnt++;
- }
+ m_data[cnt] = temp_buffer[i];
+ m_data[cnt].clear_timestamp();
- m_reccnt = cnt;
- if (m_reccnt > 0) {
- m_fst = new fst::Trie(keys, true, 1);
- }
+ keys.push_back(std::string(m_data[cnt].rec.key));
+ cnt++;
+ }
- delete[] temp_buffer;
+ m_reccnt = cnt;
+ if (m_reccnt > 0) {
+ m_fst = new fst::Trie(keys, true, 1);
}
- FSTrie(std::vector<const FSTrie*> const &shards)
- : m_data(nullptr)
- , m_reccnt(0)
- , m_alloc_size(0)
- {
- size_t attemp_reccnt = 0;
- size_t tombstone_count = 0;
- auto cursors = build_cursor_vec<R, FSTrie>(shards, &attemp_reccnt, &tombstone_count);
-
- m_data = new Wrapped<R>[attemp_reccnt]();
- m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>);
-
- std::vector<std::string> keys;
- keys.reserve(attemp_reccnt);
-
- // FIXME: For smaller cursor arrays, it may be more efficient to skip
- // the priority queue and just do a scan.
- PriorityQueue<Wrapped<R>> pq(cursors.size());
- for (size_t i=0; i<cursors.size(); i++) {
- pq.push(cursors[i].ptr, i);
- }
+ delete[] temp_buffer;
+ }
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- /*
- * if the current record is not a tombstone, and the next record is
- * a tombstone that matches the current one, then the current one
- * has been deleted, and both it and its tombstone can be skipped
- * over.
- */
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- /* skip over records that have been deleted via tagging */
- if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key[0] != '\0') {
- m_data[m_reccnt] = *cursor.ptr;
- keys.push_back(std::string(m_data[m_reccnt].rec.key));
-
- m_reccnt++;
- }
- pq.pop();
-
- if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ FSTrie(std::vector<const FSTrie *> const &shards)
+ : m_data(nullptr), m_reccnt(0), m_alloc_size(0) {
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors =
+ build_cursor_vec<R, FSTrie>(shards, &attemp_reccnt, &tombstone_count);
- if (m_reccnt > 0) {
- m_fst = new fst::Trie(keys, true, 1);
- }
+ m_data = new Wrapped<R>[attemp_reccnt]();
+ m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>);
+
+ std::vector<std::string> keys;
+ keys.reserve(attemp_reccnt);
+
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i = 0; i < cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
}
- ~FSTrie() {
- delete[] m_data;
- delete m_fst;
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next =
+ pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop();
+ pq.pop();
+ auto &cursor1 = cursors[now.version];
+ auto &cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1))
+ pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2))
+ pq.push(cursor2.ptr, next.version);
+ } else {
+ auto &cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key[0] != '\0') {
+ m_data[m_reccnt] = *cursor.ptr;
+ keys.push_back(std::string(m_data[m_reccnt].rec.key));
+
+ m_reccnt++;
+ }
+ pq.pop();
+
+ if (advance_cursor(cursor))
+ pq.push(cursor.ptr, now.version);
+ }
}
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
+ if (m_reccnt > 0) {
+ m_fst = new fst::Trie(keys, true, 1);
+ }
+ }
- auto idx = m_fst->exactSearch(rec.key);
+ ~FSTrie() {
+ delete[] m_data;
+ delete m_fst;
+ }
- if (idx == fst::kNotFound) {
- return nullptr;
- }
+ Wrapped<R> *point_lookup(const R &rec, bool filter = false) {
- // FIXME: for convenience, I'm treating this Trie as a unique index
- // for now, so no need to scan forward and/or check values. This
- // also makes the point lookup query class a lot easier to make.
- // Ultimately, though, we can support non-unique indexes with some
- // extra work.
+ auto idx = m_fst->exactSearch(rec.key);
- return m_data + idx;
+ if (idx == fst::kNotFound) {
+ return nullptr;
}
- Wrapped<R>* get_data() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
- }
+ // FIXME: for convenience, I'm treating this Trie as a unique index
+ // for now, so no need to scan forward and/or check values. This
+ // also makes the point lookup query class a lot easier to make.
+ // Ultimately, though, we can support non-unique indexes with some
+ // extra work.
- size_t get_tombstone_count() const {
- return 0;
- }
+ return m_data + idx;
+ }
- const Wrapped<R>* get_record_at(size_t idx) const {
- if (idx >= m_reccnt) return nullptr;
- return m_data + idx;
- }
+ Wrapped<R> *get_data() const { return m_data; }
+ size_t get_record_count() const { return m_reccnt; }
- size_t get_memory_usage() const {
- return m_fst->getMemoryUsage();
- }
+ size_t get_tombstone_count() const { return 0; }
- size_t get_aux_memory_usage() const {
- return m_alloc_size;
- }
+ const Wrapped<R> *get_record_at(size_t idx) const {
+ if (idx >= m_reccnt)
+ return nullptr;
+ return m_data + idx;
+ }
- size_t get_lower_bound(R &rec) {return 0;}
- size_t get_upper_bound(R &rec) {return 0;}
+ size_t get_memory_usage() const { return m_fst->getMemoryUsage(); }
-private:
+ size_t get_aux_memory_usage() const { return m_alloc_size; }
+
+ size_t get_lower_bound(R &rec) { return 0; }
+ size_t get_upper_bound(R &rec) { return 0; }
- Wrapped<R>* m_data;
- size_t m_reccnt;
- size_t m_alloc_size;
- fst::Trie *m_fst;
+private:
+ Wrapped<R> *m_data;
+ size_t m_reccnt;
+ size_t m_alloc_size;
+ fst::Trie *m_fst;
};
-}
+} // namespace de
diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h
index f6b525f..6722aaf 100644
--- a/include/shard/ISAMTree.h
+++ b/include/shard/ISAMTree.h
@@ -121,7 +121,9 @@ public:
size_t get_memory_usage() const { return m_internal_node_cnt * NODE_SZ; }
- size_t get_aux_memory_usage() const { return (m_bf) ? m_bf->memory_usage() : 0; }
+ size_t get_aux_memory_usage() const {
+ return (m_bf) ? m_bf->memory_usage() : 0;
+ }
/* SortedShardInterface methods */
size_t get_lower_bound(const K &key) const {
diff --git a/include/shard/LoudsPatricia.h b/include/shard/LoudsPatricia.h
index fe0c30e..0b7c74c 100644
--- a/include/shard/LoudsPatricia.h
+++ b/include/shard/LoudsPatricia.h
@@ -9,191 +9,179 @@
*/
#pragma once
-
#include <vector>
#include "framework/ShardRequirements.h"
#include "louds-patricia.hpp"
#include "util/SortedMerge.h"
-using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
+using psudb::byte;
+using psudb::CACHELINE_SIZE;
using psudb::PriorityQueue;
using psudb::queue_record;
-using psudb::byte;
namespace de {
-template <KVPInterface R>
-class LoudsPatricia {
+template <KVPInterface R> class LoudsPatricia {
private:
-
- typedef decltype(R::key) K;
- typedef decltype(R::value) V;
- static_assert(std::is_same_v<K, const char*>, "FST requires const char* keys.");
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+ static_assert(std::is_same_v<K, const char *>,
+ "FST requires const char* keys.");
public:
- LoudsPatricia(BufferView<R> buffer)
- : m_data(nullptr)
- , m_reccnt(0)
- , m_alloc_size(0)
- {
- m_data = new Wrapped<R>[buffer.get_record_count()]();
- m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
-
- m_louds = new louds::Patricia();
-
- size_t cnt = 0;
- std::vector<std::string> keys;
- keys.reserve(buffer.get_record_count());
-
- /*
- * Copy the contents of the buffer view into a temporary buffer, and
- * sort them. We still need to iterate over these temporary records to
- * apply tombstone/deleted record filtering, as well as any possible
- * per-record processing that is required by the shard being built.
- */
- auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
- for (size_t i=0; i<buffer.get_record_count(); i++) {
- temp_buffer[i] = *(buffer.get(i));
- }
-
- auto base = temp_buffer;
- auto stop = base + buffer.get_record_count();
- std::sort(base, stop, std::less<Wrapped<R>>());
+ LoudsPatricia(BufferView<R> buffer)
+ : m_data(nullptr), m_reccnt(0), m_alloc_size(0) {
+ m_data = new Wrapped<R>[buffer.get_record_count()]();
+ m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
+
+ m_louds = new louds::Patricia();
+
+ size_t cnt = 0;
+ std::vector<std::string> keys;
+ keys.reserve(buffer.get_record_count());
+
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
+ for (size_t i = 0; i < buffer.get_record_count(); i++) {
+ temp_buffer[i] = *(buffer.get(i));
+ }
- for (size_t i=0; i<buffer.get_record_count(); i++) {
- if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || temp_buffer[i].rec.key == "") {
- continue;
- }
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
- m_data[cnt] = temp_buffer[i];
- m_data[cnt].clear_timestamp();
+ for (size_t i = 0; i < buffer.get_record_count(); i++) {
+ if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() ||
+ temp_buffer[i].rec.key == "") {
+ continue;
+ }
- m_louds->add(std::string(m_data[cnt].rec.key));
- cnt++;
- }
+ m_data[cnt] = temp_buffer[i];
+ m_data[cnt].clear_timestamp();
- m_reccnt = cnt;
- if (m_reccnt > 0) {
- m_louds->build();
- }
+ m_louds->add(std::string(m_data[cnt].rec.key));
+ cnt++;
+ }
- delete[] temp_buffer;
+ m_reccnt = cnt;
+ if (m_reccnt > 0) {
+ m_louds->build();
}
- LoudsPatricia(std::vector<const LoudsPatricia*> &shards)
- : m_data(nullptr)
- , m_reccnt(0)
- , m_alloc_size(0)
- {
- size_t attemp_reccnt = 0;
- size_t tombstone_count = 0;
- auto cursors = build_cursor_vec<R, LoudsPatricia>(shards, &attemp_reccnt, &tombstone_count);
-
- m_data = new Wrapped<R>[attemp_reccnt]();
- m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>);
-
- m_louds = new louds::Patricia();
-
- // FIXME: For smaller cursor arrays, it may be more efficient to skip
- // the priority queue and just do a scan.
- PriorityQueue<Wrapped<R>> pq(cursors.size());
- for (size_t i=0; i<cursors.size(); i++) {
- pq.push(cursors[i].ptr, i);
- }
+ delete[] temp_buffer;
+ }
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- /*
- * if the current record is not a tombstone, and the next record is
- * a tombstone that matches the current one, then the current one
- * has been deleted, and both it and its tombstone can be skipped
- * over.
- */
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- /* skip over records that have been deleted via tagging */
- if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") {
- m_data[m_reccnt] = *cursor.ptr;
- m_louds->add(std::string(m_data[m_reccnt].rec.key));
- m_reccnt++;
- }
- pq.pop();
-
- if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ LoudsPatricia(std::vector<const LoudsPatricia *> &shards)
+ : m_data(nullptr), m_reccnt(0), m_alloc_size(0) {
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors = build_cursor_vec<R, LoudsPatricia>(shards, &attemp_reccnt,
+ &tombstone_count);
- if (m_reccnt > 0) {
- m_louds->build();
+ m_data = new Wrapped<R>[attemp_reccnt]();
+ m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>);
+
+ m_louds = new louds::Patricia();
+
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i = 0; i < cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
+
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next =
+ pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop();
+ pq.pop();
+ auto &cursor1 = cursors[now.version];
+ auto &cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1))
+ pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2))
+ pq.push(cursor2.ptr, next.version);
+ } else {
+ auto &cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") {
+ m_data[m_reccnt] = *cursor.ptr;
+ m_louds->add(std::string(m_data[m_reccnt].rec.key));
+ m_reccnt++;
}
+ pq.pop();
+
+ if (advance_cursor(cursor))
+ pq.push(cursor.ptr, now.version);
+ }
}
- ~LoudsPatricia() {
- delete[] m_data;
- delete m_louds;
+ if (m_reccnt > 0) {
+ m_louds->build();
}
+ }
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
+ ~LoudsPatricia() {
+ delete[] m_data;
+ delete m_louds;
+ }
- auto idx = m_louds->lookup(std::string(rec.key));
+ Wrapped<R> *point_lookup(const R &rec, bool filter = false) {
- if (idx == -1) {
- return nullptr;
- }
+ auto idx = m_louds->lookup(std::string(rec.key));
- // FIXME: for convenience, I'm treating this Trie as a unique index
- // for now, so no need to scan forward and/or check values. This
- // also makes the point lookup query class a lot easier to make.
- // Ultimately, though, we can support non-unique indexes with some
- // extra work.
- return m_data + idx;
+ if (idx == -1) {
+ return nullptr;
}
- Wrapped<R>* get_data() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
- }
+ // FIXME: for convenience, I'm treating this Trie as a unique index
+ // for now, so no need to scan forward and/or check values. This
+ // also makes the point lookup query class a lot easier to make.
+ // Ultimately, though, we can support non-unique indexes with some
+ // extra work.
+ return m_data + idx;
+ }
- size_t get_tombstone_count() const {
- return 0;
- }
+ Wrapped<R> *get_data() const { return m_data; }
- const Wrapped<R>* get_record_at(size_t idx) const {
- if (idx >= m_reccnt) return nullptr;
- return m_data + idx;
- }
+ size_t get_record_count() const { return m_reccnt; }
+ size_t get_tombstone_count() const { return 0; }
- size_t get_memory_usage() const {
- return m_louds->size();
- }
+ const Wrapped<R> *get_record_at(size_t idx) const {
+ if (idx >= m_reccnt)
+ return nullptr;
+ return m_data + idx;
+ }
- size_t get_aux_memory_usage() const {
- return m_alloc_size;
- }
+ size_t get_memory_usage() const { return m_louds->size(); }
- size_t get_lower_bound(R &rec) {return 0;}
- size_t get_upper_bound(R &rec) {return 0;}
+ size_t get_aux_memory_usage() const { return m_alloc_size; }
-private:
+ size_t get_lower_bound(R &rec) { return 0; }
+ size_t get_upper_bound(R &rec) { return 0; }
- Wrapped<R>* m_data;
- size_t m_reccnt;
- size_t m_alloc_size;
- louds::Patricia *m_louds;
+private:
+ Wrapped<R> *m_data;
+ size_t m_reccnt;
+ size_t m_alloc_size;
+ louds::Patricia *m_louds;
};
-}
+} // namespace de
diff --git a/include/shard/PGM.h b/include/shard/PGM.h
index 5b39ab4..40c9141 100644
--- a/include/shard/PGM.h
+++ b/include/shard/PGM.h
@@ -13,7 +13,6 @@
*/
#pragma once
-
#include <vector>
#include "framework/ShardRequirements.h"
@@ -23,278 +22,268 @@
#include "util/SortedMerge.h"
#include "util/bf_config.h"
-using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
+using psudb::byte;
+using psudb::CACHELINE_SIZE;
using psudb::PriorityQueue;
using psudb::queue_record;
-using psudb::byte;
namespace de {
-template <RecordInterface R, size_t epsilon=128>
-class PGM {
+template <RecordInterface R, size_t epsilon = 128> class PGM {
public:
- typedef R RECORD;
+ typedef R RECORD;
+
private:
- typedef decltype(R::key) K;
- typedef decltype(R::value) V;
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
public:
- PGM(BufferView<R> buffer)
- : m_bf(nullptr)
- , m_reccnt(0)
- , m_tombstone_cnt(0)
- , m_alloc_size(0) {
-
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- buffer.get_record_count() *
- sizeof(Wrapped<R>),
- (byte**) &m_data);
-
- std::vector<K> keys;
- /*
- * Copy the contents of the buffer view into a temporary buffer, and
- * sort them. We still need to iterate over these temporary records to
- * apply tombstone/deleted record filtering, as well as any possible
- * per-record processing that is required by the shard being built.
- */
- auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
- buffer.get_record_count(),
- sizeof(Wrapped<R>));
- buffer.copy_to_buffer((byte *) temp_buffer);
-
- auto base = temp_buffer;
- auto stop = base + buffer.get_record_count();
- std::sort(base, stop, std::less<Wrapped<R>>());
-
- merge_info info = {0, 0};
-
- /*
- * Iterate over the temporary buffer to process the records, copying
- * them into buffer as needed
- */
- while (base < stop) {
- if (!base->is_tombstone() && (base + 1 < stop)
- && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
- base += 2;
- continue;
- } else if (base->is_deleted()) {
- base += 1;
- continue;
- }
-
- // FIXME: this shouldn't be necessary, but the tagged record
- // bypass doesn't seem to be working on this code-path, so this
- // ensures that tagged records from the buffer are able to be
- // dropped, eventually. It should only need to be &= 1
- base->header &= 3;
- keys.emplace_back(base->rec.key);
- m_data[info.record_count++] = *base;
-
- if (base->is_tombstone()) {
- info.tombstone_count++;
- if (m_bf){
- m_bf->insert(base->rec);
- }
- }
-
- base++;
+ PGM(BufferView<R> buffer)
+ : m_bf(nullptr), m_reccnt(0), m_tombstone_cnt(0), m_alloc_size(0) {
+
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>),
+ (byte **)&m_data);
+
+ std::vector<K> keys;
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = (Wrapped<R> *)psudb::sf_aligned_calloc(
+ CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>));
+ buffer.copy_to_buffer((byte *)temp_buffer);
+
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ merge_info info = {0, 0};
+
+ /*
+ * Iterate over the temporary buffer to process the records, copying
+ * them into buffer as needed
+ */
+ while (base < stop) {
+ if (!base->is_tombstone() && (base + 1 < stop) &&
+ base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
+ base += 2;
+ continue;
+ } else if (base->is_deleted()) {
+ base += 1;
+ continue;
+ }
+
+ // FIXME: this shouldn't be necessary, but the tagged record
+ // bypass doesn't seem to be working on this code-path, so this
+ // ensures that tagged records from the buffer are able to be
+ // dropped, eventually. It should only need to be &= 1
+ base->header &= 3;
+ keys.emplace_back(base->rec.key);
+ m_data[info.record_count++] = *base;
+
+ if (base->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf) {
+ m_bf->insert(base->rec);
}
+ }
- free(temp_buffer);
-
- m_reccnt = info.record_count;
- m_tombstone_cnt = info.tombstone_count;
-
- if (m_reccnt > 0) {
- m_pgm = pgm::PGMIndex<K, epsilon>(keys);
- }
+ base++;
}
- PGM(std::vector<const PGM*> const &shards)
- : m_data(nullptr)
- , m_bf(nullptr)
- , m_reccnt(0)
- , m_tombstone_cnt(0)
- , m_alloc_size(0) {
-
- size_t attemp_reccnt = 0;
- size_t tombstone_count = 0;
- auto cursors = build_cursor_vec<R, PGM>(shards, &attemp_reccnt, &tombstone_count);
-
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- attemp_reccnt * sizeof(Wrapped<R>),
- (byte **) &m_data);
- std::vector<K> keys;
-
- // FIXME: For smaller cursor arrays, it may be more efficient to skip
- // the priority queue and just do a scan.
- PriorityQueue<Wrapped<R>> pq(cursors.size());
- for (size_t i=0; i<cursors.size(); i++) {
- pq.push(cursors[i].ptr, i);
- }
+ free(temp_buffer);
- merge_info info = {0, 0};
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- /*
- * if the current record is not a tombstone, and the next record is
- * a tombstone that matches the current one, then the current one
- * has been deleted, and both it and its tombstone can be skipped
- * over.
- */
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- /* skip over records that have been deleted via tagging */
- if (!cursor.ptr->is_deleted()) {
- keys.emplace_back(cursor.ptr->rec.key);
- m_data[info.record_count++] = *cursor.ptr;
-
- /*
- * if the record is a tombstone, increment the ts count and
- * insert it into the bloom filter if one has been
- * provided.
- */
- if (cursor.ptr->is_tombstone()) {
- info.tombstone_count++;
- if (m_bf) {
- m_bf->insert(cursor.ptr->rec);
- }
- }
- }
- pq.pop();
-
- if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
- m_reccnt = info.record_count;
- m_tombstone_cnt = info.tombstone_count;
+ if (m_reccnt > 0) {
+ m_pgm = pgm::PGMIndex<K, epsilon>(keys);
+ }
+ }
+
+ PGM(std::vector<const PGM *> const &shards)
+ : m_data(nullptr), m_bf(nullptr), m_reccnt(0), m_tombstone_cnt(0),
+ m_alloc_size(0) {
+
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors =
+ build_cursor_vec<R, PGM>(shards, &attemp_reccnt, &tombstone_count);
+
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **)&m_data);
+ std::vector<K> keys;
+
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i = 0; i < cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
- if (m_reccnt > 0) {
- m_pgm = pgm::PGMIndex<K, epsilon>(keys);
+ merge_info info = {0, 0};
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next =
+ pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop();
+ pq.pop();
+ auto &cursor1 = cursors[now.version];
+ auto &cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1))
+ pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2))
+ pq.push(cursor2.ptr, next.version);
+ } else {
+ auto &cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted()) {
+ keys.emplace_back(cursor.ptr->rec.key);
+ m_data[info.record_count++] = *cursor.ptr;
+
+ /*
+ * if the record is a tombstone, increment the ts count and
+ * insert it into the bloom filter if one has been
+ * provided.
+ */
+ if (cursor.ptr->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf) {
+ m_bf->insert(cursor.ptr->rec);
+ }
+ }
}
- }
+ pq.pop();
- ~PGM() {
- free(m_data);
- delete m_bf;
+ if (advance_cursor(cursor))
+ pq.push(cursor.ptr, now.version);
+ }
}
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) const {
- size_t idx = get_lower_bound(rec.key);
- if (idx >= m_reccnt) {
- return nullptr;
- }
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
- while (idx < m_reccnt && m_data[idx].rec < rec) ++idx;
+ if (m_reccnt > 0) {
+ m_pgm = pgm::PGMIndex<K, epsilon>(keys);
+ }
+ }
- if (m_data[idx].rec == rec) {
- return m_data + idx;
- }
+ ~PGM() {
+ free(m_data);
+ delete m_bf;
+ }
- return nullptr;
+ Wrapped<R> *point_lookup(const R &rec, bool filter = false) const {
+ size_t idx = get_lower_bound(rec.key);
+ if (idx >= m_reccnt) {
+ return nullptr;
}
- Wrapped<R>* get_data() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
- }
+ while (idx < m_reccnt && m_data[idx].rec < rec)
+ ++idx;
- size_t get_tombstone_count() const {
- return m_tombstone_cnt;
+ if (m_data[idx].rec == rec) {
+ return m_data + idx;
}
- const Wrapped<R>* get_record_at(size_t idx) const {
- if (idx >= m_reccnt) return nullptr;
- return m_data + idx;
- }
+ return nullptr;
+ }
+ Wrapped<R> *get_data() const { return m_data; }
- size_t get_memory_usage() const {
- return m_pgm.size_in_bytes();
- }
+ size_t get_record_count() const { return m_reccnt; }
- size_t get_aux_memory_usage() const {
- return (m_bf) ? m_bf->memory_usage() : 0;
- }
+ size_t get_tombstone_count() const { return m_tombstone_cnt; }
- size_t get_lower_bound(const K& key) const {
- auto bound = m_pgm.search(key);
- size_t idx = bound.lo;
+ const Wrapped<R> *get_record_at(size_t idx) const {
+ if (idx >= m_reccnt)
+ return nullptr;
+ return m_data + idx;
+ }
- if (idx >= m_reccnt) {
- return m_reccnt;
- }
+ size_t get_memory_usage() const { return m_pgm.size_in_bytes(); }
- /*
- * If the region to search is less than some pre-specified
- * amount, perform a linear scan to locate the record.
- */
- if (bound.hi - bound.lo < 256) {
- while (idx < bound.hi && m_data[idx].rec.key < key) {
- idx++;
- }
- } else {
- /* Otherwise, perform a binary search */
- idx = bound.lo;
- size_t max = bound.hi;
-
- while (idx < max) {
- size_t mid = (idx + max) / 2;
- if (key > m_data[mid].rec.key) {
- idx = mid + 1;
- } else {
- max = mid;
- }
- }
+ size_t get_aux_memory_usage() const {
+ return (m_bf) ? m_bf->memory_usage() : 0;
+ }
- }
+ size_t get_lower_bound(const K &key) const {
+ auto bound = m_pgm.search(key);
+ size_t idx = bound.lo;
- /*
- * the upper bound returned by PGM is one passed the end of the
- * array. If we are at that point, we should just return "not found"
- */
- if (idx == m_reccnt) {
- return idx;
- }
+ if (idx >= m_reccnt) {
+ return m_reccnt;
+ }
- /*
- * We may have walked one passed the actual lower bound, so check
- * the index before the current one to see if it is the actual bound
- */
- if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) {
- return idx-1;
+ /*
+ * If the region to search is less than some pre-specified
+ * amount, perform a linear scan to locate the record.
+ */
+ if (bound.hi - bound.lo < 256) {
+ while (idx < bound.hi && m_data[idx].rec.key < key) {
+ idx++;
+ }
+ } else {
+ /* Otherwise, perform a binary search */
+ idx = bound.lo;
+ size_t max = bound.hi;
+
+ while (idx < max) {
+ size_t mid = (idx + max) / 2;
+ if (key > m_data[mid].rec.key) {
+ idx = mid + 1;
+ } else {
+ max = mid;
}
+ }
+ }
+
+ /*
+ * the upper bound returned by PGM is one passed the end of the
+ * array. If we are at that point, we should just return "not found"
+ */
+ if (idx == m_reccnt) {
+ return idx;
+ }
- /*
- * Otherwise, check idx. If it is a valid bound, then return it,
- * otherwise return "not found".
- */
- return (m_data[idx].rec.key >= key) ? idx : m_reccnt;
+ /*
+ * We may have walked one passed the actual lower bound, so check
+ * the index before the current one to see if it is the actual bound
+ */
+ if (m_data[idx].rec.key > key && idx > 0 &&
+ m_data[idx - 1].rec.key <= key) {
+ return idx - 1;
}
+ /*
+ * Otherwise, check idx. If it is a valid bound, then return it,
+ * otherwise return "not found".
+ */
+ return (m_data[idx].rec.key >= key) ? idx : m_reccnt;
+ }
+
private:
- Wrapped<R>* m_data;
- BloomFilter<R> *m_bf;
- size_t m_reccnt;
- size_t m_tombstone_cnt;
- size_t m_alloc_size;
- K m_max_key;
- K m_min_key;
- pgm::PGMIndex<K, epsilon> m_pgm;
+ Wrapped<R> *m_data;
+ BloomFilter<R> *m_bf;
+ size_t m_reccnt;
+ size_t m_tombstone_cnt;
+ size_t m_alloc_size;
+ K m_max_key;
+ K m_min_key;
+ pgm::PGMIndex<K, epsilon> m_pgm;
};
-}
+} // namespace de
diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h
index 1a04afc..2cd514d 100644
--- a/include/shard/TrieSpline.h
+++ b/include/shard/TrieSpline.h
@@ -11,319 +11,306 @@
*/
#pragma once
-
#include <vector>
#include "framework/ShardRequirements.h"
-#include "ts/builder.h"
#include "psu-ds/BloomFilter.h"
-#include "util/bf_config.h"
+#include "ts/builder.h"
#include "util/SortedMerge.h"
+#include "util/bf_config.h"
-using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
+using psudb::byte;
+using psudb::CACHELINE_SIZE;
using psudb::PriorityQueue;
using psudb::queue_record;
-using psudb::byte;
namespace de {
-template <KVPInterface R, size_t E=1024>
-class TrieSpline {
+template <KVPInterface R, size_t E = 1024> class TrieSpline {
public:
- typedef R RECORD;
+ typedef R RECORD;
+
private:
- typedef decltype(R::key) K;
- typedef decltype(R::value) V;
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
public:
- TrieSpline(BufferView<R> buffer)
- : m_reccnt(0)
- , m_tombstone_cnt(0)
- , m_alloc_size(0)
- , m_max_key(0)
- , m_min_key(0)
- , m_bf(nullptr)
- {
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- buffer.get_record_count() *
- sizeof(Wrapped<R>),
- (byte**) &m_data);
+ TrieSpline(BufferView<R> buffer)
+ : m_reccnt(0), m_tombstone_cnt(0), m_alloc_size(0), m_max_key(0),
+ m_min_key(0), m_bf(nullptr) {
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>),
+ (byte **)&m_data);
+
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = (Wrapped<R> *)psudb::sf_aligned_calloc(
+ CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>));
+ buffer.copy_to_buffer((byte *)temp_buffer);
+
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ auto tmp_min_key = temp_buffer[0].rec.key;
+ auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key;
+ auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+
+ merge_info info = {0, 0};
+
+ m_min_key = tmp_min_key;
+ m_max_key = tmp_max_key;
+
+ /*
+ * Iterate over the temporary buffer to process the records, copying
+ * them into buffer as needed
+ */
+ while (base < stop) {
+ if (!base->is_tombstone() && (base + 1 < stop) &&
+ base->rec == (base + 1)->rec && (base + 1)->is_tombstone() &&
+ base->rec.key != m_max_key && base->rec.key != m_min_key) {
+ base += 2;
+ continue;
+ } else if (base->is_deleted() && base->rec.key != m_max_key &&
+ base->rec.key != m_min_key) {
+ base += 1;
+ continue;
+ }
+
+ base->header &= 3;
+ bldr.AddKey(base->rec.key);
+ m_data[info.record_count++] = *base;
+
+ if (base->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf) {
+ m_bf->insert(base->rec);
+ }
+ }
- /*
- * Copy the contents of the buffer view into a temporary buffer, and
- * sort them. We still need to iterate over these temporary records to
- * apply tombstone/deleted record filtering, as well as any possible
- * per-record processing that is required by the shard being built.
- */
- auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
- buffer.get_record_count(),
- sizeof(Wrapped<R>));
- buffer.copy_to_buffer((byte *) temp_buffer);
+ base++;
+ }
- auto base = temp_buffer;
- auto stop = base + buffer.get_record_count();
- std::sort(base, stop, std::less<Wrapped<R>>());
+ free(temp_buffer);
- auto tmp_min_key = temp_buffer[0].rec.key;
- auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key;
- auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
- merge_info info = {0, 0};
+ if (m_reccnt > 50) {
+ m_ts = bldr.Finalize();
+ }
+ }
+
+ TrieSpline(std::vector<const TrieSpline *> const &shards)
+ : m_reccnt(0), m_tombstone_cnt(0), m_alloc_size(0), m_max_key(0),
+ m_min_key(0), m_bf(nullptr) {
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt,
+ &tombstone_count);
+
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **)&m_data);
+
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i = 0; i < cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
- m_min_key = tmp_min_key;
- m_max_key = tmp_max_key;
+ auto tmp_max_key = shards[0]->m_max_key;
+ auto tmp_min_key = shards[0]->m_min_key;
- /*
- * Iterate over the temporary buffer to process the records, copying
- * them into buffer as needed
- */
- while (base < stop) {
- if (!base->is_tombstone() && (base + 1 < stop)
- && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()
- && base->rec.key != m_max_key && base->rec.key != m_min_key) {
- base += 2;
- continue;
- } else if (base->is_deleted() && base->rec.key != m_max_key && base->rec.key != m_min_key) {
- base += 1;
- continue;
- }
+ for (size_t i = 0; i < shards.size(); i++) {
+ if (shards[i]->m_max_key > tmp_max_key) {
+ tmp_max_key = shards[i]->m_max_key;
+ }
- base->header &= 3;
- bldr.AddKey(base->rec.key);
- m_data[info.record_count++] = *base;
+ if (shards[i]->m_min_key < tmp_min_key) {
+ tmp_min_key = shards[i]->m_min_key;
+ }
+ }
- if (base->is_tombstone()) {
- info.tombstone_count++;
- if (m_bf){
- m_bf->insert(base->rec);
- }
+ auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+
+ m_max_key = tmp_max_key;
+ m_min_key = tmp_min_key;
+
+ merge_info info = {0, 0};
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next =
+ pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over. Unless the tombstone would remove the maximum or
+ * minimum valued key, which cannot be removed at this point
+ * without breaking triespline
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone() &&
+ now.data->rec.key != tmp_max_key &&
+ now.data->rec.key != tmp_min_key) {
+
+ pq.pop();
+ pq.pop();
+ auto &cursor1 = cursors[now.version];
+ auto &cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1))
+ pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2))
+ pq.push(cursor2.ptr, next.version);
+ } else {
+ auto &cursor = cursors[now.version];
+ /*
+ * skip over records that have been deleted via tagging,
+ * unless they are the max or min keys, which cannot be
+ * removed without breaking triespline
+ */
+ if (!cursor.ptr->is_deleted() || cursor.ptr->rec.key == tmp_max_key ||
+ cursor.ptr->rec.key == tmp_min_key) {
+ bldr.AddKey(cursor.ptr->rec.key);
+ m_data[info.record_count++] = *cursor.ptr;
+
+ /*
+ * if the record is a tombstone, increment the ts count and
+ * insert it into the bloom filter if one has been
+ * provided.
+ */
+ if (cursor.ptr->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf) {
+ m_bf->insert(cursor.ptr->rec);
}
-
- base++;
+ }
}
+ pq.pop();
- free(temp_buffer);
-
- m_reccnt = info.record_count;
- m_tombstone_cnt = info.tombstone_count;
-
- if (m_reccnt > 50) {
- m_ts = bldr.Finalize();
- }
+ if (advance_cursor(cursor))
+ pq.push(cursor.ptr, now.version);
+ }
}
- TrieSpline(std::vector<const TrieSpline*> const &shards)
- : m_reccnt(0)
- , m_tombstone_cnt(0)
- , m_alloc_size(0)
- , m_max_key(0)
- , m_min_key(0)
- , m_bf(nullptr)
- {
- size_t attemp_reccnt = 0;
- size_t tombstone_count = 0;
- auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt, &tombstone_count);
-
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- attemp_reccnt * sizeof(Wrapped<R>),
- (byte **) &m_data);
-
- // FIXME: For smaller cursor arrays, it may be more efficient to skip
- // the priority queue and just do a scan.
- PriorityQueue<Wrapped<R>> pq(cursors.size());
- for (size_t i=0; i<cursors.size(); i++) {
- pq.push(cursors[i].ptr, i);
- }
-
- auto tmp_max_key = shards[0]->m_max_key;
- auto tmp_min_key = shards[0]->m_min_key;
-
- for (size_t i=0; i<shards.size(); i++) {
- if (shards[i]->m_max_key > tmp_max_key) {
- tmp_max_key = shards[i]->m_max_key;
- }
-
- if (shards[i]->m_min_key < tmp_min_key) {
- tmp_min_key = shards[i]->m_min_key;
- }
- }
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
- auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
-
- m_max_key = tmp_max_key;
- m_min_key = tmp_min_key;
-
- merge_info info = {0, 0};
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- /*
- * if the current record is not a tombstone, and the next record is
- * a tombstone that matches the current one, then the current one
- * has been deleted, and both it and its tombstone can be skipped
- * over. Unless the tombstone would remove the maximum or
- * minimum valued key, which cannot be removed at this point
- * without breaking triespline
- */
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()
- && now.data->rec.key != tmp_max_key && now.data->rec.key != tmp_min_key) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- /*
- * skip over records that have been deleted via tagging,
- * unless they are the max or min keys, which cannot be
- * removed without breaking triespline
- */
- if (!cursor.ptr->is_deleted() ||
- cursor.ptr->rec.key == tmp_max_key || cursor.ptr->rec.key == tmp_min_key) {
- bldr.AddKey(cursor.ptr->rec.key);
- m_data[info.record_count++] = *cursor.ptr;
-
- /*
- * if the record is a tombstone, increment the ts count and
- * insert it into the bloom filter if one has been
- * provided.
- */
- if (cursor.ptr->is_tombstone()) {
- info.tombstone_count++;
- if (m_bf) {
- m_bf->insert(cursor.ptr->rec);
- }
- }
- }
- pq.pop();
-
- if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ if (m_reccnt > 50) {
+ m_ts = bldr.Finalize();
+ }
+ }
- m_reccnt = info.record_count;
- m_tombstone_cnt = info.tombstone_count;
+ ~TrieSpline() {
+ free(m_data);
+ delete m_bf;
+ }
- if (m_reccnt > 50) {
- m_ts = bldr.Finalize();
- }
+ Wrapped<R> *point_lookup(const R &rec, bool filter = false) const {
+ if (filter && m_bf && !m_bf->lookup(rec)) {
+ return nullptr;
}
- ~TrieSpline() {
- free(m_data);
- delete m_bf;
+ size_t idx = get_lower_bound(rec.key);
+ if (idx >= m_reccnt) {
+ return nullptr;
}
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) const {
- if (filter && m_bf && !m_bf->lookup(rec)) {
- return nullptr;
- }
-
- size_t idx = get_lower_bound(rec.key);
- if (idx >= m_reccnt) {
- return nullptr;
- }
-
- while (idx < m_reccnt && m_data[idx].rec < rec) ++idx;
-
- if (m_data[idx].rec == rec) {
- return m_data + idx;
- }
+ while (idx < m_reccnt && m_data[idx].rec < rec)
+ ++idx;
- return nullptr;
+ if (m_data[idx].rec == rec) {
+ return m_data + idx;
}
- Wrapped<R>* get_data() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
- }
+ return nullptr;
+ }
- size_t get_tombstone_count() const {
- return m_tombstone_cnt;
- }
+ Wrapped<R> *get_data() const { return m_data; }
- const Wrapped<R>* get_record_at(size_t idx) const {
- if (idx >= m_reccnt) return nullptr;
- return m_data + idx;
- }
+ size_t get_record_count() const { return m_reccnt; }
+ size_t get_tombstone_count() const { return m_tombstone_cnt; }
- size_t get_memory_usage() const {
- return m_ts.GetSize();
- }
+ const Wrapped<R> *get_record_at(size_t idx) const {
+ if (idx >= m_reccnt)
+ return nullptr;
+ return m_data + idx;
+ }
- size_t get_aux_memory_usage() const {
- return (m_bf) ? m_bf->memory_usage() : 0;
- }
+ size_t get_memory_usage() const { return m_ts.GetSize(); }
- size_t get_lower_bound(const K& key) const {
- if (m_reccnt < 50) {
- size_t bd = m_reccnt;
- for (size_t i=0; i<m_reccnt; i++) {
- if (m_data[i].rec.key >= key) {
- bd = i;
- break;
- }
- }
+ size_t get_aux_memory_usage() const {
+ return (m_bf) ? m_bf->memory_usage() : 0;
+ }
- return bd;
+ size_t get_lower_bound(const K &key) const {
+ if (m_reccnt < 50) {
+ size_t bd = m_reccnt;
+ for (size_t i = 0; i < m_reccnt; i++) {
+ if (m_data[i].rec.key >= key) {
+ bd = i;
+ break;
}
+ }
- auto bound = m_ts.GetSearchBound(key);
- size_t idx = bound.begin;
+ return bd;
+ }
- if (idx >= m_reccnt) {
- return m_reccnt;
- }
+ auto bound = m_ts.GetSearchBound(key);
+ size_t idx = bound.begin;
- // If the region to search is less than some pre-specified
- // amount, perform a linear scan to locate the record.
- if (bound.end - bound.begin < 256) {
- while (idx < bound.end && m_data[idx].rec.key < key) {
- idx++;
- }
- } else {
- // Otherwise, perform a binary search
- idx = bound.begin;
- size_t max = bound.end;
-
- while (idx < max) {
- size_t mid = (idx + max) / 2;
- if (key > m_data[mid].rec.key) {
- idx = mid + 1;
- } else {
- max = mid;
- }
- }
- }
+ if (idx >= m_reccnt) {
+ return m_reccnt;
+ }
- if (idx == m_reccnt) {
- return m_reccnt;
+ // If the region to search is less than some pre-specified
+ // amount, perform a linear scan to locate the record.
+ if (bound.end - bound.begin < 256) {
+ while (idx < bound.end && m_data[idx].rec.key < key) {
+ idx++;
+ }
+ } else {
+ // Otherwise, perform a binary search
+ idx = bound.begin;
+ size_t max = bound.end;
+
+ while (idx < max) {
+ size_t mid = (idx + max) / 2;
+ if (key > m_data[mid].rec.key) {
+ idx = mid + 1;
+ } else {
+ max = mid;
}
+ }
+ }
- if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) {
- return idx-1;
- }
+ if (idx == m_reccnt) {
+ return m_reccnt;
+ }
- return idx;
+ if (m_data[idx].rec.key > key && idx > 0 &&
+ m_data[idx - 1].rec.key <= key) {
+ return idx - 1;
}
-private:
+ return idx;
+ }
- Wrapped<R>* m_data;
- size_t m_reccnt;
- size_t m_tombstone_cnt;
- size_t m_alloc_size;
- K m_max_key;
- K m_min_key;
- ts::TrieSpline<K> m_ts;
- BloomFilter<R> *m_bf;
+private:
+ Wrapped<R> *m_data;
+ size_t m_reccnt;
+ size_t m_tombstone_cnt;
+ size_t m_alloc_size;
+ K m_max_key;
+ K m_min_key;
+ ts::TrieSpline<K> m_ts;
+ BloomFilter<R> *m_bf;
};
-}
+} // namespace de
diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h
index 7130efe..33ce9b9 100644
--- a/include/shard/VPTree.h
+++ b/include/shard/VPTree.h
@@ -15,379 +15,376 @@
#include <vector>
-#include <unordered_map>
#include "framework/ShardRequirements.h"
#include "psu-ds/PriorityQueue.h"
+#include <unordered_map>
+using psudb::byte;
using psudb::CACHELINE_SIZE;
using psudb::PriorityQueue;
-using psudb::byte;
namespace de {
-template <NDRecordInterface R, size_t LEAFSZ=100, bool HMAP=false>
+template <NDRecordInterface R, size_t LEAFSZ = 100, bool HMAP = false>
class VPTree {
public:
- typedef R RECORD;
+ typedef R RECORD;
private:
- struct vpnode {
- size_t start;
- size_t stop;
- bool leaf;
+ struct vpnode {
+ size_t start;
+ size_t stop;
+ bool leaf;
+
+ double radius;
+ vpnode *inside;
+ vpnode *outside;
+
+ vpnode()
+ : start(0), stop(0), leaf(false), radius(0.0), inside(nullptr),
+ outside(nullptr) {}
+
+ ~vpnode() {
+ delete inside;
+ delete outside;
+ }
+ };
- double radius;
- vpnode *inside;
- vpnode *outside;
+public:
+ VPTree(BufferView<R> buffer)
+ : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) {
+
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>),
+ (byte **)&m_data);
+
+ m_ptrs = new vp_ptr[buffer.get_record_count()];
+ m_reccnt = 0;
+
+ // FIXME: will eventually need to figure out tombstones
+ // this one will likely require the multi-pass
+ // approach, as otherwise we'll need to sort the
+ // records repeatedly on each reconstruction.
+ for (size_t i = 0; i < buffer.get_record_count(); i++) {
+ auto rec = buffer.get(i);
+
+ if (rec->is_deleted()) {
+ continue;
+ }
+
+ rec->header &= 3;
+ m_data[m_reccnt] = *rec;
+ m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
+ m_reccnt++;
+ }
- vpnode() : start(0), stop(0), leaf(false), radius(0.0), inside(nullptr), outside(nullptr) {}
+ if (m_reccnt > 0) {
+ m_root = build_vptree();
+ build_map();
+ }
+ }
- ~vpnode() {
- delete inside;
- delete outside;
- }
- };
+ VPTree(std::vector<const VPTree *> shards)
+ : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) {
+ size_t attemp_reccnt = 0;
+ for (size_t i = 0; i < shards.size(); i++) {
+ attemp_reccnt += shards[i]->get_record_count();
+ }
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **)&m_data);
+ m_ptrs = new vp_ptr[attemp_reccnt];
+
+ // FIXME: will eventually need to figure out tombstones
+ // this one will likely require the multi-pass
+ // approach, as otherwise we'll need to sort the
+ // records repeatedly on each reconstruction.
+ for (size_t i = 0; i < shards.size(); i++) {
+ for (size_t j = 0; j < shards[i]->get_record_count(); j++) {
+ if (shards[i]->get_record_at(j)->is_deleted()) {
+ continue;
+ }
-public:
- VPTree(BufferView<R> buffer)
- : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) {
+ m_data[m_reccnt] = *shards[i]->get_record_at(j);
+ m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
+ m_reccnt++;
+ }
+ }
+ if (m_reccnt > 0) {
+ m_root = build_vptree();
+ build_map();
+ }
+ }
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- buffer.get_record_count() *
- sizeof(Wrapped<R>),
- (byte**) &m_data);
+ ~VPTree() {
+ free(m_data);
+ delete m_root;
+ delete[] m_ptrs;
+ }
- m_ptrs = new vp_ptr[buffer.get_record_count()];
- m_reccnt = 0;
+ Wrapped<R> *point_lookup(const R &rec, bool filter = false) {
+ if constexpr (HMAP) {
+ auto idx = m_lookup_map.find(rec);
- // FIXME: will eventually need to figure out tombstones
- // this one will likely require the multi-pass
- // approach, as otherwise we'll need to sort the
- // records repeatedly on each reconstruction.
- for (size_t i=0; i<buffer.get_record_count(); i++) {
- auto rec = buffer.get(i);
+ if (idx == m_lookup_map.end()) {
+ return nullptr;
+ }
- if (rec->is_deleted()) {
- continue;
- }
+ return m_data + idx->second;
+ } else {
+ vpnode *node = m_root;
- rec->header &= 3;
- m_data[m_reccnt] = *rec;
- m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
- m_reccnt++;
+ while (!node->leaf && m_ptrs[node->start].ptr->rec != rec) {
+ if (rec.calc_distance((m_ptrs[node->start].ptr->rec)) >= node->radius) {
+ node = node->outside;
+ } else {
+ node = node->inside;
}
+ }
- if (m_reccnt > 0) {
- m_root = build_vptree();
- build_map();
+ for (size_t i = node->start; i <= node->stop; i++) {
+ if (m_ptrs[i].ptr->rec == rec) {
+ return m_ptrs[i].ptr;
}
+ }
+
+ return nullptr;
}
+ }
- VPTree(std::vector<const VPTree*> shards)
- : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) {
+ Wrapped<R> *get_data() const { return m_data; }
- size_t attemp_reccnt = 0;
- for (size_t i=0; i<shards.size(); i++) {
- attemp_reccnt += shards[i]->get_record_count();
- }
+ size_t get_record_count() const { return m_reccnt; }
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- attemp_reccnt * sizeof(Wrapped<R>),
- (byte **) &m_data);
- m_ptrs = new vp_ptr[attemp_reccnt];
-
- // FIXME: will eventually need to figure out tombstones
- // this one will likely require the multi-pass
- // approach, as otherwise we'll need to sort the
- // records repeatedly on each reconstruction.
- for (size_t i=0; i<shards.size(); i++) {
- for (size_t j=0; j<shards[i]->get_record_count(); j++) {
- if (shards[i]->get_record_at(j)->is_deleted()) {
- continue;
- }
-
- m_data[m_reccnt] = *shards[i]->get_record_at(j);
- m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
- m_reccnt++;
- }
- }
+ size_t get_tombstone_count() const { return m_tombstone_cnt; }
- if (m_reccnt > 0) {
- m_root = build_vptree();
- build_map();
- }
- }
+ const Wrapped<R> *get_record_at(size_t idx) const {
+ if (idx >= m_reccnt)
+ return nullptr;
+ return m_data + idx;
+ }
- ~VPTree() {
- free(m_data);
- delete m_root;
- delete[] m_ptrs;
- }
+ size_t get_memory_usage() const {
+ return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R *);
+ }
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
- if constexpr (HMAP) {
- auto idx = m_lookup_map.find(rec);
+ size_t get_aux_memory_usage() const {
+ // FIXME: need to return the size of the unordered_map
+ return 0;
+ }
- if (idx == m_lookup_map.end()) {
- return nullptr;
- }
+ void search(const R &point, size_t k,
+ PriorityQueue<Wrapped<R>, DistCmpMax<Wrapped<R>>> &pq) {
+ double farthest = std::numeric_limits<double>::max();
- return m_data + idx->second;
- } else {
- vpnode *node = m_root;
-
- while (!node->leaf && m_ptrs[node->start].ptr->rec != rec) {
- if (rec.calc_distance((m_ptrs[node->start].ptr->rec)) >= node->radius) {
- node = node->outside;
- } else {
- node = node->inside;
- }
- }
-
- for (size_t i=node->start; i<=node->stop; i++) {
- if (m_ptrs[i].ptr->rec == rec) {
- return m_ptrs[i].ptr;
- }
- }
-
- return nullptr;
- }
- }
+ internal_search(m_root, point, k, pq, &farthest);
+ }
- Wrapped<R>* get_data() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
+private:
+ struct vp_ptr {
+ Wrapped<R> *ptr;
+ double dist;
+ };
+ Wrapped<R> *m_data;
+ vp_ptr *m_ptrs;
+ std::unordered_map<R, size_t, RecordHash<R>> m_lookup_map;
+ size_t m_reccnt;
+ size_t m_tombstone_cnt;
+ size_t m_node_cnt;
+ size_t m_alloc_size;
+
+ vpnode *m_root;
+
+ vpnode *build_vptree() {
+ if (m_reccnt == 0) {
+ return nullptr;
}
- size_t get_tombstone_count() const {
- return m_tombstone_cnt;
- }
+ size_t lower = 0;
+ size_t upper = m_reccnt - 1;
- const Wrapped<R>* get_record_at(size_t idx) const {
- if (idx >= m_reccnt) return nullptr;
- return m_data + idx;
- }
+ auto rng = gsl_rng_alloc(gsl_rng_mt19937);
+ auto root = build_subtree(lower, upper, rng);
+ gsl_rng_free(rng);
+ return root;
+ }
- size_t get_memory_usage() const {
- return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*);
+ void build_map() {
+ // Skip constructing the hashmap if disabled in the
+ // template parameters.
+ if constexpr (!HMAP) {
+ return;
}
- size_t get_aux_memory_usage() const {
- // FIXME: need to return the size of the unordered_map
- return 0;
+ for (size_t i = 0; i < m_reccnt; i++) {
+ // FIXME: Will need to account for tombstones here too. Under
+ // tombstones, it is technically possible for two otherwise identical
+ // instances of the same record to exist within the same shard, so
+ // long as one of them is a tombstone. Because the table is currently
+ // using the unwrapped records for the key, it isn't possible for it
+ // to handle this case right now.
+ m_lookup_map.insert({m_data[i].rec, i});
}
-
- void search(const R &point, size_t k, PriorityQueue<Wrapped<R>,
- DistCmpMax<Wrapped<R>>> &pq) {
- double farthest = std::numeric_limits<double>::max();
-
- internal_search(m_root, point, k, pq, &farthest);
+ }
+
+ vpnode *build_subtree(size_t start, size_t stop, gsl_rng *rng) {
+ /*
+ * base-case: sometimes happens (probably because of the +1 and -1
+ * in the first recursive call)
+ */
+ if (start > stop) {
+ return nullptr;
}
-private:
- struct vp_ptr {
- Wrapped<R> *ptr;
- double dist;
- };
- Wrapped<R>* m_data;
- vp_ptr* m_ptrs;
- std::unordered_map<R, size_t, RecordHash<R>> m_lookup_map;
- size_t m_reccnt;
- size_t m_tombstone_cnt;
- size_t m_node_cnt;
- size_t m_alloc_size;
-
- vpnode *m_root;
-
- vpnode *build_vptree() {
- if (m_reccnt == 0) {
- return nullptr;
- }
-
- size_t lower = 0;
- size_t upper = m_reccnt - 1;
+ /* base-case: create a leaf node */
+ if (stop - start <= LEAFSZ) {
+ vpnode *node = new vpnode();
+ node->start = start;
+ node->stop = stop;
+ node->leaf = true;
- auto rng = gsl_rng_alloc(gsl_rng_mt19937);
- auto root = build_subtree(lower, upper, rng);
- gsl_rng_free(rng);
- return root;
+ m_node_cnt++;
+ return node;
}
- void build_map() {
- // Skip constructing the hashmap if disabled in the
- // template parameters.
- if constexpr (!HMAP) {
- return;
- }
-
- for (size_t i=0; i<m_reccnt; i++) {
- // FIXME: Will need to account for tombstones here too. Under
- // tombstones, it is technically possible for two otherwise identical
- // instances of the same record to exist within the same shard, so
- // long as one of them is a tombstone. Because the table is currently
- // using the unwrapped records for the key, it isn't possible for it
- // to handle this case right now.
- m_lookup_map.insert({m_data[i].rec, i});
- }
+ /*
+ * select a random element to be the root of the
+ * subtree
+ */
+ auto i = start + gsl_rng_uniform_int(rng, stop - start + 1);
+ swap(start, i);
+
+ /* for efficiency, we'll pre-calculate the distances between each point and
+ * the root */
+ for (size_t i = start + 1; i <= stop; i++) {
+ m_ptrs[i].dist = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[i].ptr->rec);
}
- vpnode *build_subtree(size_t start, size_t stop, gsl_rng *rng) {
- /*
- * base-case: sometimes happens (probably because of the +1 and -1
- * in the first recursive call)
- */
- if (start > stop) {
- return nullptr;
- }
-
- /* base-case: create a leaf node */
- if (stop - start <= LEAFSZ) {
- vpnode *node = new vpnode();
- node->start = start;
- node->stop = stop;
- node->leaf = true;
-
- m_node_cnt++;
- return node;
- }
-
- /*
- * select a random element to be the root of the
- * subtree
- */
- auto i = start + gsl_rng_uniform_int(rng, stop - start + 1);
- swap(start, i);
-
- /* for efficiency, we'll pre-calculate the distances between each point and the root */
- for (size_t i=start+1; i<=stop; i++) {
- m_ptrs[i].dist = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[i].ptr->rec);
- }
-
- /*
- * partition elements based on their distance from the start,
- * with those elements with distance falling below the median
- * distance going into the left sub-array and those above
- * the median in the right. This is easily done using QuickSelect.
- */
- auto mid = (start + 1 + stop) / 2;
- quickselect(start + 1, stop, mid, m_ptrs[start].ptr, rng);
-
- /* Create a new node based on this partitioning */
- vpnode *node = new vpnode();
- node->start = start;
+ /*
+ * partition elements based on their distance from the start,
+ * with those elements with distance falling below the median
+ * distance going into the left sub-array and those above
+ * the median in the right. This is easily done using QuickSelect.
+ */
+ auto mid = (start + 1 + stop) / 2;
+ quickselect(start + 1, stop, mid, m_ptrs[start].ptr, rng);
- /* store the radius of the circle used for partitioning the node. */
- node->radius = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[mid].ptr->rec);
- m_ptrs[start].dist = node->radius;
+ /* Create a new node based on this partitioning */
+ vpnode *node = new vpnode();
+ node->start = start;
- /* recursively construct the left and right subtrees */
- node->inside = build_subtree(start + 1, mid-1, rng);
- node->outside = build_subtree(mid, stop, rng);
+ /* store the radius of the circle used for partitioning the node. */
+ node->radius = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[mid].ptr->rec);
+ m_ptrs[start].dist = node->radius;
- m_node_cnt++;
+ /* recursively construct the left and right subtrees */
+ node->inside = build_subtree(start + 1, mid - 1, rng);
+ node->outside = build_subtree(mid, stop, rng);
- return node;
- }
+ m_node_cnt++;
- void quickselect(size_t start, size_t stop, size_t k, Wrapped<R> *p, gsl_rng *rng) {
- if (start == stop) return;
+ return node;
+ }
- auto pivot = partition(start, stop, p, rng);
+ void quickselect(size_t start, size_t stop, size_t k, Wrapped<R> *p,
+ gsl_rng *rng) {
+ if (start == stop)
+ return;
- if (k < pivot) {
- quickselect(start, pivot - 1, k, p, rng);
- } else if (k > pivot) {
- quickselect(pivot + 1, stop, k, p, rng);
- }
- }
+ auto pivot = partition(start, stop, p, rng);
- // TODO: The quickselect code can probably be generalized and moved out
- // to psudb-common instead.
- size_t partition(size_t start, size_t stop, Wrapped<R> *p, gsl_rng *rng) {
- auto pivot = start + gsl_rng_uniform_int(rng, stop - start);
- //double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec);
-
- swap(pivot, stop);
-
- size_t j = start;
- for (size_t i=start; i<stop; i++) {
- if (m_ptrs[i].dist < m_ptrs[stop].dist) {
- //assert(distances[i - start] == p->rec.calc_distance(m_ptrs[i]->rec));
- //if (distances[i - start] < distances[stop - start]) {
- //if (p->rec .calc_distance(m_ptrs[i]->rec) < pivot_dist) {
- swap(j, i);
- j++;
- }
- }
-
- swap(j, stop);
- return j;
+ if (k < pivot) {
+ quickselect(start, pivot - 1, k, p, rng);
+ } else if (k > pivot) {
+ quickselect(pivot + 1, stop, k, p, rng);
}
-
- void swap(size_t idx1, size_t idx2) {
- auto tmp = m_ptrs[idx1];
- m_ptrs[idx1] = m_ptrs[idx2];
- m_ptrs[idx2] = tmp;
+ }
+
+ // TODO: The quickselect code can probably be generalized and moved out
+ // to psudb-common instead.
+ size_t partition(size_t start, size_t stop, Wrapped<R> *p, gsl_rng *rng) {
+ auto pivot = start + gsl_rng_uniform_int(rng, stop - start);
+ // double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec);
+
+ swap(pivot, stop);
+
+ size_t j = start;
+ for (size_t i = start; i < stop; i++) {
+ if (m_ptrs[i].dist < m_ptrs[stop].dist) {
+ // assert(distances[i - start] == p->rec.calc_distance(m_ptrs[i]->rec));
+ // if (distances[i - start] < distances[stop - start]) {
+ // if (p->rec .calc_distance(m_ptrs[i]->rec) < pivot_dist) {
+ swap(j, i);
+ j++;
+ }
}
- void internal_search(vpnode *node, const R &point, size_t k, PriorityQueue<Wrapped<R>,
- DistCmpMax<Wrapped<R>>> &pq, double *farthest) {
-
- if (node == nullptr) return;
+ swap(j, stop);
+ return j;
+ }
- if (node->leaf) {
- for (size_t i=node->start; i<=node->stop; i++) {
- double d = point.calc_distance(m_ptrs[i].ptr->rec);
- if (d < *farthest) {
- if (pq.size() == k) {
- pq.pop();
- }
+ void swap(size_t idx1, size_t idx2) {
+ auto tmp = m_ptrs[idx1];
+ m_ptrs[idx1] = m_ptrs[idx2];
+ m_ptrs[idx2] = tmp;
+ }
- pq.push(m_ptrs[i].ptr);
- if (pq.size() == k) {
- *farthest = point.calc_distance(pq.peek().data->rec);
- }
- }
- }
+ void internal_search(vpnode *node, const R &point, size_t k,
+ PriorityQueue<Wrapped<R>, DistCmpMax<Wrapped<R>>> &pq,
+ double *farthest) {
- return;
- }
-
- double d = point.calc_distance(m_ptrs[node->start].ptr->rec);
+ if (node == nullptr)
+ return;
+ if (node->leaf) {
+ for (size_t i = node->start; i <= node->stop; i++) {
+ double d = point.calc_distance(m_ptrs[i].ptr->rec);
if (d < *farthest) {
- if (pq.size() == k) {
- pq.pop();
- }
- pq.push(m_ptrs[node->start].ptr);
- if (pq.size() == k) {
- *farthest = point.calc_distance(pq.peek().data->rec);
- }
+ if (pq.size() == k) {
+ pq.pop();
+ }
+
+ pq.push(m_ptrs[i].ptr);
+ if (pq.size() == k) {
+ *farthest = point.calc_distance(pq.peek().data->rec);
+ }
}
+ }
- if (d < node->radius) {
- if (d - (*farthest) <= node->radius) {
- internal_search(node->inside, point, k, pq, farthest);
- }
+ return;
+ }
- if (d + (*farthest) >= node->radius) {
- internal_search(node->outside, point, k, pq, farthest);
- }
- } else {
- if (d + (*farthest) >= node->radius) {
- internal_search(node->outside, point, k, pq, farthest);
- }
+ double d = point.calc_distance(m_ptrs[node->start].ptr->rec);
- if (d - (*farthest) <= node->radius) {
- internal_search(node->inside, point, k, pq, farthest);
- }
- }
+ if (d < *farthest) {
+ if (pq.size() == k) {
+ pq.pop();
+ }
+ pq.push(m_ptrs[node->start].ptr);
+ if (pq.size() == k) {
+ *farthest = point.calc_distance(pq.peek().data->rec);
+ }
+ }
+
+ if (d < node->radius) {
+ if (d - (*farthest) <= node->radius) {
+ internal_search(node->inside, point, k, pq, farthest);
+ }
+
+ if (d + (*farthest) >= node->radius) {
+ internal_search(node->outside, point, k, pq, farthest);
+ }
+ } else {
+ if (d + (*farthest) >= node->radius) {
+ internal_search(node->outside, point, k, pq, farthest);
+ }
+
+ if (d - (*farthest) <= node->radius) {
+ internal_search(node->inside, point, k, pq, farthest);
+ }
}
- };
-}
+ }
+};
+} // namespace de