|           |                                                                    |                           |
|-----------|--------------------------------------------------------------------|---------------------------|
| author    | Douglas Rumbaugh <dbr4@psu.edu>                                    | 2023-05-15 16:48:56 -0400 |
| committer | Douglas Rumbaugh <dbr4@psu.edu>                                    | 2023-05-15 16:48:56 -0400 |
| commit    | ff000799c3254f52e0beabbe9c62d10c3fc4178e (patch)                   |                           |
| tree      | 49a1a045678315e8e215fd80409973679b793043                           | /include/framework        |
| parent    | 418e9b079e559c86f3a5b276f712ad2f5d66533c (diff)                    |                           |
| download  | dynamic-extension-ff000799c3254f52e0beabbe9c62d10c3fc4178e.tar.gz |                           |
Record format generalization
Currently, tombstone counting is bugged, but the rest of the change appears to
be working.
Diffstat (limited to 'include/framework')
| mode       | file                                 | changes |
|------------|--------------------------------------|---------|
| -rw-r--r-- | include/framework/DynamicExtension.h | 79      |
| -rw-r--r-- | include/framework/InternalLevel.h    | 39      |
| -rw-r--r-- | include/framework/MutableBuffer.h    | 109     |

3 files changed, 101 insertions(+), 126 deletions(-)
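The diffs below replace the three-parameter `<K, V, W>` templates with a single record type constrained by a `RecordInterface` concept. The concept's definition is not part of this diff; judging from how `R` is used (`decltype(R::key)`, `rec.is_tombstone()`, `rec.is_deleted()`, the `header` field, `operator==`, and the `WeightedRecordInterface` test in `MutableBuffer::append`), it plausibly looks something like the sketch below — the names and exact requirements here are assumptions, not the committed definitions.

```cpp
#include <concepts>

// Hypothetical reconstruction of the concepts this commit builds on; the
// real definitions live in a header outside include/framework.
template <typename R>
concept RecordInterface = requires(R r, const R &other) {
    r.key;                                              // ordered key field
    r.value;                                            // payload field
    r.header;                                           // packed position/status bits
    { r.is_tombstone() } -> std::convertible_to<bool>;
    { r.is_deleted() }   -> std::convertible_to<bool>;
    r.set_delete();                                     // tag the record as deleted
    { r == other }       -> std::convertible_to<bool>;
};

// Refinement checked via `if constexpr` in MutableBuffer::append.
template <typename R>
concept WeightedRecordInterface = RecordInterface<R> && requires(R r) {
    { r.weight } -> std::convertible_to<double>;
};
```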
```diff
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index c65290a..1d9ee76 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -68,11 +68,12 @@ enum class DeletePolicy {
 typedef ssize_t level_index;
 
-template <typename K, typename V, typename W=void>
+template <RecordInterface R>
 class DynamicExtension {
-    typedef WIRS<K,V,W> Shard;
-    typedef MutableBuffer<K,V,W> MBuffer;
-    typedef Record<K, V, W> Rec;
+    typedef WIRS<R> Shard;
+    typedef decltype(R::key) K;
+    typedef decltype(R::value) V;
+    typedef decltype(R::weight) W;
 
 public:
     DynamicExtension(size_t buffer_cap, size_t buffer_delete_cap, size_t scale_factor,
@@ -82,8 +83,8 @@ public:
           m_max_delete_prop(max_delete_prop),
           m_max_rejection_rate(max_rejection_prop),
           m_last_level_idx(-1),
-          m_buffer_1(new MutableBuffer<K,V,W>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
-          m_buffer_2(new MutableBuffer<K,V,W>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
+          m_buffer_1(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
+          m_buffer_2(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
           m_buffer_1_merging(false), m_buffer_2_merging(false) {}
 
     ~DynamicExtension() {
@@ -113,9 +114,9 @@ public:
         return buffer->delete_record(key, val);
     }
 
-    int append(const K& key, const V& val, W weight, bool tombstone, gsl_rng *rng) {
+    int append(R &rec, gsl_rng *rng) {
         // NOTE: single-threaded implementation only
-        MutableBuffer<K,V,W> *buffer;
+        MutableBuffer<R> *buffer;
         while (!(buffer = get_buffer()))
             ;
 
@@ -123,13 +124,13 @@ public:
             merge_buffer(rng);
         }
 
-        return buffer->append(key, val, weight, tombstone);
+        return buffer->append(rec);
     }
 
-    void range_sample(Record<K,V,W> *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
+    void range_sample(R *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
         auto buffer = get_buffer();
         Alias *buffer_alias = nullptr;
-        std::vector<Record<K,V,W> *> buffer_records;
+        std::vector<R *> buffer_records;
         size_t buffer_cutoff = 0;
         W buffer_weight;
@@ -185,7 +186,7 @@ public:
         rejections = 0;
 
         while (shard_samples[0] > 0) {
-            const Record<K,V,W> *rec;
+            const R *rec;
             if (LSM_REJ_SAMPLE) {
                 rec = buffer->get_sample(lower_key, upper_key, rng);
             } else {
@@ -193,13 +194,13 @@ public:
             }
 
             if (DELETE_TAGGING) {
-                if (rec && !rec->get_delete_status()) {
+                if (rec && !rec->is_deleted()) {
                     sample_set[sample_idx++] = *rec;
                 } else {
                     rejections++;
                 }
             } else {
-                if (rec && !buffer->check_tombstone(rec->key, rec->value)) {
+                if (rec && !buffer->check_tombstone(*rec)) {
                     sample_set[sample_idx++] = *rec;
                 } else {
                     rejections++;
@@ -220,14 +221,14 @@ public:
             }
         }
 
-        std::vector<Rec> results;
+        std::vector<R> results;
         for (size_t i=1; i<shard_samples.size(); i++) {
             results.reserve(shard_samples[i]);
             shards[i].second->get_samples(states[i], results, lower_key, upper_key, shard_samples[i], rng);
 
             for (size_t j=0; j<results.size(); j++) {
-                if (rejection(&results[j], shards[i].first, lower_key, upper_key, buffer, buffer_cutoff)) {
+                if (rejection(results[j], shards[i].first, lower_key, upper_key, buffer, buffer_cutoff)) {
                     rejections++;
                     continue;
                 }
@@ -252,16 +253,16 @@ public:
     // should correspond to the shard containing the record in question
     //
     // Passing INVALID_SHID indicates that the record exists within the buffer
-    bool is_deleted(const Record<K,V,W> *record, const ShardID &shid, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) {
+    bool is_deleted(const R &record, const ShardID &shid, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
         // If tagging is enabled, we just need to check if the record has the delete tag set
         if (DELETE_TAGGING) {
-            return record->get_delete_status();
+            return record.is_deleted();
         }
 
         // Otherwise, we need to look for a tombstone.
 
         // check for tombstone in the buffer. This will require accounting for the cutoff eventually.
-        if (buffer->check_tombstone(record->key, record->value)) {
+        if (buffer->check_tombstone(record)) {
             return true;
         }
@@ -271,13 +272,13 @@ public:
         }
 
         for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
-            if (m_levels[lvl]->check_tombstone(0, record->key, record->value)) {
+            if (m_levels[lvl]->check_tombstone(0, record)) {
                 return true;
             }
         }
 
         // check the level containing the shard
-        return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record->key, record->value);
+        return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record);
     }
@@ -379,8 +380,8 @@ public:
     }
 
 private:
-    MutableBuffer<K,V,W> *m_buffer_1;
-    MutableBuffer<K,V,W> *m_buffer_2;
+    MutableBuffer<R> *m_buffer_1;
+    MutableBuffer<R> *m_buffer_2;
     std::atomic<bool> m_active_buffer;
     std::atomic<bool> m_buffer_1_merging;
     std::atomic<bool> m_buffer_2_merging;
@@ -389,11 +390,11 @@ private:
     double m_max_delete_prop;
     double m_max_rejection_rate;
 
-    std::vector<InternalLevel<K,V,W> *> m_levels;
+    std::vector<InternalLevel<R> *> m_levels;
     level_index m_last_level_idx;
 
-    MutableBuffer<K,V,W> *get_buffer() {
+    MutableBuffer<R> *get_buffer() {
         if (m_buffer_1_merging && m_buffer_2_merging) {
             return nullptr;
         }
@@ -401,11 +402,11 @@ private:
         return (m_active_buffer) ? m_buffer_2 : m_buffer_1;
     }
 
-    inline bool rejection(const Record<K,V,W> *record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) {
-        if (record->is_tombstone()) {
+    inline bool rejection(const R &record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
+        if (record.is_tombstone()) {
             tombstone_rejections++;
             return true;
-        } else if (record->key < lower_bound || record->key > upper_bound) {
+        } else if (record.key < lower_bound || record.key > upper_bound) {
             bounds_rejections++;
             return true;
         } else if (is_deleted(record, shid, buffer, buffer_cutoff)) {
@@ -416,8 +417,8 @@ private:
         return false;
     }
 
-    inline bool add_to_sample(const Record<K,V,W> *record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer,
-                              Record<K,V,W> *sample_buffer, size_t &sample_idx, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) {
+    inline bool add_to_sample(const R &record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer,
+                              R *sample_buffer, size_t &sample_idx, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
         TIMER_INIT();
         TIMER_START();
         sampling_attempts++;
@@ -445,7 +446,7 @@ private:
         if (new_idx > 0) {
             assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
         }
-        m_levels.emplace_back(new InternalLevel<K,V,W>(new_idx, new_shard_cnt, DELETE_TAGGING));
+        m_levels.emplace_back(new InternalLevel<R>(new_idx, new_shard_cnt, DELETE_TAGGING));
         m_last_level_idx++;
 
         return new_idx;
@@ -495,7 +496,7 @@ private:
      * returns -1 if idx==0, and no such level exists, to simplify
      * the logic of the first merge.
      */
-    inline level_index find_mergable_level(level_index idx, MutableBuffer<K,V,W> *buffer=nullptr) {
+    inline level_index find_mergable_level(level_index idx, MutableBuffer<R> *buffer=nullptr) {
         if (idx == 0 && m_levels.size() == 0) return -1;
@@ -525,7 +526,7 @@ private:
         // merging two memory levels
         if (LSM_LEVELING) {
             auto tmp = m_levels[base_level];
-            m_levels[base_level] = InternalLevel<K,V,W>::merge_levels(m_levels[base_level], m_levels[incoming_level],
+            m_levels[base_level] = InternalLevel<R>::merge_levels(m_levels[base_level], m_levels[incoming_level],
                                                                      DELETE_TAGGING, rng);
             mark_as_unused(tmp);
         } else {
@@ -533,18 +534,18 @@ private:
         }
 
         mark_as_unused(m_levels[incoming_level]);
-        m_levels[incoming_level] = new InternalLevel<K,V,W>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING);
+        m_levels[incoming_level] = new InternalLevel<R>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING);
     }
 
-    inline void merge_buffer_into_l0(MutableBuffer<K,V,W> *buffer, gsl_rng *rng) {
+    inline void merge_buffer_into_l0(MutableBuffer<R> *buffer, gsl_rng *rng) {
         assert(m_levels[0]);
         if (LSM_LEVELING) {
             // FIXME: Kludgey implementation due to interface constraints.
             auto old_level = m_levels[0];
-            auto temp_level = new InternalLevel<K,V,W>(0, 1, DELETE_TAGGING);
+            auto temp_level = new InternalLevel<R>(0, 1, DELETE_TAGGING);
             temp_level->append_mem_table(buffer, rng);
-            auto new_level = InternalLevel<K,V,W>::merge_levels(old_level, temp_level, DELETE_TAGGING, rng);
+            auto new_level = InternalLevel<R>::merge_levels(old_level, temp_level, DELETE_TAGGING, rng);
 
             m_levels[0] = new_level;
             delete temp_level;
@@ -560,7 +561,7 @@ private:
      * level may not be able to immediately be deleted, depending upon who
      * else is using it.
      */
-    inline void mark_as_unused(InternalLevel<K,V,W> *level) {
+    inline void mark_as_unused(InternalLevel<R> *level) {
         delete level;
     }
@@ -608,7 +609,7 @@ private:
     * a pointer to the memory table to use, if desired. Otherwise, there are
     * no guarantees about which buffer will be accessed if level_index is -1.
     */
-    inline size_t get_level_record_count(level_index idx, MutableBuffer<K,V,W> *buffer=nullptr) {
+    inline size_t get_level_record_count(level_index idx, MutableBuffer<R> *buffer=nullptr) {
         assert(idx >= -1);
         if (idx == -1) {
             return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count();
```
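The net effect at the `DynamicExtension` level is that `(key, value, weight, tombstone)` parameter lists become whole-record arguments. A minimal caller sketch follows; the `WeightedRec` type is hypothetical, the `header` bit layout (bit 0 = tombstone, bit 1 = delete tag, bits 2+ = position) is an assumption consistent with the old `(pos << 2) | (is_tombstone ? 1 : 0)` packing, and the constructor arguments are guessed from the initializer list above, since the hunk truncates the full parameter list.

```cpp
#include <cstdint>
#include <gsl/gsl_rng.h>
#include "framework/DynamicExtension.h"  // assumed include path

// Hypothetical record type; field names follow the decltype(R::key),
// R::value, and R::weight accesses in the diff above.
struct WeightedRec {
    uint64_t key;
    uint32_t value;
    double   weight;
    uint32_t header = 0;

    bool is_tombstone() const { return header & 1; }
    bool is_deleted()   const { return header & 2; }
    void set_delete()         { header |= 2; }
    bool operator==(const WeightedRec &other) const {
        return key == other.key && value == other.value;
    }
};

int main() {
    gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937);

    // Guessed arguments: buffer capacity, tombstone capacity, scale factor,
    // max delete proportion, max rejection proportion, rng.
    de::DynamicExtension<WeightedRec> ext(1000, 100, 8, 0.05, 0.05, rng);

    WeightedRec rec = {42, 1, 2.0};
    ext.append(rec, rng);  // whole-record insert replaces (key, val, weight, tombstone)

    WeightedRec samples[16];
    ext.range_sample(samples, (uint64_t) 0, (uint64_t) 100, 16, rng);

    gsl_rng_free(rng);
    return 0;
}
```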
```diff
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index 727a382..f0f19da 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -19,16 +19,19 @@ namespace de {
 
-template <typename K, typename V, typename W=void>
+template <RecordInterface R>
 class InternalLevel {
 
     static const size_t REJECTION_TRIGGER_THRESHOLD = 1024;
 
+    typedef decltype(R::key) K;
+    typedef decltype(R::value) V;
+
 private:
     struct InternalLevelStructure {
         InternalLevelStructure(size_t cap)
             : m_cap(cap)
-            , m_shards(new WIRS<K, V, W>*[cap]{nullptr})
+            , m_shards(new WIRS<R>*[cap]{nullptr})
             , m_bfs(new BloomFilter*[cap]{nullptr}) {}
 
         ~InternalLevelStructure() {
@@ -42,7 +45,7 @@ private:
         }
 
         size_t m_cap;
-        WIRS<K, V, W>** m_shards;
+        WIRS<R>** m_shards;
         BloomFilter** m_bfs;
     };
@@ -74,40 +77,40 @@ public:
             new BloomFilter(BF_FPR,
                             new_level->get_tombstone_count() + base_level->get_tombstone_count(),
                             BF_HASH_FUNCS, rng);
-        WIRS<K, V, W>* shards[2];
+        WIRS<R>* shards[2];
         shards[0] = base_level->m_structure->m_shards[0];
         shards[1] = new_level->m_structure->m_shards[0];
 
-        res->m_structure->m_shards[0] = new WIRS<K, V, W>(shards, 2, res->m_structure->m_bfs[0], tagging);
+        res->m_structure->m_shards[0] = new WIRS<R>(shards, 2, res->m_structure->m_bfs[0], tagging);
         return res;
     }
 
-    void append_mem_table(MutableBuffer<K,V,W>* buffer, const gsl_rng* rng) {
+    void append_mem_table(MutableBuffer<R>* buffer, const gsl_rng* rng) {
         assert(m_shard_cnt < m_structure->m_cap);
         m_structure->m_bfs[m_shard_cnt] =
             new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng);
-        m_structure->m_shards[m_shard_cnt] = new WIRS<K, V, W>(buffer, m_structure->m_bfs[m_shard_cnt], m_tagging);
+        m_structure->m_shards[m_shard_cnt] = new WIRS<R>(buffer, m_structure->m_bfs[m_shard_cnt], m_tagging);
         ++m_shard_cnt;
     }
 
     void append_merged_shards(InternalLevel* level, const gsl_rng* rng) {
         assert(m_shard_cnt < m_structure->m_cap);
         m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng);
-        m_structure->m_shards[m_shard_cnt] = new WIRS<K, V, W>(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt], m_tagging);
+        m_structure->m_shards[m_shard_cnt] = new WIRS<R>(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt], m_tagging);
         ++m_shard_cnt;
     }
 
-    WIRS<K, V, W> *get_merged_shard() {
-        WIRS<K, V, W> *shards[m_shard_cnt];
+    WIRS<R> *get_merged_shard() {
+        WIRS<R> *shards[m_shard_cnt];
 
         for (size_t i=0; i<m_shard_cnt; i++) {
             shards[i] = (m_structure->m_shards[i]) ? m_structure->m_shards[i] : nullptr;
         }
 
-        return new WIRS<K, V, W>(shards, m_shard_cnt, nullptr, m_tagging);
+        return new WIRS<R>(shards, m_shard_cnt, nullptr, m_tagging);
     }
 
     // Append the sample range in-order.....
-    void get_shard_weights(std::vector<W>& weights, std::vector<std::pair<ShardID, WIRS<K, V, W> *>> &shards, std::vector<void*>& shard_states, const K& low, const K& high) {
+    void get_shard_weights(std::vector<uint64_t>& weights, std::vector<std::pair<ShardID, WIRS<R> *>> &shards, std::vector<void*>& shard_states, const K& low, const K& high) {
         for (size_t i=0; i<m_shard_cnt; i++) {
             if (m_structure->m_shards[i]) {
                 auto shard_state = m_structure->m_shards[i]->get_sample_shard_state(low, high);
@@ -116,7 +119,7 @@ public:
                     weights.push_back(shard_state->tot_weight);
                     shard_states.emplace_back(shard_state);
                 } else {
-                    WIRS<K, V, W>::delete_state(shard_state);
+                    WIRS<R>::delete_state(shard_state);
                 }
             }
         }
@@ -130,12 +133,12 @@ public:
         return false;
     }
 
-    bool check_tombstone(size_t shard_stop, const K& key, const V& val) {
+    bool check_tombstone(size_t shard_stop, const R& rec) {
         if (m_shard_cnt == 0) return false;
 
         for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
-            if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(key))
-                && m_structure->m_shards[i]->check_tombstone(key, val))
+            if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(rec.key))
+                && m_structure->m_shards[i]->check_tombstone(rec))
                 return true;
         }
         return false;
@@ -151,11 +154,11 @@ public:
         return false;
     }
 
-    const Record<K, V, W>* get_record_at(size_t shard_no, size_t idx) {
+    const R* get_record_at(size_t shard_no, size_t idx) {
         return m_structure->m_shards[shard_no]->get_record_at(idx);
     }
 
-    WIRS<K, V, W>* get_shard(size_t idx) {
+    WIRS<R>* get_shard(size_t idx) {
         return m_structure->m_shards[idx];
     }
```
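A detail worth noting in `InternalLevel` (and in `DynamicExtension` above): the key and value types are recovered directly from the record's members with `decltype`, so record types need no nested typedefs to plug into the framework. A self-contained illustration of the mechanism:

```cpp
#include <cstdint>
#include <type_traits>

// A plain aggregate is enough; no key_type/value_type members required.
struct Rec {
    uint64_t key;
    uint32_t value;
};

// The typedef trick used by the refactor: decltype on a member access
// yields the member's declared type.
typedef decltype(Rec::key)   K;  // uint64_t
typedef decltype(Rec::value) V;  // uint32_t

static_assert(std::is_same_v<K, uint64_t>);
static_assert(std::is_same_v<V, uint32_t>);

int main() {}
```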
```diff
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index 42bc9a7..74838b8 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -26,15 +26,15 @@ namespace de {
 
-template <typename K, typename V, typename W=void>
+template <RecordInterface R>
 class MutableBuffer {
 public:
     MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap, const gsl_rng* rng)
     : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
     , m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
-        auto len = capacity * sizeof(Record<K, V, W>);
+        auto len = capacity * sizeof(R);
         size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));
-        m_data = (Record<K, V, W>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
+        m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
         m_tombstone_filter = nullptr;
         if (max_tombstone_cap > 0) {
             assert(rng != nullptr);
@@ -47,68 +47,35 @@ public:
         if (m_tombstone_filter) delete m_tombstone_filter;
     }
 
-    template <typename W_=W,
-              typename =std::enable_if_t<std::is_same<W_, void>::value>>
-    int append(const K& key, const V& value, bool is_tombstone = false) {
-        static_assert(std::is_same<W_, void>::value);
-        if (is_tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
+    template <typename R_ = R>
+    int append(const R &rec) {
+        if (rec.is_tombstone() && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
 
         int32_t pos = 0;
         if ((pos = try_advance_tail()) == -1) return 0;
 
-        m_data[pos].key = key;
-        m_data[pos].value = value;
-        m_data[pos].header = ((pos << 2) | (is_tombstone ? 1 : 0));
+        m_data[pos] = rec;
+        m_data[pos].header |= (pos << 2);
 
-        if (is_tombstone) {
+        if (rec.is_tombstone()) {
             m_tombstonecnt.fetch_add(1);
-            if (m_tombstone_filter) m_tombstone_filter->insert(key);
+            if (m_tombstone_filter) m_tombstone_filter->insert(rec.key);
         }
 
-        m_weight.fetch_add(1);
-        return 1;
-    }
-
-    template <typename W_=W,
-              typename = std::enable_if_t<!std::is_same<W_, void>::value>>
-    int append(const K& key, const V& value, W_ weight=1, bool is_tombstone = false) {
-        static_assert(!std::is_same<W_, void>::value);
-        if (is_tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
-
-        int32_t pos = 0;
-        if ((pos = try_advance_tail()) == -1) return 0;
-
-        if (is_tombstone) {
-            weight = 0;
-        }
-
-        m_data[pos].key = key;
-        m_data[pos].value = value;
-        m_data[pos].header = ((pos << 2) | (is_tombstone ? 1 : 0));
-        m_data[pos].weight = weight;
-
-        if (is_tombstone) {
-            m_tombstonecnt.fetch_add(1);
-            if (m_tombstone_filter) m_tombstone_filter->insert(key);
-        }
-
-        double old_val, new_val;
-        do {
-            old_val = m_weight.load();
-            new_val = old_val + weight;
-        } while (!m_weight.compare_exchange_strong(old_val, new_val));
-
-
-        double old = m_max_weight.load();
-        while (old < weight) {
-            m_max_weight.compare_exchange_strong(old, weight);
-            old = m_max_weight.load();
+        if constexpr (WeightedRecordInterface<R_>) {
+            m_weight.fetch_add(rec.weight);
+            double old = m_max_weight.load();
+            while (old < rec.weight) {
+                m_max_weight.compare_exchange_strong(old, rec.weight);
+                old = m_max_weight.load();
+            }
+        } else {
+            m_weight.fetch_add(1);
         }
 
         return 1;
     }
-
     bool truncate() {
         m_tombstonecnt.store(0);
         m_reccnt.store(0);
@@ -119,10 +86,10 @@ public:
         return true;
     }
 
-    Record<K, V, W>* sorted_output() {
+    R* sorted_output() {
         TIMER_INIT();
         TIMER_START();
-        std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp<K,V,W>);
+        std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp<R>);
         TIMER_STOP();
 
 #ifdef INSTRUMENT_MERGING
@@ -147,11 +114,11 @@ public:
         return m_tombstonecnt.load();
     }
 
-    bool delete_record(const K& key, const V& val) {
+    bool delete_record(const R& rec) {
         auto offset = 0;
         while (offset < m_reccnt.load()) {
-            if (m_data[offset].match(key, val, false)) {
-                m_data[offset].set_delete_status();
+            if (m_data[offset] == rec) {
+                m_data[offset].set_delete();
                 return true;
             }
             offset++;
@@ -160,23 +127,25 @@ public:
         return false;
     }
 
-    bool check_tombstone(const K& key, const V& value) {
-        if (m_tombstone_filter && !m_tombstone_filter->lookup(key)) return false;
+    bool check_tombstone(const R& rec) {
+        if (m_tombstone_filter && !m_tombstone_filter->lookup(rec.key)) return false;
 
         auto offset = 0;
         while (offset < m_reccnt.load()) {
-            if (m_data[offset].match(key, value, true)) return true;
+            if (m_data[offset] == rec && m_data[offset].is_tombstone()) {
+                return true;
+            }
             offset++;;
         }
         return false;
     }
 
-    const Record<K, V, W>* get_record_at(size_t idx) {
+    const R* get_record_at(size_t idx) {
         return m_data + idx;
     }
 
     size_t get_memory_utilization() {
-        return m_cap * sizeof(Record<K, V, W>);
+        return m_cap * sizeof(R);
     }
 
     size_t get_aux_memory_utilization() {
@@ -185,17 +154,18 @@ public:
 
     //
     // NOTE: This operation samples from records strictly between the upper and
     // lower bounds, not including them
-    double get_sample_range(const K& lower, const K& upper, std::vector<Record<K, V, W> *> &records,
-                            Alias **alias, size_t *cutoff) {
+    template <typename R_ = R>
+    double get_sample_range(const decltype(R_::key) lower, const decltype(R_::key)& upper,
+                            std::vector<R *> &records, Alias **alias, size_t *cutoff) {
         std::vector<double> weights;
 
         *cutoff = std::atomic_load(&m_reccnt) - 1;
         records.clear();
         double tot_weight = 0.0;
         for (size_t i = 0; i < (*cutoff) + 1; i++) {
-            Record<K, V, W> *rec = m_data + i;
+            R *rec = m_data + i;
 
-            if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->get_delete_status()) {
+            if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->is_deleted()) {
                 weights.push_back(rec->weight);
                 records.push_back(rec);
                 tot_weight += rec->weight;
@@ -212,7 +182,8 @@ public:
     }
 
     // rejection sampling
-    const Record<K, V, W> *get_sample(const K& lower, const K& upper, gsl_rng *rng) {
+    template <typename R_ = R>
+    const R *get_sample(const decltype(R_::key)& lower, const decltype(R_::key)& upper, gsl_rng *rng) {
         size_t reccnt = m_reccnt.load();
         if (reccnt == 0) {
             return nullptr;
@@ -230,7 +201,7 @@ public:
         if (test <= rec->weight &&
             rec->key >= lower && rec->key <= upper &&
-            !rec->is_tombstone() && !rec->get_delete_status()) {
+            !rec->is_tombstone() && !rec->is_deleted()) {
 
             return rec;
         }
@@ -259,7 +230,7 @@ private:
     size_t m_tombstone_cap;
 
     //char* m_data;
-    Record<K, V, W>* m_data;
+    R* m_data;
     BloomFilter* m_tombstone_filter;
 
     alignas(64) std::atomic<size_t> m_tombstonecnt;
```
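The unified `append` keeps the weighted path's running-maximum update. Since `std::atomic<double>` has no `fetch_max`, the maximum is maintained with a compare-and-swap loop (and note that `m_weight.fetch_add(rec.weight)` on an `atomic<double>` requires C++20). A standalone version of the idiom, for reference:

```cpp
#include <atomic>

// Running-maximum via CAS, as in MutableBuffer::append's weighted branch.
void update_max_weight(std::atomic<double> &max_weight, double w) {
    double old = max_weight.load();
    while (old < w) {
        // On failure, compare_exchange_strong stores the current value into
        // `old`, so the loop re-tests whether w is still the larger value.
        if (max_weight.compare_exchange_strong(old, w)) {
            break;
        }
    }
}

int main() {
    std::atomic<double> max_weight(0.0);
    update_max_weight(max_weight, 2.5);  // max_weight becomes 2.5
    update_max_weight(max_weight, 1.0);  // unchanged: 1.0 < 2.5
    return 0;
}
```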