summaryrefslogtreecommitdiffstats
path: root/include/framework/MutableBuffer.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/MutableBuffer.h')
-rw-r--r--include/framework/MutableBuffer.h115
1 files changed, 22 insertions, 93 deletions
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index 74838b8..4b45f20 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -28,17 +28,17 @@ namespace de {
template <RecordInterface R>
class MutableBuffer {
+ typedef WrappedRecord<R> WRec;
public:
- MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap, const gsl_rng* rng)
+ MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap)
: m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
, m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
- auto len = capacity * sizeof(R);
+ auto len = capacity * sizeof(WRec);
size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));
- m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
+ m_data = (WRec*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
m_tombstone_filter = nullptr;
if (max_tombstone_cap > 0) {
- assert(rng != nullptr);
- m_tombstone_filter = new BloomFilter(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS, rng);
+ m_tombstone_filter = new BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
}
}
@@ -48,18 +48,23 @@ public:
}
template <typename R_ = R>
- int append(const R &rec) {
- if (rec.is_tombstone() && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
+ int append(const R &rec, bool tombstone=false) {
+ if (tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
int32_t pos = 0;
if ((pos = try_advance_tail()) == -1) return 0;
- m_data[pos] = rec;
+ WRec wrec;
+ wrec.rec = rec;
+
+ if (tombstone) wrec.set_tombstone();
+
+ m_data[pos] = wrec;
m_data[pos].header |= (pos << 2);
- if (rec.is_tombstone()) {
+ if (tombstone) {
m_tombstonecnt.fetch_add(1);
- if (m_tombstone_filter) m_tombstone_filter->insert(rec.key);
+ if (m_tombstone_filter) m_tombstone_filter->insert(rec);
}
if constexpr (WeightedRecordInterface<R_>) {
@@ -86,18 +91,6 @@ public:
return true;
}
- R* sorted_output() {
- TIMER_INIT();
- TIMER_START();
- std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp<R>);
- TIMER_STOP();
-
- #ifdef INSTRUMENT_MERGING
- fprintf(stderr, "sort\t%ld\n", TIMER_RESULT());
- #endif
- return m_data;
- }
-
size_t get_record_count() {
return m_reccnt;
}
@@ -117,7 +110,7 @@ public:
bool delete_record(const R& rec) {
auto offset = 0;
while (offset < m_reccnt.load()) {
- if (m_data[offset] == rec) {
+ if (m_data[offset].rec == rec) {
m_data[offset].set_delete();
return true;
}
@@ -128,11 +121,11 @@ public:
}
bool check_tombstone(const R& rec) {
- if (m_tombstone_filter && !m_tombstone_filter->lookup(rec.key)) return false;
+ if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
auto offset = 0;
while (offset < m_reccnt.load()) {
- if (m_data[offset] == rec && m_data[offset].is_tombstone()) {
+ if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) {
return true;
}
offset++;;
@@ -140,73 +133,12 @@ public:
return false;
}
- const R* get_record_at(size_t idx) {
- return m_data + idx;
- }
-
- size_t get_memory_utilization() {
+ size_t get_memory_usage() {
return m_cap * sizeof(R);
}
- size_t get_aux_memory_utilization() {
- return m_tombstone_filter->get_memory_utilization();
- }
- //
- // NOTE: This operation samples from records strictly between the upper and
- // lower bounds, not including them
- template <typename R_ = R>
- double get_sample_range(const decltype(R_::key) lower, const decltype(R_::key)& upper,
- std::vector<R *> &records, Alias **alias, size_t *cutoff) {
- std::vector<double> weights;
-
- *cutoff = std::atomic_load(&m_reccnt) - 1;
- records.clear();
- double tot_weight = 0.0;
- for (size_t i = 0; i < (*cutoff) + 1; i++) {
- R *rec = m_data + i;
-
- if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->is_deleted()) {
- weights.push_back(rec->weight);
- records.push_back(rec);
- tot_weight += rec->weight;
- }
- }
-
- for (size_t i = 0; i < weights.size(); i++) {
- weights[i] = weights[i] / tot_weight;
- }
-
- *alias = new Alias(weights);
-
- return tot_weight;
- }
-
- // rejection sampling
- template <typename R_ = R>
- const R *get_sample(const decltype(R_::key)& lower, const decltype(R_::key)& upper, gsl_rng *rng) {
- size_t reccnt = m_reccnt.load();
- if (reccnt == 0) {
- return nullptr;
- }
-
- auto idx = (reccnt == 1) ? 0 : gsl_rng_uniform_int(rng, reccnt - 1);
- auto rec = get_record_at(idx);
-
- auto test = gsl_rng_uniform(rng) * m_max_weight.load();
-
- if (test > rec->weight) {
- return nullptr;
- }
-
- if (test <= rec->weight &&
- rec->key >= lower &&
- rec->key <= upper &&
- !rec->is_tombstone() && !rec->is_deleted()) {
-
- return rec;
- }
-
- return nullptr;
+ size_t get_aux_memory_usage() {
+ return m_tombstone_filter->get_memory_usage();
}
size_t get_tombstone_capacity() {
@@ -226,15 +158,12 @@ private:
}
size_t m_cap;
- //size_t m_buffersize;
size_t m_tombstone_cap;
- //char* m_data;
R* m_data;
- BloomFilter* m_tombstone_filter;
+ BloomFilter<R>* m_tombstone_filter;
alignas(64) std::atomic<size_t> m_tombstonecnt;
- //alignas(64) std::atomic<uint32_t> m_current_tail;
alignas(64) std::atomic<uint32_t> m_reccnt;
alignas(64) std::atomic<double> m_weight;
alignas(64) std::atomic<double> m_max_weight;