summaryrefslogtreecommitdiffstats
path: root/include/shard/PGM.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-02-07 17:23:23 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-02-07 17:24:50 -0500
commitbd74e27b28bd95267ce50d2e4b6f12b51d9b6aae (patch)
tree8e40038feaa9c83c4da967ab78564c51fc67a653 /include/shard/PGM.h
parent2c5d549b3618b9ea72e6eece4cb4f3da5a6811a8 (diff)
downloaddynamic-extension-bd74e27b28bd95267ce50d2e4b6f12b51d9b6aae.tar.gz
Cleaned up shard files (except VPTree)
Cleaned up shard implementations, fixed a few bugs, and set up some tests. There's still some work to be done in creating tests for the weighted sampling operations for the alias and aug btree shards.
Diffstat (limited to 'include/shard/PGM.h')
-rw-r--r--include/shard/PGM.h140
1 files changed, 45 insertions, 95 deletions
diff --git a/include/shard/PGM.h b/include/shard/PGM.h
index 13db26a..8031870 100644
--- a/include/shard/PGM.h
+++ b/include/shard/PGM.h
@@ -14,24 +14,19 @@
#include <vector>
-#include <cassert>
-#include <queue>
-#include <memory>
-#include <concepts>
#include "framework/ShardRequirements.h"
#include "pgm/pgm_index.hpp"
-#include "psu-ds/PriorityQueue.h"
-#include "util/Cursor.h"
#include "psu-ds/BloomFilter.h"
+#include "util/SortedMerge.h"
#include "util/bf_config.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
using psudb::PriorityQueue;
using psudb::queue_record;
-using psudb::Alias;
+using psudb::byte;
namespace de {
@@ -41,111 +36,65 @@ private:
typedef decltype(R::key) K;
typedef decltype(R::value) V;
-
public:
- PGM(MutableBuffer<R>* buffer)
- : m_reccnt(0), m_tombstone_cnt(0) {
-
- m_alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
- std::vector<K> keys;
-
- size_t offset = 0;
- m_reccnt = 0;
- auto base = buffer->get_data();
- auto stop = base + buffer->get_record_count();
+ PGM(BufferView<R> buffer)
+ : m_data(nullptr)
+ , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+ , m_reccnt(0)
+ , m_tombstone_cnt(0)
+ , m_alloc_size(0) {
+
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ buffer.get_record_count() *
+ sizeof(Wrapped<R>),
+ (byte**) &m_data);
+ auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
- std::sort(base, stop, std::less<Wrapped<R>>());
-
- K min_key = base->rec.key;
- K max_key = (stop - 1)->rec.key;
-
- while (base < stop) {
- if (!(base->is_tombstone()) && (base + 1) < stop) {
- if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
- base += 2;
- continue;
- }
- } else if (base->is_deleted()) {
- base += 1;
- continue;
+ if (m_reccnt > 0) {
+ std::vector<K> keys;
+ for (size_t i=0; i<m_reccnt; i++) {
+ keys.emplace_back(m_data[i].rec.key);
}
- // FIXME: this shouldn't be necessary, but the tagged record
- // bypass doesn't seem to be working on this code-path, so this
- // ensures that tagged records from the buffer are able to be
- // dropped, eventually. It should only need to be &= 1
- base->header &= 3;
- m_data[m_reccnt++] = *base;
- keys.emplace_back(base->rec.key);
- base++;
- }
-
- if (m_reccnt > 0) {
m_pgm = pgm::PGMIndex<K, epsilon>(keys);
}
}
- PGM(PGM** shards, size_t len)
- : m_reccnt(0), m_tombstone_cnt(0) {
- std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(len);
-
- PriorityQueue<Wrapped<R>> pq(len);
-
+ PGM(std::vector<PGM*> shards)
+ : m_data(nullptr)
+ , m_bf(nullptr)
+ , m_reccnt(0)
+ , m_tombstone_cnt(0)
+ , m_alloc_size(0) {
+
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
-
- for (size_t i = 0; i < len; ++i) {
- if (shards[i]) {
- auto base = shards[i]->get_data();
- cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
- attemp_reccnt += shards[i]->get_record_count();
- tombstone_count += shards[i]->get_tombstone_count();
- pq.push(cursors[i].ptr, i);
-
- } else {
- cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
- }
- }
+ auto cursors = build_cursor_vec<R, PGM>(shards, &attemp_reccnt, &tombstone_count);
- m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
-
- std::vector<K> keys;
-
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- if (!cursor.ptr->is_deleted()) {
- m_data[m_reccnt++] = *cursor.ptr;
- keys.emplace_back(cursor.ptr->rec.key);
- }
- pq.pop();
-
- if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ attemp_reccnt * sizeof(Wrapped<R>),
+ (byte **) &m_data);
+
+ auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
if (m_reccnt > 0) {
+ std::vector<K> keys;
+ for (size_t i=0; i<m_reccnt; i++) {
+ keys.emplace_back(m_data[i].rec.key);
+ }
+
m_pgm = pgm::PGMIndex<K, epsilon>(keys);
}
}
~PGM() {
- if (m_data) free(m_data);
+ free(m_data);
+ delete m_bf;
}
Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
@@ -186,7 +135,7 @@ public:
}
size_t get_aux_memory_usage() {
- return 0;
+ return (m_bf) ? m_bf->memory_usage() : 0;
}
size_t get_lower_bound(const K& key) const {
@@ -228,6 +177,7 @@ public:
private:
Wrapped<R>* m_data;
+ BloomFilter<R> *m_bf;
size_t m_reccnt;
size_t m_tombstone_cnt;
size_t m_alloc_size;