diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-22 13:25:20 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-22 13:25:20 -0400 |
| commit | 08d6c84b9d69b500c964a8ff66e726e1f01f2095 (patch) | |
| tree | 8bc156b54383de8a4a347e463901dcb7bdd1da10 /include/framework/MutableBuffer.h | |
| parent | 6fd50506d2e50d2faf2478a2883a2ef1b4840a78 (diff) | |
| download | dynamic-extension-08d6c84b9d69b500c964a8ff66e726e1f01f2095.tar.gz | |
Progress towards generalization of shard interface
Diffstat (limited to 'include/framework/MutableBuffer.h')
| -rw-r--r-- | include/framework/MutableBuffer.h | 115 |
1 file changed, 22 insertions, 93 deletions
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h index 74838b8..4b45f20 100644 --- a/include/framework/MutableBuffer.h +++ b/include/framework/MutableBuffer.h @@ -28,17 +28,17 @@ namespace de { template <RecordInterface R> class MutableBuffer { + typedef WrappedRecord<R> WRec; public: - MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap, const gsl_rng* rng) + MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap) : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0) , m_tombstonecnt(0), m_weight(0), m_max_weight(0) { - auto len = capacity * sizeof(R); + auto len = capacity * sizeof(WRec); size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE)); - m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); + m_data = (WRec*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); m_tombstone_filter = nullptr; if (max_tombstone_cap > 0) { - assert(rng != nullptr); - m_tombstone_filter = new BloomFilter(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS, rng); + m_tombstone_filter = new BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS); } } @@ -48,18 +48,23 @@ public: } template <typename R_ = R> - int append(const R &rec) { - if (rec.is_tombstone() && m_tombstonecnt + 1 > m_tombstone_cap) return 0; + int append(const R &rec, bool tombstone=false) { + if (tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0; int32_t pos = 0; if ((pos = try_advance_tail()) == -1) return 0; - m_data[pos] = rec; + WRec wrec; + wrec.rec = rec; + + if (tombstone) wrec.set_tombstone(); + + m_data[pos] = wrec; m_data[pos].header |= (pos << 2); - if (rec.is_tombstone()) { + if (tombstone) { m_tombstonecnt.fetch_add(1); - if (m_tombstone_filter) m_tombstone_filter->insert(rec.key); + if (m_tombstone_filter) m_tombstone_filter->insert(rec); } if constexpr (WeightedRecordInterface<R_>) { @@ -86,18 +91,6 @@ public: return true; } - R* sorted_output() { - TIMER_INIT(); - 
TIMER_START(); - std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp<R>); - TIMER_STOP(); - - #ifdef INSTRUMENT_MERGING - fprintf(stderr, "sort\t%ld\n", TIMER_RESULT()); - #endif - return m_data; - } - size_t get_record_count() { return m_reccnt; } @@ -117,7 +110,7 @@ public: bool delete_record(const R& rec) { auto offset = 0; while (offset < m_reccnt.load()) { - if (m_data[offset] == rec) { + if (m_data[offset].rec == rec) { m_data[offset].set_delete(); return true; } @@ -128,11 +121,11 @@ public: } bool check_tombstone(const R& rec) { - if (m_tombstone_filter && !m_tombstone_filter->lookup(rec.key)) return false; + if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false; auto offset = 0; while (offset < m_reccnt.load()) { - if (m_data[offset] == rec && m_data[offset].is_tombstone()) { + if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) { return true; } offset++;; @@ -140,73 +133,12 @@ public: return false; } - const R* get_record_at(size_t idx) { - return m_data + idx; - } - - size_t get_memory_utilization() { + size_t get_memory_usage() { return m_cap * sizeof(R); } - size_t get_aux_memory_utilization() { - return m_tombstone_filter->get_memory_utilization(); - } - // - // NOTE: This operation samples from records strictly between the upper and - // lower bounds, not including them - template <typename R_ = R> - double get_sample_range(const decltype(R_::key) lower, const decltype(R_::key)& upper, - std::vector<R *> &records, Alias **alias, size_t *cutoff) { - std::vector<double> weights; - - *cutoff = std::atomic_load(&m_reccnt) - 1; - records.clear(); - double tot_weight = 0.0; - for (size_t i = 0; i < (*cutoff) + 1; i++) { - R *rec = m_data + i; - - if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->is_deleted()) { - weights.push_back(rec->weight); - records.push_back(rec); - tot_weight += rec->weight; - } - } - - for (size_t i = 0; i < weights.size(); i++) { - weights[i] = weights[i] / 
tot_weight; - } - - *alias = new Alias(weights); - - return tot_weight; - } - - // rejection sampling - template <typename R_ = R> - const R *get_sample(const decltype(R_::key)& lower, const decltype(R_::key)& upper, gsl_rng *rng) { - size_t reccnt = m_reccnt.load(); - if (reccnt == 0) { - return nullptr; - } - - auto idx = (reccnt == 1) ? 0 : gsl_rng_uniform_int(rng, reccnt - 1); - auto rec = get_record_at(idx); - - auto test = gsl_rng_uniform(rng) * m_max_weight.load(); - - if (test > rec->weight) { - return nullptr; - } - - if (test <= rec->weight && - rec->key >= lower && - rec->key <= upper && - !rec->is_tombstone() && !rec->is_deleted()) { - - return rec; - } - - return nullptr; + size_t get_aux_memory_usage() { + return m_tombstone_filter->get_memory_usage(); } size_t get_tombstone_capacity() { @@ -226,15 +158,12 @@ private: } size_t m_cap; - //size_t m_buffersize; size_t m_tombstone_cap; - //char* m_data; R* m_data; - BloomFilter* m_tombstone_filter; + BloomFilter<R>* m_tombstone_filter; alignas(64) std::atomic<size_t> m_tombstonecnt; - //alignas(64) std::atomic<uint32_t> m_current_tail; alignas(64) std::atomic<uint32_t> m_reccnt; alignas(64) std::atomic<double> m_weight; alignas(64) std::atomic<double> m_max_weight; |