path: root/include/shard/Alias.h
author     Douglas Rumbaugh <dbr4@psu.edu>    2024-02-07 17:23:23 -0500
committer  Douglas Rumbaugh <dbr4@psu.edu>    2024-02-07 17:24:50 -0500
commit     bd74e27b28bd95267ce50d2e4b6f12b51d9b6aae (patch)
tree       8e40038feaa9c83c4da967ab78564c51fc67a653 /include/shard/Alias.h
parent     2c5d549b3618b9ea72e6eece4cb4f3da5a6811a8 (diff)
download   dynamic-extension-bd74e27b28bd95267ce50d2e4b6f12b51d9b6aae.tar.gz
Cleaned up shard files (except VPTree)
Cleaned up shard implementations, fixed a few bugs, and set up some tests. There's still some work to be done in creating tests for the weighted sampling operations for the alias and aug btree shards.
Diffstat (limited to 'include/shard/Alias.h')
-rw-r--r--  include/shard/Alias.h  155
1 file changed, 50 insertions(+), 105 deletions(-)
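
Both constructors in the diff below drop the hand-rolled cacheline rounding (size plus padding, an alignment assert, and a raw std::aligned_alloc call) in favor of psudb::sf_aligned_alloc. A minimal sketch of the pattern presumably being factored out, with the signature taken from the new call sites and the internals inferred from the code being removed (cacheline_aligned_alloc is a hypothetical stand-in, not the library function):

#include <cstddef>
#include <cstdlib>

// Hypothetical stand-in for psudb::sf_aligned_alloc, mirroring the code it
// replaces: round the byte count up to a multiple of the alignment (the
// cacheline size at the call sites), allocate with std::aligned_alloc, and
// return the padded size so the caller can record it in m_alloc_size.
inline size_t cacheline_aligned_alloc(size_t alignment, size_t bytes, std::byte **out) {
    size_t padded = bytes + (alignment - bytes % alignment) % alignment;  // round up
    *out = static_cast<std::byte *>(std::aligned_alloc(alignment, padded));
    return padded;
}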
diff --git a/include/shard/Alias.h b/include/shard/Alias.h
index a234575..f0d1d59 100644
--- a/include/shard/Alias.h
+++ b/include/shard/Alias.h
@@ -14,20 +14,19 @@
#pragma once
#include <vector>
-#include <cassert>
#include "framework/ShardRequirements.h"
-#include "psu-ds/PriorityQueue.h"
-#include "util/Cursor.h"
#include "psu-ds/Alias.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
+#include "util/SortedMerge.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
using psudb::PriorityQueue;
using psudb::queue_record;
+using psudb::byte;
namespace de {
@@ -41,126 +40,73 @@ private:
typedef decltype(R::weight) W;
public:
- Alias(BufferView<R>* buffer)
- : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_alias(nullptr), m_bf(nullptr) {
-
- m_alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
-
- m_bf = new BloomFilter<R>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS);
-
- size_t offset = 0;
- m_reccnt = 0;
- auto base = buffer->get_data();
- auto stop = base + buffer->get_record_count();
-
- std::sort(base, stop, std::less<Wrapped<R>>());
-
- std::vector<W> weights;
-
- while (base < stop) {
- if (!(base->is_tombstone()) && (base + 1) < stop) {
- if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
- base += 2;
- wss_cancelations++;
- continue;
- }
- } else if (base->is_deleted()) {
- base += 1;
- continue;
- }
+ Alias(BufferView<R> buffer)
+ : m_data(nullptr)
+ , m_alias(nullptr)
+ , m_total_weight(0)
+ , m_reccnt(0)
+ , m_tombstone_cnt(0)
+ , m_alloc_size(0)
+ , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) {
+
+
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ buffer.get_record_count() *
+ sizeof(Wrapped<R>),
+ (byte**) &m_data);
+
+ auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
- // FIXME: this shouldn't be necessary, but the tagged record
- // bypass doesn't seem to be working on this code-path, so this
- // ensures that tagged records from the buffer are able to be
- // dropped, eventually. It should only need to be &= 1
- base->header &= 3;
- m_data[m_reccnt++] = *base;
- m_total_weight+= base->rec.weight;
- weights.push_back(base->rec.weight);
-
- if (m_bf && base->is_tombstone()) {
- m_tombstone_cnt++;
- m_bf->insert(base->rec);
+ if (m_reccnt > 0) {
+ std::vector<W> weights;
+ for (size_t i=0; i<m_reccnt; i++) {
+ weights.emplace_back(m_data[i].rec.weight);
+ m_total_weight += m_data[i].rec.weight;
}
-
- base++;
- }
- if (m_reccnt > 0) {
build_alias_structure(weights);
}
}
Alias(std::vector<Alias*> &shards)
- : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_alias(nullptr), m_bf(nullptr) {
- std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(shards.size());
-
- PriorityQueue<Wrapped<R>> pq(shards.size());
+ : m_data(nullptr)
+ , m_alias(nullptr)
+ , m_total_weight(0)
+ , m_reccnt(0)
+ , m_tombstone_cnt(0)
+ , m_alloc_size(0)
+ , m_bf(nullptr) {
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
-
- for (size_t i = 0; i < shards.size(); ++i) {
- if (shards[i]) {
- auto base = shards[i]->get_data();
- cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
- attemp_reccnt += shards[i]->get_record_count();
- tombstone_count += shards[i]->get_tombstone_count();
- pq.push(cursors[i].ptr, i);
- } else {
- cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
- }
- }
+ auto cursors = build_cursor_vec<R, Alias>(shards, &attemp_reccnt, &tombstone_count);
m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ attemp_reccnt * sizeof(Wrapped<R>),
+ (byte **) &m_data);
- m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
-
- std::vector<W> weights;
-
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- if (!cursor.ptr->is_deleted()) {
- m_data[m_reccnt++] = *cursor.ptr;
- m_total_weight += cursor.ptr->rec.weight;
- weights.push_back(cursor.ptr->rec.weight);
- if (m_bf && cursor.ptr->is_tombstone()) {
- ++m_tombstone_cnt;
- if (m_bf) m_bf->insert(cursor.ptr->rec);
- }
- }
- pq.pop();
-
- if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
if (m_reccnt > 0) {
+ std::vector<W> weights;
+ for (size_t i=0; i<m_reccnt; i++) {
+ weights.emplace_back(m_data[i].rec.weight);
+ m_total_weight += m_data[i].rec.weight;
+ }
+
build_alias_structure(weights);
}
}
~Alias() {
- if (m_data) free(m_data);
- if (m_alias) delete m_alias;
- if (m_bf) delete m_bf;
-
+ free(m_data);
+ delete m_alias;
+ delete m_bf;
}
Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
@@ -173,7 +119,7 @@ public:
return nullptr;
}
- while (idx < m_reccnt && m_data[idx].rec < rec) ++idx;
+ while (idx < (m_reccnt-1) && m_data[idx].rec < rec) ++idx;
if (m_data[idx].rec == rec) {
return m_data + idx;
@@ -205,7 +151,7 @@ public:
}
size_t get_aux_memory_usage() {
- return 0;
+ return (m_bf) ? m_bf->memory_usage() : 0;
}
W get_total_weight() {
@@ -254,7 +200,6 @@ private:
W m_total_weight;
size_t m_reccnt;
size_t m_tombstone_cnt;
- size_t m_group_size;
size_t m_alloc_size;
BloomFilter<R> *m_bf;
};
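
Both constructors finish the same way: collect each retained record's weight into a vector and pass it to build_alias_structure(), which, given the psu-ds/Alias.h include and the m_alias member, presumably builds a psudb::Alias table for O(1) weighted sampling. The following is a self-contained sketch of that technique (Vose's alias-method construction) for reference only; it is not the psudb implementation, and AliasTable is a hypothetical name.

#include <cstddef>
#include <random>
#include <vector>

// Sketch of the alias method: O(n) construction from a weight vector,
// O(1) sampling of an index with probability proportional to its weight.
// Assumes a non-empty weight vector with a positive total.
struct AliasTable {
    std::vector<double> prob;   // probability of keeping bucket i
    std::vector<size_t> alias;  // bucket to fall back to otherwise

    explicit AliasTable(const std::vector<double> &weights) {
        size_t n = weights.size();
        prob.resize(n);
        alias.resize(n, 0);

        double total = 0;
        for (double w : weights) total += w;

        // Scale weights so the average bucket has value 1.
        std::vector<double> scaled(n);
        for (size_t i = 0; i < n; i++) scaled[i] = weights[i] * n / total;

        std::vector<size_t> small, large;
        for (size_t i = 0; i < n; i++)
            (scaled[i] < 1.0 ? small : large).push_back(i);

        // Pair each under-full bucket with an over-full one.
        while (!small.empty() && !large.empty()) {
            size_t s = small.back(); small.pop_back();
            size_t l = large.back(); large.pop_back();
            prob[s] = scaled[s];
            alias[s] = l;
            scaled[l] -= (1.0 - scaled[s]);
            (scaled[l] < 1.0 ? small : large).push_back(l);
        }
        // Anything left over is (up to rounding error) exactly full.
        for (size_t i : large) prob[i] = 1.0;
        for (size_t i : small) prob[i] = 1.0;
    }

    // Draw an index distributed according to the original weights.
    template <typename RNG>
    size_t sample(RNG &rng) const {
        std::uniform_int_distribution<size_t> bucket(0, prob.size() - 1);
        std::uniform_real_distribution<double> coin(0.0, 1.0);
        size_t i = bucket(rng);
        return coin(rng) < prob[i] ? i : alias[i];
    }
};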