From ff000799c3254f52e0beabbe9c62d10c3fc4178e Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 15 May 2023 16:48:56 -0400 Subject: Record format generalization Currently, tombstone counting is bugged. But the rest of it appears to be working. --- include/framework/DynamicExtension.h | 79 ++++++++++++------------- include/framework/InternalLevel.h | 39 +++++++------ include/framework/MutableBuffer.h | 109 +++++++++++++---------------------- 3 files changed, 101 insertions(+), 126 deletions(-) (limited to 'include/framework') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index c65290a..1d9ee76 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -68,11 +68,12 @@ enum class DeletePolicy { typedef ssize_t level_index; -template +template class DynamicExtension { - typedef WIRS Shard; - typedef MutableBuffer MBuffer; - typedef Record Rec; + typedef WIRS Shard; + typedef decltype(R::key) K; + typedef decltype(R::value) V; + typedef decltype(R::weight) W; public: DynamicExtension(size_t buffer_cap, size_t buffer_delete_cap, size_t scale_factor, @@ -82,8 +83,8 @@ public: m_max_delete_prop(max_delete_prop), m_max_rejection_rate(max_rejection_prop), m_last_level_idx(-1), - m_buffer_1(new MutableBuffer(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)), - m_buffer_2(new MutableBuffer(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)), + m_buffer_1(new MutableBuffer(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)), + m_buffer_2(new MutableBuffer(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)), m_buffer_1_merging(false), m_buffer_2_merging(false) {} ~DynamicExtension() { @@ -113,9 +114,9 @@ public: return buffer->delete_record(key, val); } - int append(const K& key, const V& val, W weight, bool tombstone, gsl_rng *rng) { + int append(R &rec, gsl_rng *rng) { // NOTE: single-threaded implementation only - MutableBuffer *buffer; + MutableBuffer *buffer; while (!(buffer = get_buffer())) ; @@ -123,13 +124,13 @@ public: merge_buffer(rng); } - return buffer->append(key, val, weight, tombstone); + return buffer->append(rec); } - void range_sample(Record *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) { + void range_sample(R *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) { auto buffer = get_buffer(); Alias *buffer_alias = nullptr; - std::vector *> buffer_records; + std::vector buffer_records; size_t buffer_cutoff = 0; W buffer_weight; @@ -185,7 +186,7 @@ public: rejections = 0; while (shard_samples[0] > 0) { - const Record *rec; + const R *rec; if (LSM_REJ_SAMPLE) { rec = buffer->get_sample(lower_key, upper_key, rng); } else { @@ -193,13 +194,13 @@ public: } if (DELETE_TAGGING) { - if (rec && !rec->get_delete_status()) { + if (rec && !rec->is_deleted()) { sample_set[sample_idx++] = *rec; } else { rejections++; } } else { - if (rec && !buffer->check_tombstone(rec->key, rec->value)) { + if (rec && !buffer->check_tombstone(*rec)) { sample_set[sample_idx++] = *rec; } else { rejections++; @@ -220,14 +221,14 @@ public: } } - std::vector results; + std::vector results; for (size_t i=1; iget_samples(states[i], results, lower_key, upper_key, shard_samples[i], rng); for (size_t j=0; j *record, const ShardID &shid, MutableBuffer *buffer, size_t buffer_cutoff) { + bool is_deleted(const R &record, const ShardID &shid, MutableBuffer *buffer, size_t buffer_cutoff) { // If tagging is enabled, we just need to check if the record has the delete tag 
set if (DELETE_TAGGING) { - return record->get_delete_status(); + return record.is_deleted(); } // Otherwise, we need to look for a tombstone. // check for tombstone in the buffer. This will require accounting for the cutoff eventually. - if (buffer->check_tombstone(record->key, record->value)) { + if (buffer->check_tombstone(record)) { return true; } @@ -271,13 +272,13 @@ public: } for (size_t lvl=0; lvl<=shid.level_idx; lvl++) { - if (m_levels[lvl]->check_tombstone(0, record->key, record->value)) { + if (m_levels[lvl]->check_tombstone(0, record)) { return true; } } // check the level containing the shard - return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record->key, record->value); + return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record); } @@ -379,8 +380,8 @@ public: } private: - MutableBuffer *m_buffer_1; - MutableBuffer *m_buffer_2; + MutableBuffer *m_buffer_1; + MutableBuffer *m_buffer_2; std::atomic m_active_buffer; std::atomic m_buffer_1_merging; std::atomic m_buffer_2_merging; @@ -389,11 +390,11 @@ private: double m_max_delete_prop; double m_max_rejection_rate; - std::vector *> m_levels; + std::vector *> m_levels; level_index m_last_level_idx; - MutableBuffer *get_buffer() { + MutableBuffer *get_buffer() { if (m_buffer_1_merging && m_buffer_2_merging) { return nullptr; } @@ -401,11 +402,11 @@ private: return (m_active_buffer) ? m_buffer_2 : m_buffer_1; } - inline bool rejection(const Record *record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer *buffer, size_t buffer_cutoff) { - if (record->is_tombstone()) { + inline bool rejection(const R &record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer *buffer, size_t buffer_cutoff) { + if (record.is_tombstone()) { tombstone_rejections++; return true; - } else if (record->key < lower_bound || record->key > upper_bound) { + } else if (record.key < lower_bound || record.key > upper_bound) { bounds_rejections++; return true; } else if (is_deleted(record, shid, buffer, buffer_cutoff)) { @@ -416,8 +417,8 @@ private: return false; } - inline bool add_to_sample(const Record *record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer, - Record *sample_buffer, size_t &sample_idx, MutableBuffer *buffer, size_t buffer_cutoff) { + inline bool add_to_sample(const R &record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer, + R *sample_buffer, size_t &sample_idx, MutableBuffer *buffer, size_t buffer_cutoff) { TIMER_INIT(); TIMER_START(); sampling_attempts++; @@ -445,7 +446,7 @@ private: if (new_idx > 0) { assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); } - m_levels.emplace_back(new InternalLevel(new_idx, new_shard_cnt, DELETE_TAGGING)); + m_levels.emplace_back(new InternalLevel(new_idx, new_shard_cnt, DELETE_TAGGING)); m_last_level_idx++; return new_idx; @@ -495,7 +496,7 @@ private: * returns -1 if idx==0, and no such level exists, to simplify * the logic of the first merge. 
*/ - inline level_index find_mergable_level(level_index idx, MutableBuffer *buffer=nullptr) { + inline level_index find_mergable_level(level_index idx, MutableBuffer *buffer=nullptr) { if (idx == 0 && m_levels.size() == 0) return -1; @@ -525,7 +526,7 @@ private: // merging two memory levels if (LSM_LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel::merge_levels(m_levels[base_level], m_levels[incoming_level], + m_levels[base_level] = InternalLevel::merge_levels(m_levels[base_level], m_levels[incoming_level], DELETE_TAGGING, rng); mark_as_unused(tmp); } else { @@ -533,18 +534,18 @@ private: } mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING); + m_levels[incoming_level] = new InternalLevel(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING); } - inline void merge_buffer_into_l0(MutableBuffer *buffer, gsl_rng *rng) { + inline void merge_buffer_into_l0(MutableBuffer *buffer, gsl_rng *rng) { assert(m_levels[0]); if (LSM_LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0]; - auto temp_level = new InternalLevel(0, 1, DELETE_TAGGING); + auto temp_level = new InternalLevel(0, 1, DELETE_TAGGING); temp_level->append_mem_table(buffer, rng); - auto new_level = InternalLevel::merge_levels(old_level, temp_level, DELETE_TAGGING, rng); + auto new_level = InternalLevel::merge_levels(old_level, temp_level, DELETE_TAGGING, rng); m_levels[0] = new_level; delete temp_level; @@ -560,7 +561,7 @@ private: * level may not be able to immediately be deleted, depending upon who * else is using it. */ - inline void mark_as_unused(InternalLevel *level) { + inline void mark_as_unused(InternalLevel *level) { delete level; } @@ -608,7 +609,7 @@ private: * a pointer to the memory table to use, if desired. Otherwise, there are * no guarantees about which buffer will be accessed if level_index is -1. */ - inline size_t get_level_record_count(level_index idx, MutableBuffer *buffer=nullptr) { + inline size_t get_level_record_count(level_index idx, MutableBuffer *buffer=nullptr) { assert(idx >= -1); if (idx == -1) { return (buffer) ? 
buffer->get_record_count() : get_buffer()->get_record_count(); diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index 727a382..f0f19da 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -19,16 +19,19 @@ namespace de { -template +template class InternalLevel { static const size_t REJECTION_TRIGGER_THRESHOLD = 1024; + typedef decltype(R::key) K; + typedef decltype(R::value) V; + private: struct InternalLevelStructure { InternalLevelStructure(size_t cap) : m_cap(cap) - , m_shards(new WIRS*[cap]{nullptr}) + , m_shards(new WIRS*[cap]{nullptr}) , m_bfs(new BloomFilter*[cap]{nullptr}) {} ~InternalLevelStructure() { @@ -42,7 +45,7 @@ private: } size_t m_cap; - WIRS** m_shards; + WIRS** m_shards; BloomFilter** m_bfs; }; @@ -74,40 +77,40 @@ public: new BloomFilter(BF_FPR, new_level->get_tombstone_count() + base_level->get_tombstone_count(), BF_HASH_FUNCS, rng); - WIRS* shards[2]; + WIRS* shards[2]; shards[0] = base_level->m_structure->m_shards[0]; shards[1] = new_level->m_structure->m_shards[0]; - res->m_structure->m_shards[0] = new WIRS(shards, 2, res->m_structure->m_bfs[0], tagging); + res->m_structure->m_shards[0] = new WIRS(shards, 2, res->m_structure->m_bfs[0], tagging); return res; } - void append_mem_table(MutableBuffer* buffer, const gsl_rng* rng) { + void append_mem_table(MutableBuffer* buffer, const gsl_rng* rng) { assert(m_shard_cnt < m_structure->m_cap); m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng); - m_structure->m_shards[m_shard_cnt] = new WIRS(buffer, m_structure->m_bfs[m_shard_cnt], m_tagging); + m_structure->m_shards[m_shard_cnt] = new WIRS(buffer, m_structure->m_bfs[m_shard_cnt], m_tagging); ++m_shard_cnt; } void append_merged_shards(InternalLevel* level, const gsl_rng* rng) { assert(m_shard_cnt < m_structure->m_cap); m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng); - m_structure->m_shards[m_shard_cnt] = new WIRS(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt], m_tagging); + m_structure->m_shards[m_shard_cnt] = new WIRS(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt], m_tagging); ++m_shard_cnt; } - WIRS *get_merged_shard() { - WIRS *shards[m_shard_cnt]; + WIRS *get_merged_shard() { + WIRS *shards[m_shard_cnt]; for (size_t i=0; im_shards[i]) ? m_structure->m_shards[i] : nullptr; } - return new WIRS(shards, m_shard_cnt, nullptr, m_tagging); + return new WIRS(shards, m_shard_cnt, nullptr, m_tagging); } // Append the sample range in-order..... 
- void get_shard_weights(std::vector& weights, std::vector *>> &shards, std::vector& shard_states, const K& low, const K& high) { + void get_shard_weights(std::vector& weights, std::vector *>> &shards, std::vector& shard_states, const K& low, const K& high) { for (size_t i=0; im_shards[i]) { auto shard_state = m_structure->m_shards[i]->get_sample_shard_state(low, high); @@ -116,7 +119,7 @@ public: weights.push_back(shard_state->tot_weight); shard_states.emplace_back(shard_state); } else { - WIRS::delete_state(shard_state); + WIRS::delete_state(shard_state); } } } @@ -130,12 +133,12 @@ public: return false; } - bool check_tombstone(size_t shard_stop, const K& key, const V& val) { + bool check_tombstone(size_t shard_stop, const R& rec) { if (m_shard_cnt == 0) return false; for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) { - if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(key)) - && m_structure->m_shards[i]->check_tombstone(key, val)) + if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(rec.key)) + && m_structure->m_shards[i]->check_tombstone(rec)) return true; } return false; @@ -151,11 +154,11 @@ public: return false; } - const Record* get_record_at(size_t shard_no, size_t idx) { + const R* get_record_at(size_t shard_no, size_t idx) { return m_structure->m_shards[shard_no]->get_record_at(idx); } - WIRS* get_shard(size_t idx) { + WIRS* get_shard(size_t idx) { return m_structure->m_shards[idx]; } diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h index 42bc9a7..74838b8 100644 --- a/include/framework/MutableBuffer.h +++ b/include/framework/MutableBuffer.h @@ -26,15 +26,15 @@ namespace de { -template +template class MutableBuffer { public: MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap, const gsl_rng* rng) : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0) , m_tombstonecnt(0), m_weight(0), m_max_weight(0) { - auto len = capacity * sizeof(Record); + auto len = capacity * sizeof(R); size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE)); - m_data = (Record*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); + m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); m_tombstone_filter = nullptr; if (max_tombstone_cap > 0) { assert(rng != nullptr); @@ -47,68 +47,35 @@ public: if (m_tombstone_filter) delete m_tombstone_filter; } - template ::value>> - int append(const K& key, const V& value, bool is_tombstone = false) { - static_assert(std::is_same::value); - if (is_tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0; + template + int append(const R &rec) { + if (rec.is_tombstone() && m_tombstonecnt + 1 > m_tombstone_cap) return 0; int32_t pos = 0; if ((pos = try_advance_tail()) == -1) return 0; - m_data[pos].key = key; - m_data[pos].value = value; - m_data[pos].header = ((pos << 2) | (is_tombstone ? 
1 : 0)); + m_data[pos] = rec; + m_data[pos].header |= (pos << 2); - if (is_tombstone) { + if (rec.is_tombstone()) { m_tombstonecnt.fetch_add(1); - if (m_tombstone_filter) m_tombstone_filter->insert(key); + if (m_tombstone_filter) m_tombstone_filter->insert(rec.key); } - m_weight.fetch_add(1); - return 1; - } - - template ::value>> - int append(const K& key, const V& value, W_ weight=1, bool is_tombstone = false) { - static_assert(!std::is_same::value); - if (is_tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0; - - int32_t pos = 0; - if ((pos = try_advance_tail()) == -1) return 0; - - if (is_tombstone) { - weight = 0; - } - - m_data[pos].key = key; - m_data[pos].value = value; - m_data[pos].header = ((pos << 2) | (is_tombstone ? 1 : 0)); - m_data[pos].weight = weight; - - if (is_tombstone) { - m_tombstonecnt.fetch_add(1); - if (m_tombstone_filter) m_tombstone_filter->insert(key); - } - - double old_val, new_val; - do { - old_val = m_weight.load(); - new_val = old_val + weight; - } while (!m_weight.compare_exchange_strong(old_val, new_val)); - - - double old = m_max_weight.load(); - while (old < weight) { - m_max_weight.compare_exchange_strong(old, weight); - old = m_max_weight.load(); + if constexpr (WeightedRecordInterface) { + m_weight.fetch_add(rec.weight); + double old = m_max_weight.load(); + while (old < rec.weight) { + m_max_weight.compare_exchange_strong(old, rec.weight); + old = m_max_weight.load(); + } + } else { + m_weight.fetch_add(1); } return 1; } - bool truncate() { m_tombstonecnt.store(0); m_reccnt.store(0); @@ -119,10 +86,10 @@ public: return true; } - Record* sorted_output() { + R* sorted_output() { TIMER_INIT(); TIMER_START(); - std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp); + std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp); TIMER_STOP(); #ifdef INSTRUMENT_MERGING @@ -147,11 +114,11 @@ public: return m_tombstonecnt.load(); } - bool delete_record(const K& key, const V& val) { + bool delete_record(const R& rec) { auto offset = 0; while (offset < m_reccnt.load()) { - if (m_data[offset].match(key, val, false)) { - m_data[offset].set_delete_status(); + if (m_data[offset] == rec) { + m_data[offset].set_delete(); return true; } offset++; @@ -160,23 +127,25 @@ public: return false; } - bool check_tombstone(const K& key, const V& value) { - if (m_tombstone_filter && !m_tombstone_filter->lookup(key)) return false; + bool check_tombstone(const R& rec) { + if (m_tombstone_filter && !m_tombstone_filter->lookup(rec.key)) return false; auto offset = 0; while (offset < m_reccnt.load()) { - if (m_data[offset].match(key, value, true)) return true; + if (m_data[offset] == rec && m_data[offset].is_tombstone()) { + return true; + } offset++;; } return false; } - const Record* get_record_at(size_t idx) { + const R* get_record_at(size_t idx) { return m_data + idx; } size_t get_memory_utilization() { - return m_cap * sizeof(Record); + return m_cap * sizeof(R); } size_t get_aux_memory_utilization() { @@ -185,17 +154,18 @@ public: // // NOTE: This operation samples from records strictly between the upper and // lower bounds, not including them - double get_sample_range(const K& lower, const K& upper, std::vector *> &records, - Alias **alias, size_t *cutoff) { + template + double get_sample_range(const decltype(R_::key) lower, const decltype(R_::key)& upper, + std::vector &records, Alias **alias, size_t *cutoff) { std::vector weights; *cutoff = std::atomic_load(&m_reccnt) - 1; records.clear(); double tot_weight = 0.0; for (size_t i = 0; i < (*cutoff) + 
1; i++) { - Record *rec = m_data + i; + R *rec = m_data + i; - if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->get_delete_status()) { + if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->is_deleted()) { weights.push_back(rec->weight); records.push_back(rec); tot_weight += rec->weight; @@ -212,7 +182,8 @@ public: } // rejection sampling - const Record *get_sample(const K& lower, const K& upper, gsl_rng *rng) { + template + const R *get_sample(const decltype(R_::key)& lower, const decltype(R_::key)& upper, gsl_rng *rng) { size_t reccnt = m_reccnt.load(); if (reccnt == 0) { return nullptr; @@ -230,7 +201,7 @@ public: if (test <= rec->weight && rec->key >= lower && rec->key <= upper && - !rec->is_tombstone() && !rec->get_delete_status()) { + !rec->is_tombstone() && !rec->is_deleted()) { return rec; } @@ -259,7 +230,7 @@ private: size_t m_tombstone_cap; //char* m_data; - Record* m_data; + R* m_data; BloomFilter* m_tombstone_filter; alignas(64) std::atomic m_tombstonecnt; -- cgit v1.2.3
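
As illustration of the record-format generalization above (these sketches are not part of the patch): the change replaces the fixed Record<K, V, W> layout with a caller-supplied record type R, recovering the key, value, and weight types via decltype(R::key), decltype(R::value), and decltype(R::weight). A minimal record type compatible with what the new template code touches could look like the following; the field and method names (key, value, weight, header, is_tombstone(), is_deleted(), set_delete(), operator==) come straight from the diff, but the exact header bit layout is an assumption for illustration.

    #include <cstdint>

    // Sketch only: a weighted record type usable with the generalized framework.
    // Assumed header layout: bit 0 = tombstone, bit 1 = delete tag, and the
    // buffer ORs the slot index into the remaining bits (pos << 2) on append.
    template <typename K, typename V, typename W = uint64_t>
    struct WeightedRecord {
        K key;
        V value;
        W weight;
        uint32_t header = 0;

        bool is_tombstone() const { return header & 1; }
        bool is_deleted() const   { return header & 2; }
        void set_delete()         { header |= 2; }

        // Equality intentionally ignores the header, matching how the buffer
        // compares records before separately testing is_tombstone().
        bool operator==(const WeightedRecord &other) const {
            return key == other.key && value == other.value;
        }
    };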
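
The new MutableBuffer::append branches on a WeightedRecordInterface check with if constexpr, and the class templates are presumably constrained by a corresponding record interface; neither definition appears in this diff. If the framework uses C++20 concepts, definitions along these lines would fit the way the types are used — this is a guess at their shape, not the framework's actual declarations, and a constexpr trait would work equally well.

    #include <concepts>

    // Sketch only: possible shape of the record constraints implied by the patch.
    template <typename R>
    concept RecordInterface = requires(R r, const R cr) {
        r.key;
        r.value;
        { cr.is_tombstone() } -> std::convertible_to<bool>;
        { cr.is_deleted() }   -> std::convertible_to<bool>;
        r.set_delete();
        { cr == cr }          -> std::convertible_to<bool>;
    };

    // Weighted records additionally expose a numeric weight field.
    template <typename R>
    concept WeightedRecordInterface = RecordInterface<R> && requires(R r) {
        r.weight;
    };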
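
The two enable_if-guarded append overloads in MutableBuffer collapse into a single append(const R&) that performs weight accounting only when the record type is weighted. The standalone sketch below mirrors the control flow visible in the diff (tombstone-cap check, tail advance, position bits ORed into the header, atomic total-weight and max-weight updates), using the concepts sketched above; try_advance_tail() is replaced by an inline CAS loop because its body is not part of this patch. One visible difference from the removed overload: the old code zeroed a tombstone's weight before accounting, while the new path adds rec.weight unconditionally; whether that relates to the tombstone accounting issue mentioned in the commit message is not clear from the diff alone.

    #include <atomic>
    #include <cstddef>
    #include <cstdint>

    // Sketch only: simplified stand-in for the unified MutableBuffer::append.
    template <RecordInterface R>
    class BufferSketch {
    public:
        int append(const R &rec) {
            // Reject tombstones once the tombstone budget is exhausted.
            if (rec.is_tombstone() && m_tombstonecnt + 1 > m_tombstone_cap) return 0;

            // Stand-in for try_advance_tail(): claim the next slot, or fail if full.
            int32_t pos = m_reccnt.load();
            do {
                if ((size_t) pos >= m_cap) return 0;
            } while (!m_reccnt.compare_exchange_strong(pos, pos + 1));

            m_data[pos] = rec;
            m_data[pos].header |= (pos << 2);   // stash the slot index in the header

            if (rec.is_tombstone()) m_tombstonecnt.fetch_add(1);

            if constexpr (WeightedRecordInterface<R>) {
                // Note: unlike the removed overload, tombstone weight is not zeroed here.
                double w = (double) rec.weight;
                atomic_add(m_weight, w);

                // Maintain the maximum observed weight with a CAS loop.
                double old = m_max_weight.load();
                while (old < w && !m_max_weight.compare_exchange_strong(old, w)) {}
            } else {
                atomic_add(m_weight, 1.0);
            }
            return 1;
        }

    private:
        static void atomic_add(std::atomic<double> &target, double delta) {
            // std::atomic<double>::fetch_add only exists from C++20, so emulate it.
            double old = target.load();
            while (!target.compare_exchange_strong(old, old + delta)) {}
        }

        size_t m_cap = 0;
        size_t m_tombstone_cap = 0;
        R *m_data = nullptr;
        std::atomic<int32_t> m_reccnt{0};
        std::atomic<size_t> m_tombstonecnt{0};
        std::atomic<double> m_weight{0.0};
        std::atomic<double> m_max_weight{0.0};
    };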
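
From the caller's perspective, inserts now pass a fully formed record instead of separate key/value/weight/tombstone arguments. A hypothetical usage sketch follows, reusing the WeightedRecord type from the first sketch; the constructor argument order follows what is visible in the diff, but DynamicExtension's template parameter list and the include path are assumptions for illustration.

    #include <cstdint>
    #include <gsl/gsl_rng.h>
    #include "framework/DynamicExtension.h"   // assumed include path

    int main() {
        gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937);

        using Rec = WeightedRecord<uint64_t, uint32_t, uint64_t>;

        // buffer_cap, buffer_delete_cap, scale_factor, max_delete_prop,
        // max_rejection_prop, rng -- per the constructor shown in the diff.
        de::DynamicExtension<Rec> index(12000, 1000, 6, 0.05, 0.25, rng);

        Rec r{/* key */ 42, /* value */ 7, /* weight */ 3};
        index.append(r, rng);   // previously append(key, val, weight, tombstone, rng)

        Rec samples[100];
        index.range_sample(samples, /* lower_key */ (uint64_t) 0,
                           /* upper_key */ (uint64_t) 100, /* sample_sz */ 100, rng);

        gsl_rng_free(rng);
        return 0;
    }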