From ff000799c3254f52e0beabbe9c62d10c3fc4178e Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Mon, 15 May 2023 16:48:56 -0400
Subject: Record format generalization

Currently, tombstone counting is bugged, but the rest of it appears
to be working.
---
 include/ds/PriorityQueue.h           |  14 ++---
 include/framework/DynamicExtension.h |  79 ++++++++++++-------------
 include/framework/InternalLevel.h    |  39 +++++++------
 include/framework/MutableBuffer.h    | 109 +++++++++++++----------------------
 include/shard/MemISAM.h              |  72 +++++++++++------------
 include/shard/WIRS.h                 | 108 ++++++++++++++++++----------------
 include/util/Cursor.h                |  26 ++++-----
 include/util/Record.h                |  86 ++++++++++++++++++++++-----
 8 files changed, 288 insertions(+), 245 deletions(-)

diff --git a/include/ds/PriorityQueue.h b/include/ds/PriorityQueue.h
index 290d5c8..22582da 100644
--- a/include/ds/PriorityQueue.h
+++ b/include/ds/PriorityQueue.h
@@ -16,14 +16,14 @@
 
 namespace de {
 
-template <typename K, typename V, typename W=void>
+template <RecordInterface R>
 struct queue_record {
-    const Record<K, V, W> *data;
+    const R *data;
     size_t version;
 };
 
-template <typename K, typename V, typename W=void>
+template <RecordInterface R>
 class PriorityQueue {
 public:
     PriorityQueue(size_t size) : data(size), tail(0) {}
@@ -54,7 +54,7 @@ public:
         }
     }
 
-    void push(const Record<K, V, W>* record, size_t version=0) {
+    void push(const R* record, size_t version=0) {
         assert(tail != this->data.size());
 
         size_t new_idx = this->tail++;
@@ -67,7 +67,7 @@ public:
 
     }
 
-    queue_record<K, V, W> peek(size_t depth=0) {
+    queue_record<R> peek(size_t depth=0) {
         ssize_t idx = 0;
         size_t cur_depth = 0;
 
@@ -81,7 +81,7 @@ public:
     }
 
 private:
-    std::vector<queue_record<K, V, W>> data;
+    std::vector<queue_record<R>> data;
     size_t tail;
 
     /*
@@ -124,7 +124,7 @@ private:
     }
 
     inline bool heap_cmp(size_t a, size_t b) {
-        if (!data[a].data->match(data[b].data)) {
+        if (*(data[a].data) != *(data[b].data)) {
             return *(data[a].data) < *(data[b].data);
         } else if (data[a].version != data[b].version)
             return data[a].version < data[b].version;
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index c65290a..1d9ee76 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -68,11 +68,12 @@ enum class DeletePolicy {
 
 typedef ssize_t level_index;
 
-template <typename K, typename V, typename W>
+template <RecordInterface R>
 class DynamicExtension {
-    typedef WIRS<K, V, W> Shard;
-    typedef MutableBuffer<K, V, W> MBuffer;
-    typedef Record<K, V, W> Rec;
+    typedef WIRS<R> Shard;
+    typedef decltype(R::key) K;
+    typedef decltype(R::value) V;
+    typedef decltype(R::weight) W;
 
 public:
     DynamicExtension(size_t buffer_cap, size_t buffer_delete_cap, size_t scale_factor,
@@ -82,8 +83,8 @@ public:
           m_max_delete_prop(max_delete_prop),
           m_max_rejection_rate(max_rejection_prop),
           m_last_level_idx(-1),
-          m_buffer_1(new MutableBuffer<K, V, W>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
-          m_buffer_2(new MutableBuffer<K, V, W>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
+          m_buffer_1(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
+          m_buffer_2(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
           m_buffer_1_merging(false), m_buffer_2_merging(false) {}
 
     ~DynamicExtension() {
@@ -113,9 +114,9 @@
         return buffer->delete_record(key, val);
     }
 
-    int append(const K& key, const V& val, W weight, bool tombstone, gsl_rng *rng) {
+    int append(R &rec, gsl_rng *rng) {
         // NOTE: single-threaded implementation only
-        MutableBuffer<K, V, W> *buffer;
+        MutableBuffer<R> *buffer;
         while (!(buffer = get_buffer()))
             ;
 
@@ -123,13 +124,13 @@
             merge_buffer(rng);
         }
 
-        return buffer->append(key, val, weight, tombstone);
+        return buffer->append(rec);
     }
 
-    void range_sample(Record<K, V, W> *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
+    void range_sample(R *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
         auto buffer = get_buffer();
         Alias *buffer_alias = nullptr;
-        std::vector<Record<K, V, W> *> buffer_records;
+        std::vector<R *> buffer_records;
         size_t buffer_cutoff = 0;
 
         W buffer_weight;
@@ -185,7 +186,7 @@
 
             rejections = 0;
             while (shard_samples[0] > 0) {
-                const Record<K, V, W> *rec;
+                const R *rec;
                 if (LSM_REJ_SAMPLE) {
                     rec = buffer->get_sample(lower_key, upper_key, rng);
                 } else {
@@ -193,13 +194,13 @@
                 }
 
                 if (DELETE_TAGGING) {
-                    if (rec && !rec->get_delete_status()) {
+                    if (rec && !rec->is_deleted()) {
                         sample_set[sample_idx++] = *rec;
                     } else {
                         rejections++;
                     }
                 } else {
-                    if (rec && !buffer->check_tombstone(rec->key, rec->value)) {
+                    if (rec && !buffer->check_tombstone(*rec)) {
                         sample_set[sample_idx++] = *rec;
                     } else {
                         rejections++;
@@ -220,14 +221,14 @@
             }
         }
 
-        std::vector<Record<K, V, W>> results;
+        std::vector<R> results;
         for (size_t i=1; i<shards.size(); i++) {
             shards[i].second->get_samples(states[i], results, lower_key, upper_key, shard_samples[i], rng);
 
             for (size_t j=0; j<results.size(); j++) {
                 sample_set[sample_idx++] = results[j];
             }
             results.clear();
         }
    }
 
@@ -253,13 +254,13 @@
-    bool is_deleted(const Record<K, V, W> *record, const ShardID &shid, MutableBuffer<K, V, W> *buffer, size_t buffer_cutoff) {
+    bool is_deleted(const R &record, const ShardID &shid, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
         // If tagging is enabled, we just need to check if the record has the delete tag set
         if (DELETE_TAGGING) {
-            return record->get_delete_status();
+            return record.is_deleted();
         }
 
         // Otherwise, we need to look for a tombstone.
 
         // check for tombstone in the buffer. This will require accounting for the cutoff eventually.
-        if (buffer->check_tombstone(record->key, record->value)) {
+        if (buffer->check_tombstone(record)) {
             return true;
         }
 
@@ -271,13 +272,13 @@
         }
 
         for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
-            if (m_levels[lvl]->check_tombstone(0, record->key, record->value)) {
+            if (m_levels[lvl]->check_tombstone(0, record)) {
                 return true;
             }
         }
 
         // check the level containing the shard
-        return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record->key, record->value);
+        return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record);
     }
 
 
@@ -379,8 +380,8 @@
     }
 
 private:
-    MutableBuffer<K, V, W> *m_buffer_1;
-    MutableBuffer<K, V, W> *m_buffer_2;
+    MutableBuffer<R> *m_buffer_1;
+    MutableBuffer<R> *m_buffer_2;
     std::atomic<bool> m_active_buffer;
     std::atomic<bool> m_buffer_1_merging;
     std::atomic<bool> m_buffer_2_merging;
@@ -389,11 +390,11 @@
     double m_max_delete_prop;
     double m_max_rejection_rate;
 
-    std::vector<InternalLevel<K, V, W> *> m_levels;
+    std::vector<InternalLevel<R> *> m_levels;
 
     level_index m_last_level_idx;
 
-    MutableBuffer<K, V, W> *get_buffer() {
+    MutableBuffer<R> *get_buffer() {
         if (m_buffer_1_merging && m_buffer_2_merging) {
             return nullptr;
         }
@@ -401,11 +402,11 @@
         return (m_active_buffer) ? m_buffer_2 : m_buffer_1;
     }
 
-    inline bool rejection(const Record<K, V, W> *record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<K, V, W> *buffer, size_t buffer_cutoff) {
-        if (record->is_tombstone()) {
+    inline bool rejection(const R &record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
+        if (record.is_tombstone()) {
             tombstone_rejections++;
             return true;
-        } else if (record->key < lower_bound || record->key > upper_bound) {
+        } else if (record.key < lower_bound || record.key > upper_bound) {
             bounds_rejections++;
             return true;
         } else if (is_deleted(record, shid, buffer, buffer_cutoff)) {
@@ -416,8 +417,8 @@
         return false;
     }
 
-    inline bool add_to_sample(const Record<K, V, W> *record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer,
-                              Record<K, V, W> *sample_buffer, size_t &sample_idx, MutableBuffer<K, V, W> *buffer, size_t buffer_cutoff) {
+    inline bool add_to_sample(const R &record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer,
+                              R *sample_buffer, size_t &sample_idx, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
         TIMER_INIT();
         TIMER_START();
         sampling_attempts++;
@@ -445,7 +446,7 @@
         if (new_idx > 0) {
             assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
         }
-        m_levels.emplace_back(new InternalLevel<K, V, W>(new_idx, new_shard_cnt, DELETE_TAGGING));
+        m_levels.emplace_back(new InternalLevel<R>(new_idx, new_shard_cnt, DELETE_TAGGING));
 
         m_last_level_idx++;
         return new_idx;
@@ -495,7 +496,7 @@
      * returns -1 if idx==0, and no such level exists, to simplify
      * the logic of the first merge.
      */
-    inline level_index find_mergable_level(level_index idx, MutableBuffer<K, V, W> *buffer=nullptr) {
+    inline level_index find_mergable_level(level_index idx, MutableBuffer<R> *buffer=nullptr) {
         if (idx == 0 && m_levels.size() == 0) return -1;
 
@@ -525,7 +526,7 @@
         // merging two memory levels
         if (LSM_LEVELING) {
             auto tmp = m_levels[base_level];
-            m_levels[base_level] = InternalLevel<K, V, W>::merge_levels(m_levels[base_level], m_levels[incoming_level],
+            m_levels[base_level] = InternalLevel<R>::merge_levels(m_levels[base_level], m_levels[incoming_level],
                                                                         DELETE_TAGGING, rng);
             mark_as_unused(tmp);
         } else {
@@ -533,18 +534,18 @@
         }
 
         mark_as_unused(m_levels[incoming_level]);
-        m_levels[incoming_level] = new InternalLevel<K, V, W>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING);
+        m_levels[incoming_level] = new InternalLevel<R>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING);
     }
 
-    inline void merge_buffer_into_l0(MutableBuffer<K, V, W> *buffer, gsl_rng *rng) {
+    inline void merge_buffer_into_l0(MutableBuffer<R> *buffer, gsl_rng *rng) {
         assert(m_levels[0]);
         if (LSM_LEVELING) {
             // FIXME: Kludgey implementation due to interface constraints.
             auto old_level = m_levels[0];
-            auto temp_level = new InternalLevel<K, V, W>(0, 1, DELETE_TAGGING);
+            auto temp_level = new InternalLevel<R>(0, 1, DELETE_TAGGING);
             temp_level->append_mem_table(buffer, rng);
-            auto new_level = InternalLevel<K, V, W>::merge_levels(old_level, temp_level, DELETE_TAGGING, rng);
+            auto new_level = InternalLevel<R>::merge_levels(old_level, temp_level, DELETE_TAGGING, rng);
 
             m_levels[0] = new_level;
             delete temp_level;
@@ -560,7 +561,7 @@
      * level may not be able to immediately be deleted, depending upon who
      * else is using it.
      */
-    inline void mark_as_unused(InternalLevel<K, V, W> *level) {
+    inline void mark_as_unused(InternalLevel<R> *level) {
         delete level;
     }
 
@@ -608,7 +609,7 @@
      * a pointer to the memory table to use, if desired. Otherwise, there are
      * no guarantees about which buffer will be accessed if level_index is -1.
      */
-    inline size_t get_level_record_count(level_index idx, MutableBuffer<K, V, W> *buffer=nullptr) {
+    inline size_t get_level_record_count(level_index idx, MutableBuffer<R> *buffer=nullptr) {
         assert(idx >= -1);
         if (idx == -1) {
             return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count();
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index 727a382..f0f19da 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -19,16 +19,19 @@
 
 namespace de {
 
-template <typename K, typename V, typename W>
+template <RecordInterface R>
 class InternalLevel {
 
     static const size_t REJECTION_TRIGGER_THRESHOLD = 1024;
 
+    typedef decltype(R::key) K;
+    typedef decltype(R::value) V;
+
 private:
     struct InternalLevelStructure {
         InternalLevelStructure(size_t cap)
             : m_cap(cap)
-            , m_shards(new WIRS<K, V, W>*[cap]{nullptr})
+            , m_shards(new WIRS<R>*[cap]{nullptr})
             , m_bfs(new BloomFilter*[cap]{nullptr}) {}
 
         ~InternalLevelStructure() {
@@ -42,7 +45,7 @@
         }
 
         size_t m_cap;
-        WIRS<K, V, W>** m_shards;
+        WIRS<R>** m_shards;
         BloomFilter** m_bfs;
     };
 
@@ -74,40 +77,40 @@
                                new BloomFilter(BF_FPR,
                                                new_level->get_tombstone_count() + base_level->get_tombstone_count(),
                                                BF_HASH_FUNCS, rng);
-        WIRS<K, V, W>* shards[2];
+        WIRS<R>* shards[2];
         shards[0] = base_level->m_structure->m_shards[0];
         shards[1] = new_level->m_structure->m_shards[0];
 
-        res->m_structure->m_shards[0] = new WIRS<K, V, W>(shards, 2, res->m_structure->m_bfs[0], tagging);
+        res->m_structure->m_shards[0] = new WIRS<R>(shards, 2, res->m_structure->m_bfs[0], tagging);
         return res;
     }
 
-    void append_mem_table(MutableBuffer<K, V, W>* buffer, const gsl_rng* rng) {
+    void append_mem_table(MutableBuffer<R>* buffer, const gsl_rng* rng) {
         assert(m_shard_cnt < m_structure->m_cap);
         m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng);
-        m_structure->m_shards[m_shard_cnt] = new WIRS<K, V, W>(buffer, m_structure->m_bfs[m_shard_cnt], m_tagging);
+        m_structure->m_shards[m_shard_cnt] = new WIRS<R>(buffer, m_structure->m_bfs[m_shard_cnt], m_tagging);
         ++m_shard_cnt;
     }
 
     void append_merged_shards(InternalLevel* level, const gsl_rng* rng) {
         assert(m_shard_cnt < m_structure->m_cap);
         m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng);
-        m_structure->m_shards[m_shard_cnt] = new WIRS<K, V, W>(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt], m_tagging);
+        m_structure->m_shards[m_shard_cnt] = new WIRS<R>(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt], m_tagging);
         ++m_shard_cnt;
     }
 
-    WIRS<K, V, W> *get_merged_shard() {
-        WIRS<K, V, W> *shards[m_shard_cnt];
+    WIRS<R> *get_merged_shard() {
+        WIRS<R> *shards[m_shard_cnt];
 
         for (size_t i=0; i<m_shard_cnt; i++) {
             shards[i] = (m_structure->m_shards[i]) ? m_structure->m_shards[i] : nullptr;
         }
 
-        return new WIRS<K, V, W>(shards, m_shard_cnt, nullptr, m_tagging);
+        return new WIRS<R>(shards, m_shard_cnt, nullptr, m_tagging);
     }
 
     // Append the sample range in-order.....
-    void get_shard_weights(std::vector<W>& weights, std::vector<std::pair<ShardID, WIRS<K, V, W> *>> &shards, std::vector<void *>& shard_states, const K& low, const K& high) {
+    void get_shard_weights(std::vector<decltype(R::weight)>& weights, std::vector<std::pair<ShardID, WIRS<R> *>> &shards, std::vector<void *>& shard_states, const K& low, const K& high) {
         for (size_t i=0; i<m_shard_cnt; i++) {
             if (m_structure->m_shards[i]) {
                 auto shard_state = m_structure->m_shards[i]->get_sample_shard_state(low, high);
@@ -116,7 +119,7 @@
                     weights.push_back(shard_state->tot_weight);
                     shard_states.emplace_back(shard_state);
                 } else {
-                    WIRS<K, V, W>::delete_state(shard_state);
+                    WIRS<R>::delete_state(shard_state);
                 }
             }
         }
@@ -130,12 +133,12 @@
         return false;
     }
 
-    bool check_tombstone(size_t shard_stop, const K& key, const V& val) {
+    bool check_tombstone(size_t shard_stop, const R& rec) {
         if (m_shard_cnt == 0) return false;
 
         for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
-            if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(key))
-                && m_structure->m_shards[i]->check_tombstone(key, val))
+            if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(rec.key))
+                && m_structure->m_shards[i]->check_tombstone(rec))
                 return true;
         }
         return false;
@@ -151,11 +154,11 @@
         return false;
     }
 
-    const Record<K, V, W>* get_record_at(size_t shard_no, size_t idx) {
+    const R* get_record_at(size_t shard_no, size_t idx) {
         return m_structure->m_shards[shard_no]->get_record_at(idx);
     }
 
-    WIRS<K, V, W>* get_shard(size_t idx) {
+    WIRS<R>* get_shard(size_t idx) {
         return m_structure->m_shards[idx];
     }
 
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index 42bc9a7..74838b8 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -26,15 +26,15 @@
 
 namespace de {
 
-template <typename K, typename V, typename W=void>
+template <RecordInterface R>
 class MutableBuffer {
 public:
     MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap, const gsl_rng* rng)
     : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
     , m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
-        auto len = capacity * sizeof(Record<K, V, W>);
+        auto len = capacity * sizeof(R);
         size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));
-        m_data = (Record<K, V, W>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
+        m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
         m_tombstone_filter = nullptr;
         if (max_tombstone_cap > 0) {
             assert(rng != nullptr);
@@ -47,68 +47,35 @@
         if (m_tombstone_filter) delete m_tombstone_filter;
     }
 
-    template <typename W_ = W, typename = std::enable_if_t<std::is_same<W_, void>::value>>
-    int append(const K& key, const V& value, bool is_tombstone = false) {
-        static_assert(std::is_same<W_, void>::value);
-        if (is_tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
+    template <typename R_ = R>
+    int append(const R &rec) {
+        if (rec.is_tombstone() && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
 
         int32_t pos = 0;
         if ((pos = try_advance_tail()) == -1) return 0;
 
-        m_data[pos].key = key;
-        m_data[pos].value = value;
-        m_data[pos].header = ((pos << 2) | (is_tombstone ? 1 : 0));
+        m_data[pos] = rec;
+        m_data[pos].header |= (pos << 2);
 
-        if (is_tombstone) {
+        if (rec.is_tombstone()) {
             m_tombstonecnt.fetch_add(1);
-            if (m_tombstone_filter) m_tombstone_filter->insert(key);
+            if (m_tombstone_filter) m_tombstone_filter->insert(rec.key);
         }
 
-        m_weight.fetch_add(1);
-        return 1;
-    }
-
-    template <typename W_ = W, typename = std::enable_if_t<!std::is_same<W_, void>::value>>
-    int append(const K& key, const V& value, W_ weight=1, bool is_tombstone = false) {
-        static_assert(!std::is_same<W_, void>::value);
-        if (is_tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
-
-        int32_t pos = 0;
-        if ((pos = try_advance_tail()) == -1) return 0;
-
-        if (is_tombstone) {
-            weight = 0;
-        }
-
-        m_data[pos].key = key;
-        m_data[pos].value = value;
-        m_data[pos].header = ((pos << 2) | (is_tombstone ? 1 : 0));
-        m_data[pos].weight = weight;
-
-        if (is_tombstone) {
-            m_tombstonecnt.fetch_add(1);
-            if (m_tombstone_filter) m_tombstone_filter->insert(key);
-        }
-
-        double old_val, new_val;
-        do {
-            old_val = m_weight.load();
-            new_val = old_val + weight;
-        } while (!m_weight.compare_exchange_strong(old_val, new_val));
-
-
-        double old = m_max_weight.load();
-        while (old < weight) {
-            m_max_weight.compare_exchange_strong(old, weight);
-            old = m_max_weight.load();
+        if constexpr (WeightedRecordInterface<R_>) {
+            m_weight.fetch_add(rec.weight);
+            double old = m_max_weight.load();
+            while (old < rec.weight) {
+                m_max_weight.compare_exchange_strong(old, rec.weight);
+                old = m_max_weight.load();
+            }
+        } else {
+            m_weight.fetch_add(1);
         }
 
         return 1;
     }
 
     bool truncate() {
         m_tombstonecnt.store(0);
         m_reccnt.store(0);
@@ -119,10 +86,10 @@
         return true;
     }
 
-    Record<K, V, W>* sorted_output() {
+    R* sorted_output() {
         TIMER_INIT();
         TIMER_START();
-        std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp<K, V, W>);
+        std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp<R>);
         TIMER_STOP();
 
         #ifdef INSTRUMENT_MERGING
@@ -147,11 +114,11 @@
         return m_tombstonecnt.load();
     }
 
-    bool delete_record(const K& key, const V& val) {
+    bool delete_record(const R& rec) {
         auto offset = 0;
         while (offset < m_reccnt.load()) {
-            if (m_data[offset].match(key, val, false)) {
-                m_data[offset].set_delete_status();
+            if (m_data[offset] == rec) {
+                m_data[offset].set_delete();
                 return true;
             }
             offset++;
@@ -160,23 +127,25 @@
         return false;
     }
 
-    bool check_tombstone(const K& key, const V& value) {
-        if (m_tombstone_filter && !m_tombstone_filter->lookup(key)) return false;
+    bool check_tombstone(const R& rec) {
+        if (m_tombstone_filter && !m_tombstone_filter->lookup(rec.key)) return false;
 
         auto offset = 0;
         while (offset < m_reccnt.load()) {
-            if (m_data[offset].match(key, value, true)) return true;
+            if (m_data[offset] == rec && m_data[offset].is_tombstone()) {
+                return true;
+            }
             offset++;
         }
         return false;
     }
 
-    const Record<K, V, W>* get_record_at(size_t idx) {
+    const R* get_record_at(size_t idx) {
         return m_data + idx;
     }
 
     size_t get_memory_utilization() {
-        return m_cap * sizeof(Record<K, V, W>);
+        return m_cap * sizeof(R);
     }
 
     size_t get_aux_memory_utilization() {
 
     //
     // NOTE: This operation samples from records strictly between the upper and
     // lower bounds, not including them
-    double get_sample_range(const K& lower, const K& upper, std::vector<Record<K, V, W> *> &records,
-                            Alias **alias, size_t *cutoff) {
+    template <typename R_ = R>
+    double get_sample_range(const decltype(R_::key) lower, const decltype(R_::key)& upper,
+                            std::vector<R *> &records, Alias **alias, size_t *cutoff) {
         std::vector<double> weights;
         *cutoff = std::atomic_load(&m_reccnt) - 1;
         records.clear();
         double tot_weight = 0.0;
 
         for (size_t i = 0; i < (*cutoff) + 1; i++) {
-            Record<K, V, W> *rec = m_data + i;
+            R *rec = m_data + i;
 
-            if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->get_delete_status()) {
+            if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->is_deleted()) {
                 weights.push_back(rec->weight);
                 records.push_back(rec);
                 tot_weight += rec->weight;
@@ -212,7 +182,8 @@
     }
 
     // rejection sampling
-    const Record<K, V, W> *get_sample(const K& lower, const K& upper, gsl_rng *rng) {
+    template <typename R_ = R>
+    const R *get_sample(const decltype(R_::key)& lower, const decltype(R_::key)& upper, gsl_rng *rng) {
         size_t reccnt = m_reccnt.load();
         if (reccnt == 0) {
             return nullptr;
@@ -230,7 +201,7 @@
 
         if (test <= rec->weight &&
             rec->key >= lower && rec->key <= upper &&
-            !rec->is_tombstone() && !rec->get_delete_status()) {
+            !rec->is_tombstone() && !rec->is_deleted()) {
             return rec;
         }
 
@@ -259,7 +230,7 @@
     size_t m_tombstone_cap;
 
     //char* m_data;
-    Record<K, V, W>* m_data;
+    R* m_data;
     BloomFilter* m_tombstone_filter;
 
     alignas(64) std::atomic<size_t> m_tombstonecnt;
diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h
index 6d97f95..dd2fd85 100644
--- a/include/shard/MemISAM.h
+++ b/include/shard/MemISAM.h
@@ -23,10 +23,11 @@
 
 namespace de {
 
 thread_local size_t mrun_cancelations = 0;
 
-template <typename K, typename V>
+template <RecordInterface R>
 class MemISAM {
 private:
-typedef Record<K, V> Rec;
+typedef decltype(R::key) K;
+typedef decltype(R::value) V;
 
 constexpr static size_t inmem_isam_node_size = 256;
 constexpr static size_t inmem_isam_fanout = inmem_isam_node_size / (sizeof(K) + sizeof(char*));
@@ -36,24 +37,23 @@
 struct InMemISAMNode {
     K keys[inmem_isam_fanout];
     char* child[inmem_isam_fanout];
 };
 
-constexpr static size_t inmem_isam_leaf_fanout = inmem_isam_node_size / sizeof(Rec);
+constexpr static size_t inmem_isam_leaf_fanout = inmem_isam_node_size / sizeof(R);
 constexpr static size_t inmem_isam_node_keyskip = sizeof(K) * inmem_isam_fanout;
 
 static_assert(sizeof(InMemISAMNode) == inmem_isam_node_size, "node size does not match");
 
-
 public:
     MemISAM(std::string data_fname, size_t record_cnt, size_t tombstone_cnt, BloomFilter *bf, bool tagging)
     : m_reccnt(record_cnt), m_tombstone_cnt(tombstone_cnt), m_deleted_cnt(0), m_tagging(tagging) {
         // read the stored data file
-        size_t alloc_size = (record_cnt * sizeof(Rec)) + (CACHELINE_SIZE - (record_cnt * sizeof(Rec)) % CACHELINE_SIZE);
+        size_t alloc_size = (record_cnt * sizeof(R)) + (CACHELINE_SIZE - (record_cnt * sizeof(R)) % CACHELINE_SIZE);
         assert(alloc_size % CACHELINE_SIZE == 0);
-        m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+        m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, alloc_size);
 
         FILE *file = fopen(data_fname.c_str(), "rb");
         assert(file);
 
-        auto res = fread(m_data, sizeof(Rec), m_reccnt, file);
+        auto res = fread(m_data, sizeof(R), m_reccnt, file);
         assert (res == m_reccnt);
         fclose(file);
 
@@ -71,34 +71,34 @@
         }
     }
 
-    MemISAM(MutableBuffer<K, V>* buffer, BloomFilter* bf, bool tagging)
+    MemISAM(MutableBuffer<R>* buffer, BloomFilter* bf, bool tagging)
     :m_reccnt(0), m_tombstone_cnt(0), m_isam_nodes(nullptr), m_deleted_cnt(0), m_tagging(tagging) {
-        size_t alloc_size = (buffer->get_record_count() * sizeof(Rec)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Rec)) % CACHELINE_SIZE);
+        size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE);
         assert(alloc_size % CACHELINE_SIZE == 0);
-        m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+        m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
 
         TIMER_INIT();
 
        size_t offset = 0;
         m_reccnt = 0;
 
         TIMER_START();
-        Rec* base = buffer->sorted_output();
+        R* base = buffer->sorted_output();
         TIMER_STOP();
         auto sort_time = TIMER_RESULT();
-        Rec* stop = base + buffer->get_record_count();
+        R* stop = base + buffer->get_record_count();
 
         TIMER_START();
         while (base < stop) {
             if (!m_tagging) {
                 if (!base->is_tombstone() && (base + 1 < stop)
-                    && base->match(base + 1) && (base + 1)->is_tombstone()) {
+                    && *base == *(base + 1) && (base + 1)->is_tombstone()) {
                     base += 2;
                     mrun_cancelations++;
                     continue;
                 }
-            } else if (base->get_delete_status()) {
+            } else if (base->is_deleted()) {
                 base += 1;
                 continue;
             }
@@ -123,15 +123,15 @@
         TIMER_STOP();
         auto level_time = TIMER_RESULT();
 
-        fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time);
+        //fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time);
     }
 
     MemISAM(MemISAM** runs, size_t len, BloomFilter* bf, bool tagging)
     :m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_isam_nodes(nullptr), m_tagging(tagging) {
-        std::vector<Cursor<K, V>> cursors;
+        std::vector<Cursor<R>> cursors;
         cursors.reserve(len);
 
-        PriorityQueue<K, V> pq(len);
+        PriorityQueue<R> pq(len);
 
         size_t attemp_reccnt = 0;
 
@@ -142,21 +142,21 @@
                 attemp_reccnt += runs[i]->get_record_count();
                 pq.push(cursors[i].ptr, i);
             } else {
-                cursors.emplace_back(Cursor<K, V>{nullptr, nullptr, 0, 0});
+                cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0});
             }
         }
 
-        size_t alloc_size = (attemp_reccnt * sizeof(Rec)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Rec)) % CACHELINE_SIZE);
+        size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE);
         assert(alloc_size % CACHELINE_SIZE == 0);
-        m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+        m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
 
         size_t offset = 0;
 
         while (pq.size()) {
             auto now = pq.peek();
-            auto next = pq.size() > 1 ? pq.peek(1) : queue_record<K, V>{nullptr, 0};
+            auto next = pq.size() > 1 ? pq.peek(1) : queue_record<R>{nullptr, 0};
             if (!m_tagging && !now.data->is_tombstone() && next.data != nullptr &&
-                now.data->match(next.data) && next.data->is_tombstone()) {
+                *now.data == *next.data && next.data->is_tombstone()) {
                 pq.pop(); pq.pop();
                 auto& cursor1 = cursors[now.version];
                 auto& cursor2 = cursors[next.version];
                 if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
                 if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
             } else {
                 auto& cursor = cursors[now.version];
-                if (!m_tagging || !cursor.ptr->get_delete_status()) {
+                if (!m_tagging || !cursor.ptr->is_deleted()) {
                     m_data[m_reccnt++] = *cursor.ptr;
                     if (cursor.ptr->is_tombstone()) {
                         ++m_tombstone_cnt;
@@ -188,7 +188,7 @@
         if (m_isam_nodes) free(m_isam_nodes);
     }
 
-    Rec* sorted_output() const {
+    R* sorted_output() const {
         return m_data;
     }
 
@@ -208,7 +208,7 @@
 
-        while (idx < m_reccnt && m_data[idx].lt(key, val)) ++idx;
+        while (idx < m_reccnt && m_data[idx] < R {key, val}) ++idx;
 
-        if (m_data[idx].match(key, val, false)) {
-            m_data[idx].set_delete_status();
+        if (m_data[idx] == R {key, val}) {
+            m_data[idx].set_delete();
             m_deleted_cnt++;
             return true;
         }
@@ -217,7 +217,7 @@
         return false;
     }
 
-    const Rec* get_record_at(size_t idx) const {
+    const R* get_record_at(size_t idx) const {
         return (idx < m_reccnt) ? m_data + idx : nullptr;
     }
 
@@ -235,7 +235,7 @@
             now = next ? next : reinterpret_cast<InMemISAMNode*>(now->child[inmem_isam_fanout - 1]);
         }
 
-        const Rec* pos = reinterpret_cast<const Rec*>(now);
+        const R* pos = reinterpret_cast<const R*>(now);
         while (pos < m_data + m_reccnt && pos->key < key) pos++;
 
         return pos - m_data;
@@ -255,7 +255,7 @@
             now = next ? next : reinterpret_cast<InMemISAMNode*>(now->child[inmem_isam_fanout - 1]);
         }
 
-        const Rec* pos = reinterpret_cast<const Rec*>(now);
+        const R* pos = reinterpret_cast<const R*>(now);
         while (pos < m_data + m_reccnt && pos->key <= key) pos++;
 
         return pos - m_data;
@@ -267,20 +267,20 @@
             return false;
         }
 
-        Rec* ptr = m_data + idx;
+        R* ptr = m_data + idx;
 
-        while (ptr < m_data + m_reccnt && ptr->lt(key, val)) ptr++;
-        return ptr->match(key, val, true);
+        while (ptr < m_data + m_reccnt && *ptr < R {key, val}) ptr++;
+        return *ptr == R {key, val} && ptr->is_tombstone();
     }
 
     size_t get_memory_utilization() {
-        return m_reccnt * sizeof(Rec) + m_internal_node_cnt * inmem_isam_node_size;
+        return m_reccnt * sizeof(R) + m_internal_node_cnt * inmem_isam_node_size;
     }
 
     void persist_to_file(std::string data_fname) {
         FILE *file = fopen(data_fname.c_str(), "wb");
         assert(file);
-        fwrite(m_data, sizeof(Rec), m_reccnt, file);
+        fwrite(m_data, sizeof(R), m_reccnt, file);
         fclose(file);
     }
 
@@ -303,14 +303,14 @@
 
         InMemISAMNode* current_node = m_isam_nodes;
 
-        const Rec* leaf_base = m_data;
-        const Rec* leaf_stop = m_data + m_reccnt;
+        const R* leaf_base = m_data;
+        const R* leaf_stop = m_data + m_reccnt;
         while (leaf_base < leaf_stop) {
             size_t fanout = 0;
             for (size_t i = 0; i < inmem_isam_fanout; ++i) {
                 auto rec_ptr = leaf_base + inmem_isam_leaf_fanout * i;
                 if (rec_ptr >= leaf_stop) break;
-                const Rec* sep_key = std::min(rec_ptr + inmem_isam_leaf_fanout - 1, leaf_stop - 1);
+                const R* sep_key = std::min(rec_ptr + inmem_isam_leaf_fanout - 1, leaf_stop - 1);
                 current_node->keys[i] = sep_key->key;
                 current_node->child[i] = (char*)rec_ptr;
                 ++fanout;
@@ -350,7 +350,7 @@
     }
 
     // Members: sorted data, internal ISAM levels, reccnt;
-    Rec* m_data;
+    R* m_data;
     InMemISAMNode* m_isam_nodes;
     InMemISAMNode* m_root;
     size_t m_reccnt;
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 39337bf..41766b9 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -12,6 +12,7 @@
 #include <vector>
 #include <cassert>
 #include <queue>
+#include <concepts>
 
 #include "ds/PriorityQueue.h"
 #include "util/Cursor.h"
@@ -24,19 +25,26 @@
 
 namespace de {
 
 thread_local size_t wirs_cancelations = 0;
 
-template <typename K, typename V, typename W>
+template <WeightedRecordInterface R>
 class WIRS {
 private:
+
+    typedef decltype(R::key) K;
+    typedef decltype(R::value) V;
+    typedef decltype(R::weight) W;
+
+    template <typename T>
     struct wirs_node {
-        struct wirs_node *left, *right;
+        struct wirs_node<T> *left, *right;
         K low, high;
         W weight;
         Alias* alias;
     };
 
+    template <typename T>
     struct WIRSState {
         W tot_weight;
-        std::vector<wirs_node*> nodes;
+        std::vector<wirs_node<T>*> nodes;
         Alias* top_level_alias;
 
         ~WIRSState() {
@@ -45,13 +53,13 @@
     };
 
 public:
-    WIRS(MutableBuffer<K, V, W>* buffer, BloomFilter* bf, bool tagging)
+    WIRS(MutableBuffer<R>* buffer, BloomFilter* bf, bool tagging)
     : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0),
       m_ts_check_cnt(0), m_tagging(tagging), m_root(nullptr) {
-        size_t alloc_size = (buffer->get_record_count() * sizeof(Record<K, V, W>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Record<K, V, W>)) % CACHELINE_SIZE);
+        size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE);
         assert(alloc_size % CACHELINE_SIZE == 0);
-        m_data = (Record<K, V, W>*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+        m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
 
         size_t offset = 0;
         m_reccnt = 0;
@@ -61,13 +69,13 @@
         while (base < stop) {
             if (!m_tagging) {
                 if (!(base->is_tombstone()) && (base + 1) < stop) {
-                    if (base->match(base + 1) && (base + 1)->is_tombstone()) {
+                    if (*base == *(base + 1) && (base + 1)->is_tombstone()) {
                         base += 2;
                         wirs_cancelations++;
                        continue;
                    }
                }
-            } else if (base->get_delete_status()) {
+            } else if (base->is_deleted()) {
                 base += 1;
                 continue;
             }
@@ -92,10 +100,10 @@
 
     WIRS(WIRS** shards, size_t len, BloomFilter* bf, bool tagging)
     : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0),
       m_ts_check_cnt(0), m_tagging(tagging), m_root(nullptr) {
-        std::vector<Cursor<K, V, W>> cursors;
+        std::vector<Cursor<R>> cursors;
         cursors.reserve(len);
 
-        PriorityQueue<K, V, W> pq(len);
+        PriorityQueue<R> pq(len);
 
         size_t attemp_reccnt = 0;
 
@@ -106,28 +114,28 @@
                 attemp_reccnt += shards[i]->get_record_count();
                 pq.push(cursors[i].ptr, i);
             } else {
-                cursors.emplace_back(Cursor<K, V, W>{nullptr, nullptr, 0, 0});
+                cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0});
             }
         }
 
-        size_t alloc_size = (attemp_reccnt * sizeof(Record<K, V, W>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Record<K, V, W>)) % CACHELINE_SIZE);
+        size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE);
         assert(alloc_size % CACHELINE_SIZE == 0);
-        m_data = (Record<K, V, W>*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+        m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
 
         while (pq.size()) {
             auto now = pq.peek();
-            auto next = pq.size() > 1 ? pq.peek(1) : queue_record<K, V, W>{nullptr, 0};
+            auto next = pq.size() > 1 ? pq.peek(1) : queue_record<R>{nullptr, 0};
             if (!m_tagging && !now.data->is_tombstone() && next.data != nullptr &&
-                now.data->match(next.data) && next.data->is_tombstone()) {
+                *now.data == *next.data && next.data->is_tombstone()) {
                 pq.pop(); pq.pop();
                 auto& cursor1 = cursors[now.version];
                 auto& cursor2 = cursors[next.version];
-                if (advance_cursor<K, V, W>(cursor1)) pq.push(cursor1.ptr, now.version);
-                if (advance_cursor<K, V, W>(cursor2)) pq.push(cursor2.ptr, next.version);
+                if (advance_cursor<R>(cursor1)) pq.push(cursor1.ptr, now.version);
+                if (advance_cursor<R>(cursor2)) pq.push(cursor2.ptr, next.version);
             } else {
                 auto& cursor = cursors[now.version];
-                if (!m_tagging || !cursor.ptr->get_delete_status()) {
+                if (!m_tagging || !cursor.ptr->is_deleted()) {
                     m_data[m_reccnt++] = *cursor.ptr;
                     m_total_weight += cursor.ptr->weight;
                     if (bf && cursor.ptr->is_tombstone()) {
@@ -137,7 +145,7 @@
                 }
 
                 pq.pop();
-                if (advance_cursor<K, V, W>(cursor)) pq.push(cursor.ptr, now.version);
+                if (advance_cursor<R>(cursor)) pq.push(cursor.ptr, now.version);
             }
         }
 
@@ -155,16 +163,16 @@
         free_tree(m_root);
     }
 
-    bool delete_record(const K& key, const V& val) {
-        size_t idx = get_lower_bound(key);
+    bool delete_record(const R& rec) {
+        size_t idx = get_lower_bound(rec.key);
         if (idx >= m_reccnt) {
             return false;
         }
 
-        while (idx < m_reccnt && m_data[idx].lt(key, val)) ++idx;
+        while (idx < m_reccnt && m_data[idx] < rec) ++idx;
 
-        if (m_data[idx].match(key, val, false)) {
-            m_data[idx].set_delete_status();
+        if (m_data[idx] == R {rec.key, rec.value} && !m_data[idx].is_tombstone()) {
+            m_data[idx].set_delete();
             m_deleted_cnt++;
             return true;
         }
 
         return false;
     }
 
@@ -172,7 +180,7 @@
-    void free_tree(struct wirs_node* node) {
+    void free_tree(struct wirs_node<R>* node) {
         if (node) {
             delete node->alias;
             free_tree(node->left);
             free_tree(node->right);
             delete node;
         }
     }
 
-    Record<K, V, W>* sorted_output() const {
+    R* sorted_output() const {
         return m_data;
     }
 
@@ -193,19 +201,19 @@
         return m_tombstone_cnt;
     }
 
-    const Record<K, V, W>* get_record_at(size_t idx) const {
+    const R* get_record_at(size_t idx) const {
         if (idx >= m_reccnt) return nullptr;
         return m_data + idx;
     }
 
     // low - high -> decompose to a set of nodes.
     // Build Alias across the decomposed nodes.
-    WIRSState* get_sample_shard_state(const K& lower_key, const K& upper_key) {
-        WIRSState* res = new WIRSState();
+    WIRSState<R>* get_sample_shard_state(const K& lower_key, const K& upper_key) {
+        auto res = new WIRSState<R>();
 
         // Simulate a stack to unfold recursion.
         double tot_weight = 0.0;
-        struct wirs_node* st[64] = {0};
+        struct wirs_node<R>* st[64] = {0};
         st[0] = m_root;
         size_t top = 1;
         while(top > 0) {
@@ -231,15 +239,15 @@
     }
 
     static void delete_state(void *state) {
-        auto s = (WIRSState *) state;
+        WIRSState<R> *s = (WIRSState<R> *) state;
         delete s;
     }
 
     // returns the number of records sampled
     // NOTE: This operation returns records strictly between the lower and upper bounds, not
     // including them.
-    size_t get_samples(void* shard_state, std::vector<Record<K, V, W>> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
-        WIRSState *state = (WIRSState *) shard_state;
+    size_t get_samples(void* shard_state, std::vector<R> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
+        WIRSState<R> *state = (WIRSState<R> *) shard_state;
         if (sample_sz == 0) {
             return 0;
         }
@@ -295,30 +303,30 @@
 
         auto ptr = m_data + get_lower_bound(key);
 
-        while (ptr < m_data + m_reccnt && ptr->lt(key, val)) {
+        while (ptr < m_data + m_reccnt && *ptr < R {key, val}) {
             ptr ++;
         }
 
-        bool result = (m_tagging) ? ptr->get_delete_status()
-                                  : ptr->match(key, val, true);
+        bool result = (m_tagging) ? ptr->is_deleted()
+                                  : *ptr == R {key, val} && ptr->is_tombstone();
         m_rejection_cnt += result;
 
         return result;
     }
 
-    bool check_tombstone(const K& key, const V& val) {
+    bool check_tombstone(const R& rec) {
         m_ts_check_cnt++;
-        size_t idx = get_lower_bound(key);
+        size_t idx = get_lower_bound(rec.key);
         if (idx >= m_reccnt) {
             return false;
         }
 
-        auto ptr = m_data + get_lower_bound(key);
+        auto ptr = m_data + get_lower_bound(rec.key);
 
-        while (ptr < m_data + m_reccnt && ptr->lt(key, val)) {
+        while (ptr < m_data + m_reccnt && *ptr < rec) {
             ptr ++;
         }
 
-        bool result = ptr->match(key, val, true);
+        bool result = *ptr == rec && ptr->is_tombstone();
         m_rejection_cnt += result;
 
         return result;
@@ -340,21 +348,21 @@
 
 private:
 
-    bool covered_by(struct wirs_node* node, const K& lower_key, const K& upper_key) {
+    bool covered_by(struct wirs_node<R>* node, const K& lower_key, const K& upper_key) {
         auto low_index = node->low * m_group_size;
         auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1);
         return lower_key < m_data[low_index].key && m_data[high_index].key < upper_key;
     }
 
-    bool intersects(struct wirs_node* node, const K& lower_key, const K& upper_key) {
+    bool intersects(struct wirs_node<R>* node, const K& lower_key, const K& upper_key) {
         auto low_index = node->low * m_group_size;
         auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1);
         return lower_key < m_data[high_index].key && m_data[low_index].key < upper_key;
     }
 
-    struct wirs_node* construct_wirs_node(const std::vector<W>& weights, size_t low, size_t high) {
+    struct wirs_node<R>* construct_wirs_node(const std::vector<W>& weights, size_t low, size_t high) {
         if (low == high) {
-            return new wirs_node{nullptr, nullptr, low, high, weights[low], new Alias({1.0})};
+            return new wirs_node<R>{nullptr, nullptr, low, high, weights[low], new Alias({1.0})};
         } else if (low > high) return nullptr;
 
         std::vector<double> node_weights;
@@ -370,9 +378,9 @@
 
         size_t mid = (low + high) / 2;
 
-        return new wirs_node{construct_wirs_node(weights, low, mid),
-                             construct_wirs_node(weights, mid + 1, high),
-                             low, high, sum, new Alias(node_weights)};
+        return new wirs_node<R>{construct_wirs_node(weights, low, mid),
+                                construct_wirs_node(weights, mid + 1, high),
+                                low, high, sum, new Alias(node_weights)};
     }
 
@@ -410,9 +418,9 @@
         m_root = construct_wirs_node(weights, 0, n_groups-1);
     }
 
-    Record<K, V, W>* m_data;
+    R* m_data;
     std::vector<Alias *> m_alias;
-    wirs_node* m_root;
+    wirs_node<R>* m_root;
     bool m_tagging;
     W m_total_weight;
     size_t m_reccnt;
diff --git a/include/util/Cursor.h b/include/util/Cursor.h
index 2339800..2609ae5 100644
--- a/include/util/Cursor.h
+++ b/include/util/Cursor.h
@@ -14,10 +14,10 @@
 #include "io/PagedFile.h"
 
 namespace de {
-template <typename K, typename V, typename W=void>
+template <RecordInterface R>
 struct Cursor {
-    Record<K, V, W> *ptr;
-    const Record<K, V, W> *end;
+    R *ptr;
+    R *end;
     size_t cur_rec_idx;
     size_t rec_cnt;
 
@@ -36,8 +36,8 @@
  * be updated to be equal to end, and false will be returned. Iterators will
 * not be closed.
 */
-template <typename K, typename V, typename W=void>
-inline static bool advance_cursor(Cursor<K, V, W> &cur, PagedFileIterator *iter = nullptr) {
+template <RecordInterface R>
+inline static bool advance_cursor(Cursor<R> &cur, PagedFileIterator *iter = nullptr) {
     cur.ptr++;
     cur.cur_rec_idx++;
 
@@ -45,8 +45,8 @@
     if (cur.ptr >= cur.end) {
         if (iter && iter->next()) {
-            cur.ptr = (Record<K, V, W>*)iter->get_item();
-            cur.end = cur.ptr + (PAGE_SIZE / sizeof(Record<K, V, W>));
+            cur.ptr = (R*)iter->get_item();
+            cur.end = cur.ptr + (PAGE_SIZE / sizeof(R));
             return true;
         }
 
@@ -62,14 +62,14 @@
 * This allows for "peeking" at the next largest element after the current
 * largest is processed.
 */
-template <typename K, typename V, typename W=void>
-inline static Cursor<K, V, W> *get_next(std::vector<Cursor<K, V, W>> &cursors, Cursor<K, V, W> *current=nullptr) {
-    const Record<K, V, W> *min_rec = nullptr;
-    Cursor<K, V, W> *result = nullptr;
+template <RecordInterface R>
+inline static Cursor<R> *get_next(std::vector<Cursor<R>> &cursors, Cursor<R> *current=nullptr) {
+    const R *min_rec = nullptr;
+    Cursor<R> *result = nullptr;
     for (size_t i=0; i< cursors.size(); i++) {
-        if (cursors[i] == (Cursor<K, V, W>) {0} ) continue;
+        if (cursors[i] == (Cursor<R>) {0} ) continue;
 
-        const Record<K, V, W> *rec = (&cursors[i] == current) ? cursors[i].ptr + 1 : cursors[i].ptr;
+        const R *rec = (&cursors[i] == current) ? cursors[i].ptr + 1 : cursors[i].ptr;
         if (rec >= cursors[i].end) continue;
 
         if (min_rec == nullptr) {
diff --git a/include/util/Record.h b/include/util/Record.h
index 687e745..ce101f4 100644
--- a/include/util/Record.h
+++ b/include/util/Record.h
@@ -10,50 +10,110 @@
 #pragma once
 
 #include <cstring>
+#include <concepts>
 
 #include "util/base.h"
 
 namespace de {
 
-template <typename K, typename V, typename W=void>
+template <typename R>
+concept RecordInterface = requires(R r, R s) {
+    r.key;
+    r.value;
+    r.header;
+
+    {r.is_tombstone()} -> std::convertible_to<bool>;
+    {r.is_deleted()} -> std::convertible_to<bool>;
+    r.set_delete();
+    r.set_tombstone(std::declval<bool>());
+    { r < s } -> std::convertible_to<bool>;
+    { r == s } -> std::convertible_to<bool>;
+    { r.header < s.header } -> std::convertible_to<bool>;
+};
+
+template <typename R>
+concept WeightedRecordInterface = RecordInterface<R> && requires(R r) {
+    {r.weight} -> std::convertible_to<double>;
+};
+
+template <typename K, typename V>
 struct Record {
     K key;
     V value;
-    typename std::conditional<!std::is_same<W, void>::value, W, std::false_type>::type weight;
-    uint32_t header;
+    uint32_t header = 0;
+
+    inline void set_delete() {
+        header |= 2;
+    }
+
+    inline bool is_deleted() const {
+        return header & 2;
+    }
+
+    inline void set_tombstone(bool val=true) {
+        if (val) {
+            header |= val;
+        } else {
+            header &= ~((uint32_t) 1);
+        }
+    }
+
+    inline bool is_tombstone() const {
+        return header & 1;
+    }
+
+    inline bool operator<(const Record& other) const {
+        return key < other.key || (key == other.key && value < other.value);
+    }
 
-    inline bool match(K k, V v, bool is_tombstone) const {
-        return (key == k) && (value == v) && ((header & 1) == is_tombstone);
+    inline bool operator==(const Record& other) const {
+        return key == other.key && value == other.value;
     }
+};
+
+template <typename K, typename V, typename W>
+struct WeightedRecord {
+    K key;
+    V value;
+    W weight = 1;
+    uint32_t header = 0;
 
-    inline void set_delete_status() {
+    inline void set_delete() {
         header |= 2;
     }
 
-    inline bool get_delete_status() const {
+    inline bool is_deleted() const {
         return header & 2;
     }
 
+    inline void set_tombstone(bool val=true) {
+        if (val) {
+            header |= val;
+        } else {
+            header &= ~((uint32_t) 1);
+        }
+    }
+
     inline bool is_tombstone() const {
         return header & 1;
     }
 
-    inline int match(const Record* other) const {
+    inline int match(const WeightedRecord* other) const {
         return key == other->key && value == other->value;
     }
 
-    inline bool operator<(const Record& other) const {
+    inline bool operator<(const WeightedRecord& other) const {
         return key < other.key || (key == other.key && value < other.value);
     }
 
-    inline bool lt(const K& k, const V& v) const {
-        return key < k || (key == k && value < v);
+    inline bool operator==(const WeightedRecord& other) const {
+        return key == other.key && value == other.value;
     }
 };
 
-template <typename K, typename V, typename W=void>
-static bool memtable_record_cmp(const Record<K, V, W>& a, const Record<K, V, W>& b) {
+template <RecordInterface R>
+static bool memtable_record_cmp(const R& a, const R& b) {
     return (a.key < b.key) || (a.key == b.key && a.value < b.value) || (a.key == b.key && a.value == b.value && a.header < b.header);
 }
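
A usage sketch (illustrative only, not part of the commit): after this change
the framework is parameterized over any record type satisfying
RecordInterface, rather than over the fixed Record<K, V, W> layout. The
GeoRecord type below is hypothetical; it carries extra payload fields that
the framework simply ignores, and the static_asserts verify concept
conformance at compile time.

#include <cstdint>
#include "util/Record.h"

// Hypothetical user-defined record type (not part of this commit). Only key,
// value, header, and the required methods/operators matter for the concept;
// the lat/lon payload is invisible to the framework.
struct GeoRecord {
    uint64_t key;
    uint32_t value;
    double lat;
    double lon;
    uint32_t header = 0;

    inline void set_delete() { header |= 2; }
    inline bool is_deleted() const { return header & 2; }
    inline void set_tombstone(bool val=true) {
        if (val) header |= 1;
        else header &= ~((uint32_t) 1);
    }
    inline bool is_tombstone() const { return header & 1; }

    inline bool operator<(const GeoRecord &other) const {
        return key < other.key || (key == other.key && value < other.value);
    }
    inline bool operator==(const GeoRecord &other) const {
        return key == other.key && value == other.value;
    }
};

// Compile-time checks: the custom type and the stock types all satisfy the
// concepts introduced in util/Record.h.
static_assert(de::RecordInterface<GeoRecord>);
static_assert(de::RecordInterface<de::Record<uint64_t, uint32_t>>);
static_assert(de::WeightedRecordInterface<de::WeightedRecord<uint64_t, uint32_t, double>>);

With a record shaped like this, MutableBuffer<GeoRecord>::append(rec) reads
the tombstone flag off the record itself via rec.is_tombstone(), rather than
taking separate key/value/weight/tombstone arguments the way the two old
append() overloads did.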