summaryrefslogtreecommitdiffstats
path: root/include/framework/MutableBuffer.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/MutableBuffer.h')
-rw-r--r--include/framework/MutableBuffer.h109
1 files changed, 40 insertions, 69 deletions
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index 42bc9a7..74838b8 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -26,15 +26,15 @@
namespace de {
-template <typename K, typename V, typename W=void>
+template <RecordInterface R>
class MutableBuffer {
public:
MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap, const gsl_rng* rng)
: m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
, m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
- auto len = capacity * sizeof(Record<K, V, W>);
+ auto len = capacity * sizeof(R);
size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));
- m_data = (Record<K, V, W>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
+ m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
m_tombstone_filter = nullptr;
if (max_tombstone_cap > 0) {
assert(rng != nullptr);
@@ -47,68 +47,35 @@ public:
if (m_tombstone_filter) delete m_tombstone_filter;
}
- template <typename W_=W,
- typename =std::enable_if_t<std::is_same<W_, void>::value>>
- int append(const K& key, const V& value, bool is_tombstone = false) {
- static_assert(std::is_same<W_, void>::value);
- if (is_tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
+ template <typename R_ = R>
+ int append(const R &rec) {
+ if (rec.is_tombstone() && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
int32_t pos = 0;
if ((pos = try_advance_tail()) == -1) return 0;
- m_data[pos].key = key;
- m_data[pos].value = value;
- m_data[pos].header = ((pos << 2) | (is_tombstone ? 1 : 0));
+ m_data[pos] = rec;
+ m_data[pos].header |= (pos << 2);
- if (is_tombstone) {
+ if (rec.is_tombstone()) {
m_tombstonecnt.fetch_add(1);
- if (m_tombstone_filter) m_tombstone_filter->insert(key);
+ if (m_tombstone_filter) m_tombstone_filter->insert(rec.key);
}
- m_weight.fetch_add(1);
- return 1;
- }
-
- template <typename W_=W,
- typename = std::enable_if_t<!std::is_same<W_, void>::value>>
- int append(const K& key, const V& value, W_ weight=1, bool is_tombstone = false) {
- static_assert(!std::is_same<W_, void>::value);
- if (is_tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
-
- int32_t pos = 0;
- if ((pos = try_advance_tail()) == -1) return 0;
-
- if (is_tombstone) {
- weight = 0;
- }
-
- m_data[pos].key = key;
- m_data[pos].value = value;
- m_data[pos].header = ((pos << 2) | (is_tombstone ? 1 : 0));
- m_data[pos].weight = weight;
-
- if (is_tombstone) {
- m_tombstonecnt.fetch_add(1);
- if (m_tombstone_filter) m_tombstone_filter->insert(key);
- }
-
- double old_val, new_val;
- do {
- old_val = m_weight.load();
- new_val = old_val + weight;
- } while (!m_weight.compare_exchange_strong(old_val, new_val));
-
-
- double old = m_max_weight.load();
- while (old < weight) {
- m_max_weight.compare_exchange_strong(old, weight);
- old = m_max_weight.load();
+ if constexpr (WeightedRecordInterface<R_>) {
+ m_weight.fetch_add(rec.weight);
+ double old = m_max_weight.load();
+ while (old < rec.weight) {
+ m_max_weight.compare_exchange_strong(old, rec.weight);
+ old = m_max_weight.load();
+ }
+ } else {
+ m_weight.fetch_add(1);
}
return 1;
}
-
bool truncate() {
m_tombstonecnt.store(0);
m_reccnt.store(0);
@@ -119,10 +86,10 @@ public:
return true;
}
- Record<K, V, W>* sorted_output() {
+ R* sorted_output() {
TIMER_INIT();
TIMER_START();
- std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp<K,V,W>);
+ std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp<R>);
TIMER_STOP();
#ifdef INSTRUMENT_MERGING
@@ -147,11 +114,11 @@ public:
return m_tombstonecnt.load();
}
- bool delete_record(const K& key, const V& val) {
+ bool delete_record(const R& rec) {
auto offset = 0;
while (offset < m_reccnt.load()) {
- if (m_data[offset].match(key, val, false)) {
- m_data[offset].set_delete_status();
+ if (m_data[offset] == rec) {
+ m_data[offset].set_delete();
return true;
}
offset++;
@@ -160,23 +127,25 @@ public:
return false;
}
- bool check_tombstone(const K& key, const V& value) {
- if (m_tombstone_filter && !m_tombstone_filter->lookup(key)) return false;
+ bool check_tombstone(const R& rec) {
+ if (m_tombstone_filter && !m_tombstone_filter->lookup(rec.key)) return false;
auto offset = 0;
while (offset < m_reccnt.load()) {
- if (m_data[offset].match(key, value, true)) return true;
+ if (m_data[offset] == rec && m_data[offset].is_tombstone()) {
+ return true;
+ }
offset++;;
}
return false;
}
- const Record<K, V, W>* get_record_at(size_t idx) {
+ const R* get_record_at(size_t idx) {
return m_data + idx;
}
size_t get_memory_utilization() {
- return m_cap * sizeof(Record<K, V, W>);
+ return m_cap * sizeof(R);
}
size_t get_aux_memory_utilization() {
@@ -185,17 +154,18 @@ public:
//
// NOTE: This operation samples from records strictly between the upper and
// lower bounds, not including them
- double get_sample_range(const K& lower, const K& upper, std::vector<Record<K, V, W> *> &records,
- Alias **alias, size_t *cutoff) {
+ template <typename R_ = R>
+ double get_sample_range(const decltype(R_::key) lower, const decltype(R_::key)& upper,
+ std::vector<R *> &records, Alias **alias, size_t *cutoff) {
std::vector<double> weights;
*cutoff = std::atomic_load(&m_reccnt) - 1;
records.clear();
double tot_weight = 0.0;
for (size_t i = 0; i < (*cutoff) + 1; i++) {
- Record<K, V, W> *rec = m_data + i;
+ R *rec = m_data + i;
- if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->get_delete_status()) {
+ if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->is_deleted()) {
weights.push_back(rec->weight);
records.push_back(rec);
tot_weight += rec->weight;
@@ -212,7 +182,8 @@ public:
}
// rejection sampling
- const Record<K, V, W> *get_sample(const K& lower, const K& upper, gsl_rng *rng) {
+ template <typename R_ = R>
+ const R *get_sample(const decltype(R_::key)& lower, const decltype(R_::key)& upper, gsl_rng *rng) {
size_t reccnt = m_reccnt.load();
if (reccnt == 0) {
return nullptr;
@@ -230,7 +201,7 @@ public:
if (test <= rec->weight &&
rec->key >= lower &&
rec->key <= upper &&
- !rec->is_tombstone() && !rec->get_delete_status()) {
+ !rec->is_tombstone() && !rec->is_deleted()) {
return rec;
}
@@ -259,7 +230,7 @@ private:
size_t m_tombstone_cap;
//char* m_data;
- Record<K, V, W>* m_data;
+ R* m_data;
BloomFilter* m_tombstone_filter;
alignas(64) std::atomic<size_t> m_tombstonecnt;