summaryrefslogtreecommitdiffstats
path: root/include/framework/MutableBuffer.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/MutableBuffer.h')
-rw-r--r--include/framework/MutableBuffer.h22
1 files changed, 12 insertions, 10 deletions
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index 3643a89..c154001 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -28,14 +28,14 @@ namespace de {
template <RecordInterface R>
class MutableBuffer {
- typedef WrappedRecord<R> WRec;
+ //typedef WrappedRecord<R> R;
public:
MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap)
: m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
, m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
- auto len = capacity * sizeof(WRec);
+ auto len = capacity * sizeof(R);
size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));
- m_data = (WRec*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
+ m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
m_tombstone_filter = nullptr;
if (max_tombstone_cap > 0) {
m_tombstone_filter = new BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
@@ -54,12 +54,10 @@ public:
int32_t pos = 0;
if ((pos = try_advance_tail()) == -1) return 0;
- WRec wrec;
- wrec.rec = rec;
+ R new_rec = rec;
+ if (tombstone) new_rec.set_tombstone();
- if (tombstone) wrec.set_tombstone();
-
- m_data[pos] = wrec;
+ m_data[pos] = new_rec;
m_data[pos].header |= (pos << 2);
if (tombstone) {
@@ -68,7 +66,7 @@ public:
}
if constexpr (WeightedRecordInterface<R_>) {
- m_weight.fetch_add(rec.weight);
+ m_weight.fetch_add(new_rec.weight);
double old = m_max_weight.load();
while (old < rec.weight) {
m_max_weight.compare_exchange_strong(old, rec.weight);
@@ -125,7 +123,7 @@ public:
auto offset = 0;
while (offset < m_reccnt.load()) {
- if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) {
+ if (m_data[offset] == rec && m_data[offset].is_tombstone()) {
return true;
}
offset++;;
@@ -149,6 +147,10 @@ public:
return m_weight.load();
}
+ R *get_data() {
+ return m_data;
+ }
+
private:
int32_t try_advance_tail() {
size_t new_tail = m_reccnt.fetch_add(1);