Diffstat (limited to 'include/shard')
-rw-r--r--   include/shard/Alias.h            2
-rw-r--r--   include/shard/AugBTree.h         2
-rw-r--r--   include/shard/FSTrie.h         200
-rw-r--r--   include/shard/ISAMTree.h        14
-rw-r--r--   include/shard/LoudsPatricia.h  199
-rw-r--r--   include/shard/PGM.h            150
-rw-r--r--   include/shard/TrieSpline.h     185
-rw-r--r--   include/shard/VPTree.h          51
8 files changed, 723 insertions, 80 deletions
diff --git a/include/shard/Alias.h b/include/shard/Alias.h
index 9275952..72147d7 100644
--- a/include/shard/Alias.h
+++ b/include/shard/Alias.h
@@ -148,7 +148,7 @@ public:
size_t get_memory_usage() {
- return m_alloc_size;
+ return 0;
}
size_t get_aux_memory_usage() {
diff --git a/include/shard/AugBTree.h b/include/shard/AugBTree.h
index 54931bd..c60cbcd 100644
--- a/include/shard/AugBTree.h
+++ b/include/shard/AugBTree.h
@@ -148,7 +148,7 @@ public:
}
size_t get_memory_usage() {
- return m_alloc_size + m_node_cnt * sizeof(AugBTreeNode<Wrapped<R>>);
+ return m_node_cnt * sizeof(AugBTreeNode<Wrapped<R>>);
}
size_t get_aux_memory_usage() {
diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h
new file mode 100644
index 0000000..3783b38
--- /dev/null
+++ b/include/shard/FSTrie.h
@@ -0,0 +1,200 @@
+/*
+ * include/shard/FSTrie.h
+ *
+ * Copyright (C) 2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * A shard shim around the FSTrie learned index.
+ */
+#pragma once
+
+
+#include <vector>
+
+#include "framework/ShardRequirements.h"
+#include "fst.hpp"
+#include "util/SortedMerge.h"
+
+using psudb::CACHELINE_SIZE;
+using psudb::BloomFilter;
+using psudb::PriorityQueue;
+using psudb::queue_record;
+using psudb::byte;
+
+namespace de {
+
+template <KVPInterface R>
+class FSTrie {
+private:
+
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+ static_assert(std::is_same_v<K, const char*>, "FST requires const char* keys.");
+
+public:
+ FSTrie(BufferView<R> buffer)
+ : m_data(nullptr)
+ , m_reccnt(0)
+ , m_alloc_size(0)
+ {
+ m_data = new Wrapped<R>[buffer.get_record_count()]();
+ m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
+
+ size_t cnt = 0;
+ std::vector<std::string> keys;
+ keys.reserve(buffer.get_record_count());
+
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
+ for (size_t i=0; i<buffer.get_record_count(); i++) {
+ temp_buffer[i] = *(buffer.get(i));
+ }
+
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ for (size_t i=0; i<buffer.get_record_count(); i++) {
+ if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || temp_buffer[i].rec.key[0] == '\0') {
+ continue;
+ }
+
+ m_data[cnt] = temp_buffer[i];
+ m_data[cnt].clear_timestamp();
+
+ keys.push_back(std::string(m_data[cnt].rec.key));
+ cnt++;
+ }
+
+ m_reccnt = cnt;
+ if (m_reccnt > 0) {
+ m_fst = new fst::Trie(keys, true, 1);
+ }
+
+ delete[] temp_buffer;
+ }
+
+ FSTrie(std::vector<FSTrie*> &shards)
+ : m_data(nullptr)
+ , m_reccnt(0)
+ , m_alloc_size(0)
+ {
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors = build_cursor_vec<R, FSTrie>(shards, &attemp_reccnt, &tombstone_count);
+
+ m_data = new Wrapped<R>[attemp_reccnt]();
+ m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>);
+
+ std::vector<std::string> keys;
+ keys.reserve(attemp_reccnt);
+
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i=0; i<cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
+
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop(); pq.pop();
+ auto& cursor1 = cursors[now.version];
+ auto& cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+ } else {
+ auto& cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key[0] != '\0') {
+ m_data[m_reccnt] = *cursor.ptr;
+ keys.push_back(std::string(m_data[m_reccnt].rec.key));
+
+ m_reccnt++;
+ }
+ pq.pop();
+
+ if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
+ }
+ }
+
+ if (m_reccnt > 0) {
+ m_fst = new fst::Trie(keys, true, 1);
+ }
+ }
+
+ ~FSTrie() {
+ delete[] m_data;
+ delete m_fst;
+ }
+
+ Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
+
+ auto idx = m_fst->exactSearch(rec.key);
+
+ if (idx == fst::kNotFound) {
+ return nullptr;
+ }
+
+ // FIXME: for convenience, I'm treating this Trie as a unique index
+ // for now, so no need to scan forward and/or check values. This
+ // also makes the point lookup query class a lot easier to make.
+ // Ultimately, though, we can support non-unique indexes with some
+ // extra work.
+
+ return m_data + idx;
+ }
+
+ Wrapped<R>* get_data() const {
+ return m_data;
+ }
+
+ size_t get_record_count() const {
+ return m_reccnt;
+ }
+
+ size_t get_tombstone_count() const {
+ return 0;
+ }
+
+ const Wrapped<R>* get_record_at(size_t idx) const {
+ if (idx >= m_reccnt) return nullptr;
+ return m_data + idx;
+ }
+
+
+ size_t get_memory_usage() {
+ return m_fst->getMemoryUsage();
+ }
+
+ size_t get_aux_memory_usage() {
+ return m_alloc_size;
+ }
+
+ size_t get_lower_bound(R &rec) {return 0;}
+ size_t get_upper_bound(R &rec) {return 0;}
+
+private:
+
+ Wrapped<R>* m_data;
+ size_t m_reccnt;
+ size_t m_alloc_size;
+ fst::Trie *m_fst;
+};
+}
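
The tombstone-cancellation rule used in the merge constructor above (and repeated in the other new shards in this commit) can be read in isolation. Below is a minimal, self-contained sketch of the idea using plain STL containers; Rec, Cmp, and merge_with_cancellation are illustrative names, and the framework's Wrapped<R>, cursors, and PriorityQueue are deliberately not used, so this shows the rule only, not the shard's actual implementation.

#include <iostream>
#include <queue>
#include <string>
#include <vector>

struct Rec {
    std::string key;
    bool tombstone;
};

/*
 * min-heap ordering: smaller keys first; for equal keys the regular record
 * sorts ahead of its tombstone, so a record/tombstone pair surfaces together
 */
struct Cmp {
    bool operator()(const Rec &a, const Rec &b) const {
        if (a.key != b.key) return a.key > b.key;
        return a.tombstone && !b.tombstone;
    }
};

std::vector<Rec> merge_with_cancellation(const std::vector<std::vector<Rec>> &runs) {
    std::priority_queue<Rec, std::vector<Rec>, Cmp> pq;
    for (const auto &run : runs) {
        for (const auto &r : run) pq.push(r);
    }

    std::vector<Rec> out;
    while (!pq.empty()) {
        Rec now = pq.top(); pq.pop();
        /* if the next record is a matching tombstone, drop both */
        if (!now.tombstone && !pq.empty() && pq.top().key == now.key && pq.top().tombstone) {
            pq.pop();
            continue;
        }
        out.push_back(now);
    }
    return out;
}

int main() {
    auto out = merge_with_cancellation({{{"a", false}, {"c", false}},
                                        {{"a", true},  {"b", false}}});
    for (const auto &r : out) std::cout << r.key << "\n";   /* prints b, then c */
}
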
diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h
index 3763271..1cca506 100644
--- a/include/shard/ISAMTree.h
+++ b/include/shard/ISAMTree.h
@@ -51,7 +51,7 @@ constexpr static size_t LEAF_FANOUT = NODE_SZ / sizeof(R);
public:
ISAMTree(BufferView<R> buffer)
- : m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+ : m_bf(nullptr)
, m_isam_nodes(nullptr)
, m_root(nullptr)
, m_reccnt(0)
@@ -59,19 +59,12 @@ public:
, m_internal_node_cnt(0)
, m_deleted_cnt(0)
, m_alloc_size(0)
- , m_data(nullptr)
{
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
buffer.get_record_count() *
sizeof(Wrapped<R>),
(byte**) &m_data);
- /*
- * without this, gcc seems to hoist the building of the array
- * _above_ its allocation under -O3, resulting in memfaults.
- */
- asm volatile ("" ::: "memory");
-
auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf);
m_reccnt = res.record_count;
m_tombstone_cnt = res.tombstone_count;
@@ -90,13 +83,12 @@ public:
, m_internal_node_cnt(0)
, m_deleted_cnt(0)
, m_alloc_size(0)
- , m_data(nullptr)
{
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
auto cursors = build_cursor_vec<R, ISAMTree>(shards, &attemp_reccnt, &tombstone_count);
- m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+ m_bf = nullptr;
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
attemp_reccnt * sizeof(Wrapped<R>),
(byte **) &m_data);
@@ -149,7 +141,7 @@ public:
size_t get_memory_usage() {
- return m_alloc_size + m_internal_node_cnt * NODE_SZ;
+ return m_internal_node_cnt * NODE_SZ;
}
size_t get_aux_memory_usage() {
diff --git a/include/shard/LoudsPatricia.h b/include/shard/LoudsPatricia.h
new file mode 100644
index 0000000..3452839
--- /dev/null
+++ b/include/shard/LoudsPatricia.h
@@ -0,0 +1,199 @@
+/*
+ * include/shard/LoudsPatricia.h
+ *
+ * Copyright (C) 2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * A shard shim around the LoudsPatricia learned index.
+ */
+#pragma once
+
+
+#include <vector>
+
+#include "framework/ShardRequirements.h"
+#include "louds-patricia.hpp"
+#include "util/SortedMerge.h"
+
+using psudb::CACHELINE_SIZE;
+using psudb::BloomFilter;
+using psudb::PriorityQueue;
+using psudb::queue_record;
+using psudb::byte;
+
+namespace de {
+
+template <KVPInterface R>
+class LoudsPatricia {
+private:
+
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+ static_assert(std::is_same_v<K, const char*>, "LoudsPatricia requires const char* keys.");
+
+public:
+ LoudsPatricia(BufferView<R> buffer)
+ : m_data(nullptr)
+ , m_reccnt(0)
+ , m_alloc_size(0)
+ {
+ m_data = new Wrapped<R>[buffer.get_record_count()]();
+ m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
+
+ m_louds = new louds::Patricia();
+
+ size_t cnt = 0;
+ std::vector<std::string> keys;
+ keys.reserve(buffer.get_record_count());
+
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
+ for (size_t i=0; i<buffer.get_record_count(); i++) {
+ temp_buffer[i] = *(buffer.get(i));
+ }
+
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ for (size_t i=0; i<buffer.get_record_count(); i++) {
+ if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || temp_buffer[i].rec.key[0] == '\0') {
+ continue;
+ }
+
+ m_data[cnt] = temp_buffer[i];
+ m_data[cnt].clear_timestamp();
+
+ m_louds->add(std::string(m_data[cnt].rec.key));
+ cnt++;
+ }
+
+ m_reccnt = cnt;
+ if (m_reccnt > 0) {
+ m_louds->build();
+ }
+
+ delete[] temp_buffer;
+ }
+
+ LoudsPatricia(std::vector<LoudsPatricia*> &shards)
+ : m_data(nullptr)
+ , m_reccnt(0)
+ , m_alloc_size(0)
+ {
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors = build_cursor_vec<R, LoudsPatricia>(shards, &attemp_reccnt, &tombstone_count);
+
+ m_data = new Wrapped<R>[attemp_reccnt]();
+ m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>);
+
+ m_louds = new louds::Patricia();
+
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i=0; i<cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
+
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop(); pq.pop();
+ auto& cursor1 = cursors[now.version];
+ auto& cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+ } else {
+ auto& cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key[0] != '\0') {
+ m_data[m_reccnt] = *cursor.ptr;
+ m_louds->add(std::string(m_data[m_reccnt].rec.key));
+ m_reccnt++;
+ }
+ pq.pop();
+
+ if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
+ }
+ }
+
+ if (m_reccnt > 0) {
+ m_louds->build();
+ }
+ }
+
+ ~LoudsPatricia() {
+ delete[] m_data;
+ delete m_louds;
+ }
+
+ Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
+
+ auto idx = m_louds->lookup(std::string(rec.key));
+
+ if (idx == -1) {
+ return nullptr;
+ }
+
+ // FIXME: for convenience, I'm treating this Trie as a unique index
+ // for now, so no need to scan forward and/or check values. This
+ // also makes the point lookup query class a lot easier to make.
+ // Ultimately, though, we can support non-unique indexes with some
+ // extra work.
+ return m_data + idx;
+ }
+
+ Wrapped<R>* get_data() const {
+ return m_data;
+ }
+
+ size_t get_record_count() const {
+ return m_reccnt;
+ }
+
+ size_t get_tombstone_count() const {
+ return 0;
+ }
+
+ const Wrapped<R>* get_record_at(size_t idx) const {
+ if (idx >= m_reccnt) return nullptr;
+ return m_data + idx;
+ }
+
+
+ size_t get_memory_usage() {
+ return m_louds->size();
+ }
+
+ size_t get_aux_memory_usage() {
+ return m_alloc_size;
+ }
+
+ size_t get_lower_bound(R &rec) {return 0;}
+ size_t get_upper_bound(R &rec) {return 0;}
+
+private:
+
+ Wrapped<R>* m_data;
+ size_t m_reccnt;
+ size_t m_alloc_size;
+ louds::Patricia *m_louds;
+};
+}
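
Both trie shards return m_data + idx directly from the trie lookup. That only works because keys are added in sorted order, so the value the trie reports for an exact match is the key's rank, which doubles as an index into the parallel m_data array. A minimal sketch of that invariant, with std::lower_bound standing in for the succinct trie and Record/RankIndex as illustrative stand-ins rather than framework types:

#include <algorithm>
#include <optional>
#include <string>
#include <vector>

struct Record {
    std::string key;
    int value;
};

struct RankIndex {
    std::vector<std::string> keys;  /* sorted, unique */
    std::vector<Record> data;       /* data[i] corresponds to keys[i] */

    std::optional<Record> point_lookup(const std::string &k) const {
        auto it = std::lower_bound(keys.begin(), keys.end(), k);
        if (it == keys.end() || *it != k) {
            return std::nullopt;            /* the analogue of kNotFound / -1 */
        }
        size_t idx = it - keys.begin();     /* rank of the key == array index */
        return data[idx];
    }
};
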
diff --git a/include/shard/PGM.h b/include/shard/PGM.h
index e2752ef..509796b 100644
--- a/include/shard/PGM.h
+++ b/include/shard/PGM.h
@@ -39,8 +39,7 @@ private:
public:
PGM(BufferView<R> buffer)
- : m_data(nullptr)
- , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+ : m_bf(nullptr)
, m_reccnt(0)
, m_tombstone_cnt(0)
, m_alloc_size(0) {
@@ -49,16 +48,63 @@ public:
buffer.get_record_count() *
sizeof(Wrapped<R>),
(byte**) &m_data);
- auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf);
- m_reccnt = res.record_count;
- m_tombstone_cnt = res.tombstone_count;
- if (m_reccnt > 0) {
- std::vector<K> keys;
- for (size_t i=0; i<m_reccnt; i++) {
- keys.emplace_back(m_data[i].rec.key);
+ std::vector<K> keys;
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
+ buffer.get_record_count(),
+ sizeof(Wrapped<R>));
+ buffer.copy_to_buffer((byte *) temp_buffer);
+
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ merge_info info = {0, 0};
+
+ /*
+ * Iterate over the temporary buffer to process the records, copying
+ * them into the buffer as needed
+ */
+ while (base < stop) {
+ if (!base->is_tombstone() && (base + 1 < stop)
+ && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
+ base += 2;
+ continue;
+ } else if (base->is_deleted()) {
+ base += 1;
+ continue;
+ }
+
+ // FIXME: this shouldn't be necessary, but the tagged record
+ // bypass doesn't seem to be working on this code-path, so this
+ // ensures that tagged records from the buffer can eventually be
+ // dropped. It should only need to be &= 1
+ base->header &= 3;
+ keys.emplace_back(base->rec.key);
+ m_data[info.record_count++] = *base;
+
+ if (base->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf){
+ m_bf->insert(base->rec);
+ }
}
+ base++;
+ }
+
+ free(temp_buffer);
+
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
+
+ if (m_reccnt > 0) {
m_pgm = pgm::PGMIndex<K, epsilon>(keys);
}
}
@@ -74,21 +120,65 @@ public:
size_t tombstone_count = 0;
auto cursors = build_cursor_vec<R, PGM>(shards, &attemp_reccnt, &tombstone_count);
- m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
attemp_reccnt * sizeof(Wrapped<R>),
(byte **) &m_data);
+ std::vector<K> keys;
- auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
- m_reccnt = res.record_count;
- m_tombstone_cnt = res.tombstone_count;
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i=0; i<cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
- if (m_reccnt > 0) {
- std::vector<K> keys;
- for (size_t i=0; i<m_reccnt; i++) {
- keys.emplace_back(m_data[i].rec.key);
+ merge_info info = {0, 0};
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop(); pq.pop();
+ auto& cursor1 = cursors[now.version];
+ auto& cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+ } else {
+ auto& cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted()) {
+ keys.emplace_back(cursor.ptr->rec.key);
+ m_data[info.record_count++] = *cursor.ptr;
+
+ /*
+ * if the record is a tombstone, increment the ts count and
+ * insert it into the bloom filter if one has been
+ * provided.
+ */
+ if (cursor.ptr->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf) {
+ m_bf->insert(cursor.ptr->rec);
+ }
+ }
+ }
+ pq.pop();
+
+ if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
}
+ }
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
+
+ if (m_reccnt > 0) {
m_pgm = pgm::PGMIndex<K, epsilon>(keys);
}
}
@@ -132,7 +222,7 @@ public:
size_t get_memory_usage() {
- return m_pgm.size_in_bytes() + m_alloc_size;
+ return m_pgm.size_in_bytes();
}
size_t get_aux_memory_usage() {
@@ -147,14 +237,16 @@ public:
return m_reccnt;
}
- // If the region to search is less than some pre-specified
- // amount, perform a linear scan to locate the record.
+ /*
+ * If the region to search is less than some pre-specified
+ * amount, perform a linear scan to locate the record.
+ */
if (bound.hi - bound.lo < 256) {
while (idx < bound.hi && m_data[idx].rec.key < key) {
idx++;
}
} else {
- // Otherwise, perform a binary search
+ /* Otherwise, perform a binary search */
idx = bound.lo;
size_t max = bound.hi;
@@ -169,10 +261,26 @@ public:
}
+ /*
+ * the upper bound returned by PGM is one past the end of the
+ * array. If we are at that point, we should just return "not found"
+ */
+ if (idx == m_reccnt) {
+ return idx;
+ }
+
+ /*
+ * We may have walked one past the actual lower bound, so check
+ * the index before the current one to see if it is the actual bound
+ */
if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) {
return idx-1;
}
+ /*
+ * Otherwise, check idx. If it is a valid bound, then return it,
+ * otherwise return "not found".
+ */
return (m_data[idx].rec.key >= key) ? idx : m_reccnt;
}
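
The new get_lower_bound logic above combines three steps: refine the PGM's approximate range with a linear scan when it is narrow and a binary search otherwise, then handle the one-past-the-end and overshoot cases explicitly. Restated below as a stand-alone sketch over a plain sorted array; not_found plays the role of returning m_reccnt, the 256-element cutoff mirrors the code above, and none of the framework or PGM types are used.

#include <cstddef>
#include <vector>

size_t refine_lower_bound(const std::vector<long> &data, long key,
                          size_t lo, size_t hi) {
    const size_t not_found = data.size();
    size_t idx = lo;

    if (hi - lo < 256) {
        /* narrow range: scan forward until the key is no longer larger */
        while (idx < hi && data[idx] < key) idx++;
    } else {
        /* wide range: binary search for the first element >= key in [lo, hi) */
        size_t max = hi;
        while (idx < max) {
            size_t mid = idx + (max - idx) / 2;
            if (data[mid] < key) {
                idx = mid + 1;
            } else {
                max = mid;
            }
        }
    }

    /* the approximate range may end one past the end of the array */
    if (idx == data.size()) return not_found;

    /* if we walked one past the bound, the previous slot may be the answer */
    if (data[idx] > key && idx > 0 && data[idx - 1] <= key) return idx - 1;

    return (data[idx] >= key) ? idx : not_found;
}
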
diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h
index 2a432e8..581277e 100644
--- a/include/shard/TrieSpline.h
+++ b/include/shard/TrieSpline.h
@@ -36,39 +36,94 @@ private:
public:
TrieSpline(BufferView<R> buffer)
- : m_data(nullptr)
- , m_reccnt(0)
+ : m_reccnt(0)
, m_tombstone_cnt(0)
, m_alloc_size(0)
, m_max_key(0)
, m_min_key(0)
- , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+ , m_bf(nullptr)
{
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
buffer.get_record_count() *
sizeof(Wrapped<R>),
(byte**) &m_data);
- auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf);
- m_reccnt = res.record_count;
- m_tombstone_cnt = res.tombstone_count;
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
+ buffer.get_record_count(),
+ sizeof(Wrapped<R>));
+ buffer.copy_to_buffer((byte *) temp_buffer);
+
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ auto tmp_min_key = temp_buffer[0].rec.key;
+ auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key;
+ auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+
+ merge_info info = {0, 0};
+
+ m_min_key = tmp_max_key;
+ m_max_key = tmp_min_key;
+
+ /*
+ * Iterate over the temporary buffer to process the records, copying
+ * them into the buffer as needed
+ */
+ while (base < stop) {
+ if (!base->is_tombstone() && (base + 1 < stop)
+ && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
+ base += 2;
+ continue;
+ } else if (base->is_deleted()) {
+ base += 1;
+ continue;
+ }
- if (m_reccnt > 0) {
- m_min_key = m_data[0].rec.key;
- m_max_key = m_data[m_reccnt-1].rec.key;
+ // FIXME: this shouldn't be necessary, but the tagged record
+ // bypass doesn't seem to be working on this code-path, so this
+ // ensures that tagged records from the buffer can eventually be
+ // dropped. It should only need to be &= 1
+ base->header &= 3;
+ bldr.AddKey(base->rec.key);
+ m_data[info.record_count++] = *base;
+
+ if (base->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf){
+ m_bf->insert(base->rec);
+ }
+ }
- auto bldr = ts::Builder<K>(m_min_key, m_max_key, E);
- for (size_t i=0; i<m_reccnt; i++) {
- bldr.AddKey(m_data[i].rec.key);
+ if (base->rec.key < m_min_key) {
+ m_min_key = base->rec.key;
+ }
+
+ if (base->rec.key > m_max_key) {
+ m_max_key = base->rec.key;
}
+ base++;
+ }
+
+ free(temp_buffer);
+
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
+
+ if (m_reccnt > 50) {
m_ts = bldr.Finalize();
}
}
TrieSpline(std::vector<TrieSpline*> &shards)
- : m_data(nullptr)
- , m_reccnt(0)
+ : m_reccnt(0)
, m_tombstone_cnt(0)
, m_alloc_size(0)
, m_max_key(0)
@@ -79,24 +134,90 @@ public:
size_t tombstone_count = 0;
auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt, &tombstone_count);
- m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
attemp_reccnt * sizeof(Wrapped<R>),
(byte **) &m_data);
- auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
- m_reccnt = res.record_count;
- m_tombstone_cnt = res.tombstone_count;
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i=0; i<cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
- if (m_reccnt > 0) {
- m_min_key = m_data[0].rec.key;
- m_max_key = m_data[m_reccnt-1].rec.key;
+ auto tmp_max_key = shards[0]->m_max_key;
+ auto tmp_min_key = shards[0]->m_min_key;
- auto bldr = ts::Builder<K>(m_min_key, m_max_key, E);
- for (size_t i=0; i<m_reccnt; i++) {
- bldr.AddKey(m_data[i].rec.key);
+ for (size_t i=0; i<shards.size(); i++) {
+ if (shards[i]->m_max_key > tmp_max_key) {
+ tmp_max_key = shards[i]->m_max_key;
+ }
+
+ if (shards[i]->m_min_key < tmp_min_key) {
+ tmp_min_key = shards[i]->m_min_key;
}
+ }
+
+ auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+
+ m_max_key = tmp_min_key;
+ m_min_key = tmp_max_key;
+
+ merge_info info = {0, 0};
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop(); pq.pop();
+ auto& cursor1 = cursors[now.version];
+ auto& cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+ } else {
+ auto& cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted()) {
+ bldr.AddKey(cursor.ptr->rec.key);
+ m_data[info.record_count++] = *cursor.ptr;
+
+ /*
+ * if the record is a tombstone, increment the ts count and
+ * insert it into the bloom filter if one has been
+ * provided.
+ */
+ if (cursor.ptr->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf) {
+ m_bf->insert(cursor.ptr->rec);
+ }
+ }
+
+ if (cursor.ptr->rec.key < m_min_key) {
+ m_min_key = cursor.ptr->rec.key;
+ }
+
+ if (cursor.ptr->rec.key > m_max_key) {
+ m_max_key = cursor.ptr->rec.key;
+ }
+ }
+ pq.pop();
+
+ if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
+ }
+ }
+
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
+ if (m_reccnt > 50) {
m_ts = bldr.Finalize();
}
}
@@ -107,7 +228,7 @@ public:
}
Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
- if (filter && !m_bf->lookup(rec)) {
+ if (filter && m_bf && !m_bf->lookup(rec)) {
return nullptr;
}
@@ -144,7 +265,7 @@ public:
size_t get_memory_usage() {
- return m_ts.GetSize() + m_alloc_size;
+ return m_ts.GetSize();
}
size_t get_aux_memory_usage() {
@@ -152,6 +273,18 @@ public:
}
size_t get_lower_bound(const K& key) const {
+ if (m_reccnt <= 50) {
+ size_t bd = m_reccnt;
+ for (size_t i=0; i<m_reccnt; i++) {
+ if (m_data[i].rec.key >= key) {
+ bd = i;
+ break;
+ }
+ }
+
+ return bd;
+ }
+
auto bound = m_ts.GetSearchBound(key);
size_t idx = bound.begin;
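
One detail of the merge constructor above: ts::Builder needs the global key range before any keys are added, so the bounds are derived from the input shards' cached m_min_key/m_max_key instead of from the merged data. A small sketch of that derivation, with ShardMeta as an illustrative stand-in rather than a framework type:

#include <cstdint>
#include <utility>
#include <vector>

struct ShardMeta {
    std::uint64_t min_key;
    std::uint64_t max_key;
};

std::pair<std::uint64_t, std::uint64_t> merged_key_bounds(const std::vector<ShardMeta> &shards) {
    std::uint64_t lo = shards[0].min_key;
    std::uint64_t hi = shards[0].max_key;
    for (const auto &s : shards) {
        if (s.min_key < lo) lo = s.min_key;
        if (s.max_key > hi) hi = s.max_key;
    }
    /* safe bounds: every key in the merged shard comes from some input shard */
    return {lo, hi};
}
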
diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h
index b342fe6..d5a2393 100644
--- a/include/shard/VPTree.h
+++ b/include/shard/VPTree.h
@@ -58,7 +58,7 @@ public:
sizeof(Wrapped<R>),
(byte**) &m_data);
- m_ptrs = new Wrapped<R>*[buffer.get_record_count()];
+ m_ptrs = new vp_ptr[buffer.get_record_count()];
size_t offset = 0;
m_reccnt = 0;
@@ -76,7 +76,7 @@ public:
rec->header &= 3;
m_data[m_reccnt] = *rec;
- m_ptrs[m_reccnt] = &m_data[m_reccnt];
+ m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
m_reccnt++;
}
@@ -97,7 +97,7 @@ public:
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
attemp_reccnt * sizeof(Wrapped<R>),
(byte **) &m_data);
- m_ptrs = new Wrapped<R>*[attemp_reccnt];
+ m_ptrs = new vp_ptr[attemp_reccnt];
// FIXME: will eventually need to figure out tombstones
// this one will likely require the multi-pass
@@ -110,7 +110,7 @@ public:
}
m_data[m_reccnt] = *shards[i]->get_record_at(j);
- m_ptrs[m_reccnt] = &m_data[m_reccnt];
+ m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
m_reccnt++;
}
}
@@ -139,8 +139,8 @@ public:
} else {
vpnode *node = m_root;
- while (!node->leaf && m_ptrs[node->start]->rec != rec) {
- if (rec.calc_distance((m_ptrs[node->start]->rec)) >= node->radius) {
+ while (!node->leaf && m_ptrs[node->start].ptr->rec != rec) {
+ if (rec.calc_distance((m_ptrs[node->start].ptr->rec)) >= node->radius) {
node = node->outside;
} else {
node = node->inside;
@@ -148,8 +148,8 @@ public:
}
for (size_t i=node->start; i<=node->stop; i++) {
- if (m_ptrs[i]->rec == rec) {
- return m_ptrs[i];
+ if (m_ptrs[i].ptr->rec == rec) {
+ return m_ptrs[i].ptr;
}
}
@@ -175,7 +175,7 @@ public:
}
size_t get_memory_usage() {
- return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*) + m_alloc_size;
+ return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(vp_ptr);
}
size_t get_aux_memory_usage() {
@@ -191,8 +191,12 @@ public:
}
private:
+ struct vp_ptr {
+ Wrapped<R> *ptr;
+ double dist;
+ };
Wrapped<R>* m_data;
- Wrapped<R>** m_ptrs;
+ vp_ptr* m_ptrs;
std::unordered_map<R, size_t, RecordHash<R>> m_lookup_map;
size_t m_reccnt;
size_t m_tombstone_cnt;
@@ -260,6 +264,11 @@ private:
auto i = start + gsl_rng_uniform_int(rng, stop - start + 1);
swap(start, i);
+ /* for efficiency, we'll pre-calculate the distances between each point and the root */
+ for (size_t i=start+1; i<=stop; i++) {
+ m_ptrs[i].dist = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[i].ptr->rec);
+ }
+
/*
* partition elements based on their distance from the start,
* with those elements with distance falling below the median
@@ -267,14 +276,15 @@ private:
* the median in the right. This is easily done using QuickSelect.
*/
auto mid = (start + 1 + stop) / 2;
- quickselect(start + 1, stop, mid, m_ptrs[start], rng);
+ quickselect(start + 1, stop, mid, m_ptrs[start].ptr, rng);
/* Create a new node based on this partitioning */
vpnode *node = new vpnode();
node->start = start;
/* store the radius of the circle used for partitioning the node. */
- node->radius = m_ptrs[start]->rec.calc_distance(m_ptrs[mid]->rec);
+ node->radius = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[mid].ptr->rec);
+ m_ptrs[start].dist = node->radius;
/* recursively construct the left and right subtrees */
node->inside = build_subtree(start + 1, mid-1, rng);
@@ -285,8 +295,6 @@ private:
return node;
}
- // TODO: The quickselect code can probably be generalized and moved out
- // to psudb-common instead.
void quickselect(size_t start, size_t stop, size_t k, Wrapped<R> *p, gsl_rng *rng) {
if (start == stop) return;
@@ -303,13 +311,16 @@ private:
// to psudb-common instead.
size_t partition(size_t start, size_t stop, Wrapped<R> *p, gsl_rng *rng) {
auto pivot = start + gsl_rng_uniform_int(rng, stop - start);
- double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec);
swap(pivot, stop);
size_t j = start;
for (size_t i=start; i<stop; i++) {
- if (p->rec.calc_distance(m_ptrs[i]->rec) < pivot_dist) {
+ if (m_ptrs[i].dist < m_ptrs[stop].dist) {
swap(j, i);
j++;
}
@@ -332,13 +343,13 @@ private:
if (node->leaf) {
for (size_t i=node->start; i<=node->stop; i++) {
- double d = point.calc_distance(m_ptrs[i]->rec);
+ double d = point.calc_distance(m_ptrs[i].ptr->rec);
if (d < *farthest) {
if (pq.size() == k) {
pq.pop();
}
- pq.push(m_ptrs[i]);
+ pq.push(m_ptrs[i].ptr);
if (pq.size() == k) {
*farthest = point.calc_distance(pq.peek().data->rec);
}
@@ -348,14 +359,14 @@ private:
return;
}
- double d = point.calc_distance(m_ptrs[node->start]->rec);
+ double d = point.calc_distance(m_ptrs[node->start].ptr->rec);
if (d < *farthest) {
if (pq.size() == k) {
auto t = pq.peek().data->rec;
pq.pop();
}
- pq.push(m_ptrs[node->start]);
+ pq.push(m_ptrs[node->start].ptr);
if (pq.size() == k) {
*farthest = point.calc_distance(pq.peek().data->rec);
}
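
The vp_ptr change above exists so that build_subtree computes each point's distance to the current vantage point once, and QuickSelect then partitions on the cached doubles instead of recomputing distances in the comparison. A stand-alone sketch of the same pattern over plain 2D points; std::mt19937 stands in for gsl_rng, and Pt, dist, and partition are illustrative names rather than the shard's types.

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <random>
#include <vector>

struct Pt { double x, y; };

double dist(const Pt &a, const Pt &b) {
    return std::hypot(a.x - b.x, a.y - b.y);
}

/*
 * Lomuto-style partition of pts[start..stop] (inclusive) around a random
 * pivot, comparing the cached distances in d; d[i] must stay paired with
 * pts[i], so both arrays are swapped in lockstep. Returns the pivot's index.
 */
size_t partition(std::vector<Pt> &pts, std::vector<double> &d,
                 size_t start, size_t stop, std::mt19937 &rng) {
    std::uniform_int_distribution<size_t> pick(start, stop - 1);
    size_t pivot = pick(rng);
    std::swap(pts[pivot], pts[stop]);
    std::swap(d[pivot], d[stop]);

    size_t j = start;
    for (size_t i = start; i < stop; i++) {
        if (d[i] < d[stop]) {        /* cached-distance comparison */
            std::swap(pts[j], pts[i]);
            std::swap(d[j], d[i]);
            j++;
        }
    }
    std::swap(pts[j], pts[stop]);
    std::swap(d[j], d[stop]);
    return j;
}

int main() {
    std::vector<Pt> pts = {{0,0}, {3,0}, {1,1}, {5,5}, {2,2}, {0,4}};
    Pt vantage = pts[0];

    /* pre-calculate each point's distance to the vantage point once */
    std::vector<double> d(pts.size());
    for (size_t i = 0; i < pts.size(); i++) d[i] = dist(vantage, pts[i]);

    std::mt19937 rng(42);
    partition(pts, d, 1, pts.size() - 1, rng);
}
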