summaryrefslogtreecommitdiffstats
path: root/include/shard/AugBTree.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-02-07 17:23:23 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-02-07 17:24:50 -0500
commitbd74e27b28bd95267ce50d2e4b6f12b51d9b6aae (patch)
tree8e40038feaa9c83c4da967ab78564c51fc67a653 /include/shard/AugBTree.h
parent2c5d549b3618b9ea72e6eece4cb4f3da5a6811a8 (diff)
downloaddynamic-extension-bd74e27b28bd95267ce50d2e4b6f12b51d9b6aae.tar.gz
Cleaned up shard files (except VPTree)
Cleaned up shard implementations, fixed a few bugs, and set up some tests. There's still some work to be done in creating tests for the weighted sampling operations for the alias and aug btree shards.
Diffstat (limited to 'include/shard/AugBTree.h')
-rw-r--r--include/shard/AugBTree.h150
1 files changed, 42 insertions, 108 deletions
diff --git a/include/shard/AugBTree.h b/include/shard/AugBTree.h
index be664ac..58bd098 100644
--- a/include/shard/AugBTree.h
+++ b/include/shard/AugBTree.h
@@ -16,28 +16,21 @@
#include <vector>
#include <cassert>
-#include <queue>
-#include <memory>
-#include <concepts>
#include "framework/ShardRequirements.h"
-#include "psu-ds/PriorityQueue.h"
-#include "util/Cursor.h"
#include "psu-ds/Alias.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
+#include "util/SortedMerge.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
-using psudb::PriorityQueue;
-using psudb::queue_record;
using psudb::Alias;
+using psudb::byte;
namespace de {
-thread_local size_t wirs_cancelations = 0;
-
template <WeightedRecordInterface R>
struct AugBTreeNode {
struct AugBTreeNode<R> *left, *right;
@@ -54,108 +47,52 @@ private:
typedef decltype(R::weight) W;
public:
- AugBTree(MutableBuffer<R>* buffer)
- : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) {
- m_alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
-
- m_bf = new BloomFilter<R>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS);
-
- size_t offset = 0;
- m_reccnt = 0;
- auto base = buffer->get_data();
- auto stop = base + buffer->get_record_count();
-
- std::sort(base, stop, std::less<Wrapped<R>>());
-
- while (base < stop) {
- if (!(base->is_tombstone()) && (base + 1) < stop) {
- if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
- base += 2;
- wirs_cancelations++;
- continue;
- }
- } else if (base->is_deleted()) {
- base += 1;
- continue;
- }
-
- // FIXME: this shouldn't be necessary, but the tagged record
- // bypass doesn't seem to be working on this code-path, so this
- // ensures that tagged records from the buffer are able to be
- // dropped, eventually. It should only need to be &= 1
- base->header &= 3;
- m_data[m_reccnt++] = *base;
- m_total_weight+= base->rec.weight;
-
- if (m_bf && base->is_tombstone()) {
- m_tombstone_cnt++;
- m_bf->insert(base->rec);
- }
-
- base++;
- }
+ AugBTree(BufferView<R> buffer)
+ : m_data(nullptr)
+ , m_root(nullptr)
+ , m_reccnt(0)
+ , m_tombstone_cnt(0)
+ , m_group_size(0)
+ , m_alloc_size(0)
+ , m_node_cnt(0)
+ , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+ {
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ buffer.get_record_count() *
+ sizeof(Wrapped<R>),
+ (byte**) &m_data);
+
+ auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
if (m_reccnt > 0) {
build_wirs_structure();
}
}
- AugBTree(AugBTree** shards, size_t len)
- : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) {
- std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(len);
-
- PriorityQueue<Wrapped<R>> pq(len);
-
+ AugBTree(std::vector<AugBTree*> shards)
+ : m_data(nullptr)
+ , m_root(nullptr)
+ , m_reccnt(0)
+ , m_tombstone_cnt(0)
+ , m_group_size(0)
+ , m_alloc_size(0)
+ , m_node_cnt(0)
+ , m_bf(nullptr)
+ {
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
-
- for (size_t i = 0; i < len; ++i) {
- if (shards[i]) {
- auto base = shards[i]->get_data();
- cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
- attemp_reccnt += shards[i]->get_record_count();
- tombstone_count += shards[i]->get_tombstone_count();
- pq.push(cursors[i].ptr, i);
- } else {
- cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
- }
- }
+ auto cursors = build_cursor_vec<R, AugBTree>(shards, &attemp_reccnt, &tombstone_count);
m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ attemp_reccnt * sizeof(Wrapped<R>),
+ (byte **) &m_data);
- m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
-
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- if (!cursor.ptr->is_deleted()) {
- m_data[m_reccnt++] = *cursor.ptr;
- m_total_weight += cursor.ptr->rec.weight;
- if (m_bf && cursor.ptr->is_tombstone()) {
- ++m_tombstone_cnt;
- if (m_bf) m_bf->insert(cursor.ptr->rec);
- }
- }
- pq.pop();
-
- if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
if (m_reccnt > 0) {
build_wirs_structure();
@@ -163,13 +100,12 @@ public:
}
~AugBTree() {
- if (m_data) free(m_data);
+ free(m_data);
for (size_t i=0; i<m_alias.size(); i++) {
- if (m_alias[i]) delete m_alias[i];
+ delete m_alias[i];
}
- if (m_bf) delete m_bf;
-
+ delete m_bf;
free_tree(m_root);
}
@@ -183,7 +119,7 @@ public:
return nullptr;
}
- while (idx < m_reccnt && m_data[idx].rec < rec) ++idx;
+ while (idx < (m_reccnt-1) && m_data[idx].rec < rec) ++idx;
if (m_data[idx].rec == rec) {
return m_data + idx;
@@ -209,13 +145,12 @@ public:
return m_data + idx;
}
-
size_t get_memory_usage() {
return m_alloc_size + m_node_cnt * sizeof(AugBTreeNode<Wrapped<R>>);
}
size_t get_aux_memory_usage() {
- return 0;
+ return (m_bf) ? m_bf->memory_usage() : 0;
}
size_t get_lower_bound(const K& key) const {
@@ -364,7 +299,6 @@ private:
Wrapped<R>* m_data;
std::vector<Alias *> m_alias;
AugBTreeNode<R>* m_root;
- W m_total_weight;
size_t m_reccnt;
size_t m_tombstone_cnt;
size_t m_group_size;