| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-15 16:48:56 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-15 16:48:56 -0400 |
| commit | ff000799c3254f52e0beabbe9c62d10c3fc4178e (patch) | |
| tree | 49a1a045678315e8e215fd80409973679b793043 | |
| parent | 418e9b079e559c86f3a5b276f712ad2f5d66533c (diff) | |
| download | dynamic-extension-ff000799c3254f52e0beabbe9c62d10c3fc4178e.tar.gz | |
Record format generalization
Tombstone counting is currently bugged, but the rest of it appears to be working.
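For orientation before the diff: this commit swaps the old `append(key, val, weight, tombstone, rng)` call and `Record<K, V, W>` template plumbing for whole-record appends over a `RecordInterface` concept. A minimal usage sketch follows, assuming the `WRec` alias and `g_rng` fixture that the updated tests rely on — illustrative only, not code from this commit:

```cpp
#include <cstdint>
#include <gsl/gsl_rng.h>
#include "framework/DynamicExtension.h"
#include "util/Record.h"

// Hypothetical alias mirroring the weighted record type the tests call WRec.
typedef de::WeightedRecord<uint64_t, uint32_t, uint64_t> WRec;

void example(gsl_rng *g_rng) {
    auto ext = new de::DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);

    // Records are now appended whole; the weight and tombstone state travel
    // inside the record instead of as extra arguments.
    WRec rec = {/*key*/ 1, /*value*/ 2, /*weight*/ 8};
    ext->append(rec, g_rng);

    WRec del = rec;
    del.set_tombstone();   // deletes are encoded in the record header
    ext->append(del, g_rng);

    delete ext;
}
```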
| -rw-r--r-- | CMakeLists.txt | 4 |
| -rw-r--r-- | include/ds/PriorityQueue.h | 14 |
| -rw-r--r-- | include/framework/DynamicExtension.h | 79 |
| -rw-r--r-- | include/framework/InternalLevel.h | 39 |
| -rw-r--r-- | include/framework/MutableBuffer.h | 109 |
| -rw-r--r-- | include/shard/MemISAM.h | 72 |
| -rw-r--r-- | include/shard/WIRS.h | 108 |
| -rw-r--r-- | include/util/Cursor.h | 26 |
| -rw-r--r-- | include/util/Record.h | 86 |
| -rw-r--r-- | tests/dynamic_extension_tests.cpp | 79 |
| -rw-r--r-- | tests/internal_level_tests.cpp | 18 |
| -rw-r--r-- | tests/memisam_tests.cpp | 41 |
| -rw-r--r-- | tests/mutable_buffer_tests.cpp | 107 |
| -rw-r--r-- | tests/testing.h | 99 |
| -rw-r--r-- | tests/wirs_tests.cpp | 44 |
15 files changed, 502 insertions, 423 deletions
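The bulk of those insertions is mechanical: every `<K, V, W>` parameter list becomes a single type `R` constrained by the new concepts in `include/util/Record.h`. As a compile-time sketch of the constraint — simplified from the definition in the diff below (the real concept also checks `header`, `set_delete`, and `set_tombstone`):

```cpp
#include <concepts>
#include <cstdint>

// Simplified form of the RecordInterface concept this commit introduces.
template<typename R>
concept RecordInterface = requires(R r, R s) {
    r.key; r.value; r.header;
    { r.is_tombstone() } -> std::convertible_to<bool>;
    { r.is_deleted() }   -> std::convertible_to<bool>;
    { r < s }            -> std::convertible_to<bool>;
    { r == s }           -> std::convertible_to<bool>;
};

// WIRS<R> additionally requires a weight field, per WeightedRecordInterface.
template<typename R>
concept WeightedRecordInterface = RecordInterface<R> && requires(R r) {
    { r.weight } -> std::convertible_to<double>;
};

// A caller-defined record type satisfying the unweighted interface.
struct Rec {
    uint64_t key;
    uint32_t value;
    uint32_t header = 0;
    bool is_tombstone() const { return header & 1; }
    bool is_deleted()   const { return header & 2; }
    bool operator<(const Rec &o) const {
        return key < o.key || (key == o.key && value < o.value);
    }
    bool operator==(const Rec &o) const {
        return key == o.key && value == o.value;
    }
};

static_assert(RecordInterface<Rec>);  // what MutableBuffer<R> et al. now demand
```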
diff --git a/CMakeLists.txt b/CMakeLists.txt index 3f551a3..7be4085 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -48,12 +48,10 @@ if (tests) add_executable(dynamic_extension_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/dynamic_extension_tests.cpp) target_link_libraries(dynamic_extension_tests PUBLIC gsl check subunit pthread) target_include_directories(dynamic_extension_tests PRIVATE include) - + add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp) target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread) target_include_directories(memisam_tests PRIVATE include) - - endif() # Benchmark build instructions diff --git a/include/ds/PriorityQueue.h b/include/ds/PriorityQueue.h index 290d5c8..22582da 100644 --- a/include/ds/PriorityQueue.h +++ b/include/ds/PriorityQueue.h @@ -16,14 +16,14 @@ namespace de { -template <typename K, typename V, typename W=void> +template <typename R> struct queue_record { - const Record<K, V, W> *data; + const R *data; size_t version; }; -template <typename K, typename V, typename W=void> +template <typename R> class PriorityQueue { public: PriorityQueue(size_t size) : data(size), tail(0) {} @@ -54,7 +54,7 @@ public: } } - void push(const Record<K, V, W>* record, size_t version=0) { + void push(const R* record, size_t version=0) { assert(tail != this->data.size()); size_t new_idx = this->tail++; @@ -67,7 +67,7 @@ public: } - queue_record<K, V, W> peek(size_t depth=0) { + queue_record<R> peek(size_t depth=0) { ssize_t idx = 0; size_t cur_depth = 0; @@ -81,7 +81,7 @@ public: } private: - std::vector<queue_record<K, V, W>> data; + std::vector<queue_record<R>> data; size_t tail; /* @@ -124,7 +124,7 @@ private: } inline bool heap_cmp(size_t a, size_t b) { - if (!data[a].data->match(data[b].data)) { + if (data[a].data != data[b].data) { return *(data[a].data) < *(data[b].data); } else if (data[a].version != data[b].version) return data[a].version < data[b].version; diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index c65290a..1d9ee76 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -68,11 +68,12 @@ enum class DeletePolicy { typedef ssize_t level_index; -template <typename K, typename V, typename W=void> +template <RecordInterface R> class DynamicExtension { - typedef WIRS<K,V,W> Shard; - typedef MutableBuffer<K,V,W> MBuffer; - typedef Record<K, V, W> Rec; + typedef WIRS<R> Shard; + typedef decltype(R::key) K; + typedef decltype(R::value) V; + typedef decltype(R::weight) W; public: DynamicExtension(size_t buffer_cap, size_t buffer_delete_cap, size_t scale_factor, @@ -82,8 +83,8 @@ public: m_max_delete_prop(max_delete_prop), m_max_rejection_rate(max_rejection_prop), m_last_level_idx(-1), - m_buffer_1(new MutableBuffer<K,V,W>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)), - m_buffer_2(new MutableBuffer<K,V,W>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)), + m_buffer_1(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)), + m_buffer_2(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)), m_buffer_1_merging(false), m_buffer_2_merging(false) {} ~DynamicExtension() { @@ -113,9 +114,9 @@ public: return buffer->delete_record(key, val); } - int append(const K& key, const V& val, W weight, bool tombstone, gsl_rng *rng) { + int append(R &rec, gsl_rng *rng) { // NOTE: single-threaded implementation only - MutableBuffer<K,V,W> *buffer; + MutableBuffer<R> *buffer; while (!(buffer = get_buffer())) ; @@ 
-123,13 +124,13 @@ public: merge_buffer(rng); } - return buffer->append(key, val, weight, tombstone); + return buffer->append(rec); } - void range_sample(Record<K,V,W> *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) { + void range_sample(R *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) { auto buffer = get_buffer(); Alias *buffer_alias = nullptr; - std::vector<Record<K,V,W> *> buffer_records; + std::vector<R *> buffer_records; size_t buffer_cutoff = 0; W buffer_weight; @@ -185,7 +186,7 @@ public: rejections = 0; while (shard_samples[0] > 0) { - const Record<K,V,W> *rec; + const R *rec; if (LSM_REJ_SAMPLE) { rec = buffer->get_sample(lower_key, upper_key, rng); } else { @@ -193,13 +194,13 @@ public: } if (DELETE_TAGGING) { - if (rec && !rec->get_delete_status()) { + if (rec && !rec->is_deleted()) { sample_set[sample_idx++] = *rec; } else { rejections++; } } else { - if (rec && !buffer->check_tombstone(rec->key, rec->value)) { + if (rec && !buffer->check_tombstone(*rec)) { sample_set[sample_idx++] = *rec; } else { rejections++; @@ -220,14 +221,14 @@ public: } } - std::vector<Rec> results; + std::vector<R> results; for (size_t i=1; i<shard_samples.size(); i++) { results.reserve(shard_samples[i]); shards[i].second->get_samples(states[i], results, lower_key, upper_key, shard_samples[i], rng); for (size_t j=0; j<results.size(); j++) { - if (rejection(&results[j], shards[i].first, lower_key, upper_key, buffer, buffer_cutoff)) { + if (rejection(results[j], shards[i].first, lower_key, upper_key, buffer, buffer_cutoff)) { rejections++; continue; } @@ -252,16 +253,16 @@ public: // should correspond to the shard containing the record in question // // Passing INVALID_SHID indicates that the record exists within the buffer - bool is_deleted(const Record<K,V,W> *record, const ShardID &shid, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) { + bool is_deleted(const R &record, const ShardID &shid, MutableBuffer<R> *buffer, size_t buffer_cutoff) { // If tagging is enabled, we just need to check if the record has the delete tag set if (DELETE_TAGGING) { - return record->get_delete_status(); + return record.is_deleted(); } // Otherwise, we need to look for a tombstone. // check for tombstone in the buffer. This will require accounting for the cutoff eventually. 
- if (buffer->check_tombstone(record->key, record->value)) { + if (buffer->check_tombstone(record)) { return true; } @@ -271,13 +272,13 @@ public: } for (size_t lvl=0; lvl<=shid.level_idx; lvl++) { - if (m_levels[lvl]->check_tombstone(0, record->key, record->value)) { + if (m_levels[lvl]->check_tombstone(0, record)) { return true; } } // check the level containing the shard - return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record->key, record->value); + return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record); } @@ -379,8 +380,8 @@ public: } private: - MutableBuffer<K,V,W> *m_buffer_1; - MutableBuffer<K,V,W> *m_buffer_2; + MutableBuffer<R> *m_buffer_1; + MutableBuffer<R> *m_buffer_2; std::atomic<bool> m_active_buffer; std::atomic<bool> m_buffer_1_merging; std::atomic<bool> m_buffer_2_merging; @@ -389,11 +390,11 @@ private: double m_max_delete_prop; double m_max_rejection_rate; - std::vector<InternalLevel<K,V,W> *> m_levels; + std::vector<InternalLevel<R> *> m_levels; level_index m_last_level_idx; - MutableBuffer<K,V,W> *get_buffer() { + MutableBuffer<R> *get_buffer() { if (m_buffer_1_merging && m_buffer_2_merging) { return nullptr; } @@ -401,11 +402,11 @@ private: return (m_active_buffer) ? m_buffer_2 : m_buffer_1; } - inline bool rejection(const Record<K,V,W> *record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) { - if (record->is_tombstone()) { + inline bool rejection(const R &record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<R> *buffer, size_t buffer_cutoff) { + if (record.is_tombstone()) { tombstone_rejections++; return true; - } else if (record->key < lower_bound || record->key > upper_bound) { + } else if (record.key < lower_bound || record.key > upper_bound) { bounds_rejections++; return true; } else if (is_deleted(record, shid, buffer, buffer_cutoff)) { @@ -416,8 +417,8 @@ private: return false; } - inline bool add_to_sample(const Record<K,V,W> *record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer, - Record<K,V,W> *sample_buffer, size_t &sample_idx, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) { + inline bool add_to_sample(const R &record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer, + R *sample_buffer, size_t &sample_idx, MutableBuffer<R> *buffer, size_t buffer_cutoff) { TIMER_INIT(); TIMER_START(); sampling_attempts++; @@ -445,7 +446,7 @@ private: if (new_idx > 0) { assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); } - m_levels.emplace_back(new InternalLevel<K,V,W>(new_idx, new_shard_cnt, DELETE_TAGGING)); + m_levels.emplace_back(new InternalLevel<R>(new_idx, new_shard_cnt, DELETE_TAGGING)); m_last_level_idx++; return new_idx; @@ -495,7 +496,7 @@ private: * returns -1 if idx==0, and no such level exists, to simplify * the logic of the first merge. 
*/ - inline level_index find_mergable_level(level_index idx, MutableBuffer<K,V,W> *buffer=nullptr) { + inline level_index find_mergable_level(level_index idx, MutableBuffer<R> *buffer=nullptr) { if (idx == 0 && m_levels.size() == 0) return -1; @@ -525,7 +526,7 @@ private: // merging two memory levels if (LSM_LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel<K,V,W>::merge_levels(m_levels[base_level], m_levels[incoming_level], + m_levels[base_level] = InternalLevel<R>::merge_levels(m_levels[base_level], m_levels[incoming_level], DELETE_TAGGING, rng); mark_as_unused(tmp); } else { @@ -533,18 +534,18 @@ private: } mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel<K,V,W>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING); + m_levels[incoming_level] = new InternalLevel<R>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING); } - inline void merge_buffer_into_l0(MutableBuffer<K,V,W> *buffer, gsl_rng *rng) { + inline void merge_buffer_into_l0(MutableBuffer<R> *buffer, gsl_rng *rng) { assert(m_levels[0]); if (LSM_LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0]; - auto temp_level = new InternalLevel<K,V,W>(0, 1, DELETE_TAGGING); + auto temp_level = new InternalLevel<R>(0, 1, DELETE_TAGGING); temp_level->append_mem_table(buffer, rng); - auto new_level = InternalLevel<K,V,W>::merge_levels(old_level, temp_level, DELETE_TAGGING, rng); + auto new_level = InternalLevel<R>::merge_levels(old_level, temp_level, DELETE_TAGGING, rng); m_levels[0] = new_level; delete temp_level; @@ -560,7 +561,7 @@ private: * level may not be able to immediately be deleted, depending upon who * else is using it. */ - inline void mark_as_unused(InternalLevel<K,V,W> *level) { + inline void mark_as_unused(InternalLevel<R> *level) { delete level; } @@ -608,7 +609,7 @@ private: * a pointer to the memory table to use, if desired. Otherwise, there are * no guarantees about which buffer will be accessed if level_index is -1. */ - inline size_t get_level_record_count(level_index idx, MutableBuffer<K,V,W> *buffer=nullptr) { + inline size_t get_level_record_count(level_index idx, MutableBuffer<R> *buffer=nullptr) { assert(idx >= -1); if (idx == -1) { return (buffer) ? 
buffer->get_record_count() : get_buffer()->get_record_count(); diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index 727a382..f0f19da 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -19,16 +19,19 @@ namespace de { -template <typename K, typename V, typename W=void> +template <RecordInterface R> class InternalLevel { static const size_t REJECTION_TRIGGER_THRESHOLD = 1024; + typedef decltype(R::key) K; + typedef decltype(R::value) V; + private: struct InternalLevelStructure { InternalLevelStructure(size_t cap) : m_cap(cap) - , m_shards(new WIRS<K, V, W>*[cap]{nullptr}) + , m_shards(new WIRS<R>*[cap]{nullptr}) , m_bfs(new BloomFilter*[cap]{nullptr}) {} ~InternalLevelStructure() { @@ -42,7 +45,7 @@ private: } size_t m_cap; - WIRS<K, V, W>** m_shards; + WIRS<R>** m_shards; BloomFilter** m_bfs; }; @@ -74,40 +77,40 @@ public: new BloomFilter(BF_FPR, new_level->get_tombstone_count() + base_level->get_tombstone_count(), BF_HASH_FUNCS, rng); - WIRS<K, V, W>* shards[2]; + WIRS<R>* shards[2]; shards[0] = base_level->m_structure->m_shards[0]; shards[1] = new_level->m_structure->m_shards[0]; - res->m_structure->m_shards[0] = new WIRS<K, V, W>(shards, 2, res->m_structure->m_bfs[0], tagging); + res->m_structure->m_shards[0] = new WIRS<R>(shards, 2, res->m_structure->m_bfs[0], tagging); return res; } - void append_mem_table(MutableBuffer<K,V,W>* buffer, const gsl_rng* rng) { + void append_mem_table(MutableBuffer<R>* buffer, const gsl_rng* rng) { assert(m_shard_cnt < m_structure->m_cap); m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng); - m_structure->m_shards[m_shard_cnt] = new WIRS<K, V, W>(buffer, m_structure->m_bfs[m_shard_cnt], m_tagging); + m_structure->m_shards[m_shard_cnt] = new WIRS<R>(buffer, m_structure->m_bfs[m_shard_cnt], m_tagging); ++m_shard_cnt; } void append_merged_shards(InternalLevel* level, const gsl_rng* rng) { assert(m_shard_cnt < m_structure->m_cap); m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng); - m_structure->m_shards[m_shard_cnt] = new WIRS<K, V, W>(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt], m_tagging); + m_structure->m_shards[m_shard_cnt] = new WIRS<R>(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt], m_tagging); ++m_shard_cnt; } - WIRS<K, V, W> *get_merged_shard() { - WIRS<K, V, W> *shards[m_shard_cnt]; + WIRS<R> *get_merged_shard() { + WIRS<R> *shards[m_shard_cnt]; for (size_t i=0; i<m_shard_cnt; i++) { shards[i] = (m_structure->m_shards[i]) ? m_structure->m_shards[i] : nullptr; } - return new WIRS<K, V, W>(shards, m_shard_cnt, nullptr, m_tagging); + return new WIRS<R>(shards, m_shard_cnt, nullptr, m_tagging); } // Append the sample range in-order..... 
- void get_shard_weights(std::vector<W>& weights, std::vector<std::pair<ShardID, WIRS<K, V, W> *>> &shards, std::vector<void*>& shard_states, const K& low, const K& high) { + void get_shard_weights(std::vector<uint64_t>& weights, std::vector<std::pair<ShardID, WIRS<R> *>> &shards, std::vector<void*>& shard_states, const K& low, const K& high) { for (size_t i=0; i<m_shard_cnt; i++) { if (m_structure->m_shards[i]) { auto shard_state = m_structure->m_shards[i]->get_sample_shard_state(low, high); @@ -116,7 +119,7 @@ public: weights.push_back(shard_state->tot_weight); shard_states.emplace_back(shard_state); } else { - WIRS<K, V, W>::delete_state(shard_state); + WIRS<R>::delete_state(shard_state); } } } @@ -130,12 +133,12 @@ public: return false; } - bool check_tombstone(size_t shard_stop, const K& key, const V& val) { + bool check_tombstone(size_t shard_stop, const R& rec) { if (m_shard_cnt == 0) return false; for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) { - if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(key)) - && m_structure->m_shards[i]->check_tombstone(key, val)) + if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(rec.key)) + && m_structure->m_shards[i]->check_tombstone(rec)) return true; } return false; @@ -151,11 +154,11 @@ public: return false; } - const Record<K, V, W>* get_record_at(size_t shard_no, size_t idx) { + const R* get_record_at(size_t shard_no, size_t idx) { return m_structure->m_shards[shard_no]->get_record_at(idx); } - WIRS<K, V, W>* get_shard(size_t idx) { + WIRS<R>* get_shard(size_t idx) { return m_structure->m_shards[idx]; } diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h index 42bc9a7..74838b8 100644 --- a/include/framework/MutableBuffer.h +++ b/include/framework/MutableBuffer.h @@ -26,15 +26,15 @@ namespace de { -template <typename K, typename V, typename W=void> +template <RecordInterface R> class MutableBuffer { public: MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap, const gsl_rng* rng) : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0) , m_tombstonecnt(0), m_weight(0), m_max_weight(0) { - auto len = capacity * sizeof(Record<K, V, W>); + auto len = capacity * sizeof(R); size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE)); - m_data = (Record<K, V, W>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); + m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); m_tombstone_filter = nullptr; if (max_tombstone_cap > 0) { assert(rng != nullptr); @@ -47,68 +47,35 @@ public: if (m_tombstone_filter) delete m_tombstone_filter; } - template <typename W_=W, - typename =std::enable_if_t<std::is_same<W_, void>::value>> - int append(const K& key, const V& value, bool is_tombstone = false) { - static_assert(std::is_same<W_, void>::value); - if (is_tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0; + template <typename R_ = R> + int append(const R &rec) { + if (rec.is_tombstone() && m_tombstonecnt + 1 > m_tombstone_cap) return 0; int32_t pos = 0; if ((pos = try_advance_tail()) == -1) return 0; - m_data[pos].key = key; - m_data[pos].value = value; - m_data[pos].header = ((pos << 2) | (is_tombstone ? 
1 : 0)); + m_data[pos] = rec; + m_data[pos].header |= (pos << 2); - if (is_tombstone) { + if (rec.is_tombstone()) { m_tombstonecnt.fetch_add(1); - if (m_tombstone_filter) m_tombstone_filter->insert(key); + if (m_tombstone_filter) m_tombstone_filter->insert(rec.key); } - m_weight.fetch_add(1); - return 1; - } - - template <typename W_=W, - typename = std::enable_if_t<!std::is_same<W_, void>::value>> - int append(const K& key, const V& value, W_ weight=1, bool is_tombstone = false) { - static_assert(!std::is_same<W_, void>::value); - if (is_tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0; - - int32_t pos = 0; - if ((pos = try_advance_tail()) == -1) return 0; - - if (is_tombstone) { - weight = 0; - } - - m_data[pos].key = key; - m_data[pos].value = value; - m_data[pos].header = ((pos << 2) | (is_tombstone ? 1 : 0)); - m_data[pos].weight = weight; - - if (is_tombstone) { - m_tombstonecnt.fetch_add(1); - if (m_tombstone_filter) m_tombstone_filter->insert(key); - } - - double old_val, new_val; - do { - old_val = m_weight.load(); - new_val = old_val + weight; - } while (!m_weight.compare_exchange_strong(old_val, new_val)); - - - double old = m_max_weight.load(); - while (old < weight) { - m_max_weight.compare_exchange_strong(old, weight); - old = m_max_weight.load(); + if constexpr (WeightedRecordInterface<R_>) { + m_weight.fetch_add(rec.weight); + double old = m_max_weight.load(); + while (old < rec.weight) { + m_max_weight.compare_exchange_strong(old, rec.weight); + old = m_max_weight.load(); + } + } else { + m_weight.fetch_add(1); } return 1; } - bool truncate() { m_tombstonecnt.store(0); m_reccnt.store(0); @@ -119,10 +86,10 @@ public: return true; } - Record<K, V, W>* sorted_output() { + R* sorted_output() { TIMER_INIT(); TIMER_START(); - std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp<K,V,W>); + std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp<R>); TIMER_STOP(); #ifdef INSTRUMENT_MERGING @@ -147,11 +114,11 @@ public: return m_tombstonecnt.load(); } - bool delete_record(const K& key, const V& val) { + bool delete_record(const R& rec) { auto offset = 0; while (offset < m_reccnt.load()) { - if (m_data[offset].match(key, val, false)) { - m_data[offset].set_delete_status(); + if (m_data[offset] == rec) { + m_data[offset].set_delete(); return true; } offset++; @@ -160,23 +127,25 @@ public: return false; } - bool check_tombstone(const K& key, const V& value) { - if (m_tombstone_filter && !m_tombstone_filter->lookup(key)) return false; + bool check_tombstone(const R& rec) { + if (m_tombstone_filter && !m_tombstone_filter->lookup(rec.key)) return false; auto offset = 0; while (offset < m_reccnt.load()) { - if (m_data[offset].match(key, value, true)) return true; + if (m_data[offset] == rec && m_data[offset].is_tombstone()) { + return true; + } offset++;; } return false; } - const Record<K, V, W>* get_record_at(size_t idx) { + const R* get_record_at(size_t idx) { return m_data + idx; } size_t get_memory_utilization() { - return m_cap * sizeof(Record<K, V, W>); + return m_cap * sizeof(R); } size_t get_aux_memory_utilization() { @@ -185,17 +154,18 @@ public: // // NOTE: This operation samples from records strictly between the upper and // lower bounds, not including them - double get_sample_range(const K& lower, const K& upper, std::vector<Record<K, V, W> *> &records, - Alias **alias, size_t *cutoff) { + template <typename R_ = R> + double get_sample_range(const decltype(R_::key) lower, const decltype(R_::key)& upper, + std::vector<R *> &records, Alias **alias, 
size_t *cutoff) { std::vector<double> weights; *cutoff = std::atomic_load(&m_reccnt) - 1; records.clear(); double tot_weight = 0.0; for (size_t i = 0; i < (*cutoff) + 1; i++) { - Record<K, V, W> *rec = m_data + i; + R *rec = m_data + i; - if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->get_delete_status()) { + if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->is_deleted()) { weights.push_back(rec->weight); records.push_back(rec); tot_weight += rec->weight; @@ -212,7 +182,8 @@ public: } // rejection sampling - const Record<K, V, W> *get_sample(const K& lower, const K& upper, gsl_rng *rng) { + template <typename R_ = R> + const R *get_sample(const decltype(R_::key)& lower, const decltype(R_::key)& upper, gsl_rng *rng) { size_t reccnt = m_reccnt.load(); if (reccnt == 0) { return nullptr; @@ -230,7 +201,7 @@ public: if (test <= rec->weight && rec->key >= lower && rec->key <= upper && - !rec->is_tombstone() && !rec->get_delete_status()) { + !rec->is_tombstone() && !rec->is_deleted()) { return rec; } @@ -259,7 +230,7 @@ private: size_t m_tombstone_cap; //char* m_data; - Record<K, V, W>* m_data; + R* m_data; BloomFilter* m_tombstone_filter; alignas(64) std::atomic<size_t> m_tombstonecnt; diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h index 6d97f95..dd2fd85 100644 --- a/include/shard/MemISAM.h +++ b/include/shard/MemISAM.h @@ -23,10 +23,11 @@ namespace de { thread_local size_t mrun_cancelations = 0; -template <typename K, typename V> +template <RecordInterface R> class MemISAM { private: -typedef Record<K, V> Rec; +typedef decltype(R::key) K; +typedef decltype(R::value) V; constexpr static size_t inmem_isam_node_size = 256; constexpr static size_t inmem_isam_fanout = inmem_isam_node_size / (sizeof(K) + sizeof(char*)); @@ -36,24 +37,23 @@ struct InMemISAMNode { char* child[inmem_isam_fanout]; }; -constexpr static size_t inmem_isam_leaf_fanout = inmem_isam_node_size / sizeof(Rec); +constexpr static size_t inmem_isam_leaf_fanout = inmem_isam_node_size / sizeof(R); constexpr static size_t inmem_isam_node_keyskip = sizeof(K) * inmem_isam_fanout; static_assert(sizeof(InMemISAMNode) == inmem_isam_node_size, "node size does not match"); - public: MemISAM(std::string data_fname, size_t record_cnt, size_t tombstone_cnt, BloomFilter *bf, bool tagging) : m_reccnt(record_cnt), m_tombstone_cnt(tombstone_cnt), m_deleted_cnt(0), m_tagging(tagging) { // read the stored data file the file - size_t alloc_size = (record_cnt * sizeof(Rec)) + (CACHELINE_SIZE - (record_cnt * sizeof(Rec)) % CACHELINE_SIZE); + size_t alloc_size = (record_cnt * sizeof(R)) + (CACHELINE_SIZE - (record_cnt * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); - m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, alloc_size); FILE *file = fopen(data_fname.c_str(), "rb"); assert(file); - auto res = fread(m_data, sizeof(Rec), m_reccnt, file); + auto res = fread(m_data, sizeof(R), m_reccnt, file); assert (res == m_reccnt); fclose(file); @@ -71,34 +71,34 @@ public: } } - MemISAM(MutableBuffer<K,V>* buffer, BloomFilter* bf, bool tagging) + MemISAM(MutableBuffer<R>* buffer, BloomFilter* bf, bool tagging) :m_reccnt(0), m_tombstone_cnt(0), m_isam_nodes(nullptr), m_deleted_cnt(0), m_tagging(tagging) { - size_t alloc_size = (buffer->get_record_count() * sizeof(Rec)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Rec)) % CACHELINE_SIZE); + size_t alloc_size = (buffer->get_record_count() * 
sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); - m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); TIMER_INIT(); size_t offset = 0; m_reccnt = 0; TIMER_START(); - Rec* base = buffer->sorted_output(); + R* base = buffer->sorted_output(); TIMER_STOP(); auto sort_time = TIMER_RESULT(); - Rec* stop = base + buffer->get_record_count(); + R* stop = base + buffer->get_record_count(); TIMER_START(); while (base < stop) { if (!m_tagging) { if (!base->is_tombstone() && (base + 1 < stop) - && base->match(base + 1) && (base + 1)->is_tombstone()) { + && *base == *(base + 1) && (base + 1)->is_tombstone()) { base += 2; mrun_cancelations++; continue; } - } else if (base->get_delete_status()) { + } else if (base->is_deleted()) { base += 1; continue; } @@ -123,15 +123,15 @@ public: TIMER_STOP(); auto level_time = TIMER_RESULT(); - fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time); + //fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time); } MemISAM(MemISAM** runs, size_t len, BloomFilter* bf, bool tagging) :m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_isam_nodes(nullptr), m_tagging(tagging) { - std::vector<Cursor<K,V>> cursors; + std::vector<Cursor<R>> cursors; cursors.reserve(len); - PriorityQueue<K,V> pq(len); + PriorityQueue<R> pq(len); size_t attemp_reccnt = 0; @@ -142,21 +142,21 @@ public: attemp_reccnt += runs[i]->get_record_count(); pq.push(cursors[i].ptr, i); } else { - cursors.emplace_back(Cursor<K,V>{nullptr, nullptr, 0, 0}); + cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0}); } } - size_t alloc_size = (attemp_reccnt * sizeof(Rec)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Rec)) % CACHELINE_SIZE); + size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); - m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); size_t offset = 0; while (pq.size()) { auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<K,V>{nullptr, 0}; + auto next = pq.size() > 1 ? pq.peek(1) : queue_record<R>{nullptr, 0}; if (!m_tagging && !now.data->is_tombstone() && next.data != nullptr && - now.data->match(next.data) && next.data->is_tombstone()) { + *now.data == *next.data && next.data->is_tombstone()) { pq.pop(); pq.pop(); auto& cursor1 = cursors[now.version]; @@ -165,7 +165,7 @@ public: if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); } else { auto& cursor = cursors[now.version]; - if (!m_tagging || !cursor.ptr->get_delete_status()) { + if (!m_tagging || !cursor.ptr->is_deleted()) { m_data[m_reccnt++] = *cursor.ptr; if (cursor.ptr->is_tombstone()) { ++m_tombstone_cnt; @@ -188,7 +188,7 @@ public: if (m_isam_nodes) free(m_isam_nodes); } - Rec* sorted_output() const { + R* sorted_output() const { return m_data; } @@ -208,7 +208,7 @@ public: while (idx < m_reccnt && m_data[idx].lt(key, val)) ++idx; - if (m_data[idx].match(key, val, false)) { + if (m_data[idx] == R {key, val}) { m_data[idx].set_delete_status(); m_deleted_cnt++; return true; @@ -217,7 +217,7 @@ public: return false; } - const Rec* get_record_at(size_t idx) const { + const R* get_record_at(size_t idx) const { return (idx < m_reccnt) ? m_data + idx : nullptr; } @@ -235,7 +235,7 @@ public: now = next ? 
next : reinterpret_cast<const InMemISAMNode*>(now->child[inmem_isam_fanout - 1]); } - const Rec* pos = reinterpret_cast<const Rec*>(now); + const R* pos = reinterpret_cast<const R*>(now); while (pos < m_data + m_reccnt && pos->key < key) pos++; return pos - m_data; @@ -255,7 +255,7 @@ public: now = next ? next : reinterpret_cast<const InMemISAMNode*>(now->child[inmem_isam_fanout - 1]); } - const Rec* pos = reinterpret_cast<const Rec*>(now); + const R* pos = reinterpret_cast<const R*>(now); while (pos < m_data + m_reccnt && pos->key <= key) pos++; return pos - m_data; @@ -267,20 +267,20 @@ public: return false; } - Rec* ptr = m_data + idx; + R* ptr = m_data + idx; while (ptr < m_data + m_reccnt && ptr->lt(key, val)) ptr++; - return ptr->match(key, val, true); + return *ptr == R {key, val} && ptr->is_tombstone(); } size_t get_memory_utilization() { - return m_reccnt * sizeof(Rec) + m_internal_node_cnt * inmem_isam_node_size; + return m_reccnt * sizeof(R) + m_internal_node_cnt * inmem_isam_node_size; } void persist_to_file(std::string data_fname) { FILE *file = fopen(data_fname.c_str(), "wb"); assert(file); - fwrite(m_data, sizeof(Rec), m_reccnt, file); + fwrite(m_data, sizeof(R), m_reccnt, file); fclose(file); } @@ -303,14 +303,14 @@ private: InMemISAMNode* current_node = m_isam_nodes; - const Rec* leaf_base = m_data; - const Rec* leaf_stop = m_data + m_reccnt; + const R* leaf_base = m_data; + const R* leaf_stop = m_data + m_reccnt; while (leaf_base < leaf_stop) { size_t fanout = 0; for (size_t i = 0; i < inmem_isam_fanout; ++i) { auto rec_ptr = leaf_base + inmem_isam_leaf_fanout * i; if (rec_ptr >= leaf_stop) break; - const Rec* sep_key = std::min(rec_ptr + inmem_isam_leaf_fanout - 1, leaf_stop - 1); + const R* sep_key = std::min(rec_ptr + inmem_isam_leaf_fanout - 1, leaf_stop - 1); current_node->keys[i] = sep_key->key; current_node->child[i] = (char*)rec_ptr; ++fanout; @@ -350,7 +350,7 @@ private: } // Members: sorted data, internal ISAM levels, reccnt; - Rec* m_data; + R* m_data; InMemISAMNode* m_isam_nodes; InMemISAMNode* m_root; size_t m_reccnt; diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h index 39337bf..41766b9 100644 --- a/include/shard/WIRS.h +++ b/include/shard/WIRS.h @@ -12,6 +12,7 @@ #include <cassert> #include <queue> #include <memory> +#include <concepts> #include "ds/PriorityQueue.h" #include "util/Cursor.h" @@ -24,19 +25,26 @@ namespace de { thread_local size_t wirs_cancelations = 0; -template <typename K, typename V, typename W> +template <WeightedRecordInterface R> class WIRS { private: + + typedef decltype(R::key) K; + typedef decltype(R::value) V; + typedef decltype(R::weight) W; + + template <WeightedRecordInterface R_ = R> struct wirs_node { - struct wirs_node *left, *right; + struct wirs_node<R_> *left, *right; K low, high; W weight; Alias* alias; }; + template <WeightedRecordInterface R_ = R> struct WIRSState { W tot_weight; - std::vector<wirs_node*> nodes; + std::vector<wirs_node<R_>*> nodes; Alias* top_level_alias; ~WIRSState() { @@ -45,13 +53,13 @@ private: }; public: - WIRS(MutableBuffer<K, V, W>* buffer, BloomFilter* bf, bool tagging) + WIRS(MutableBuffer<R>* buffer, BloomFilter* bf, bool tagging) : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0), m_tagging(tagging), m_root(nullptr) { - size_t alloc_size = (buffer->get_record_count() * sizeof(Record<K, V, W>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Record<K, V, W>)) % CACHELINE_SIZE); + size_t alloc_size = 
(buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); - m_data = (Record<K, V, W>*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); size_t offset = 0; m_reccnt = 0; @@ -61,13 +69,13 @@ public: while (base < stop) { if (!m_tagging) { if (!(base->is_tombstone()) && (base + 1) < stop) { - if (base->match(base + 1) && (base + 1)->is_tombstone()) { + if (*base == *(base + 1) && (base + 1)->is_tombstone()) { base += 2; wirs_cancelations++; continue; } } - } else if (base->get_delete_status()) { + } else if (base->is_deleted()) { base += 1; continue; } @@ -92,10 +100,10 @@ public: WIRS(WIRS** shards, size_t len, BloomFilter* bf, bool tagging) : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0), m_tagging(tagging), m_root(nullptr) { - std::vector<Cursor<K,V,W>> cursors; + std::vector<Cursor<R>> cursors; cursors.reserve(len); - PriorityQueue<K, V, W> pq(len); + PriorityQueue<R> pq(len); size_t attemp_reccnt = 0; @@ -106,28 +114,28 @@ public: attemp_reccnt += shards[i]->get_record_count(); pq.push(cursors[i].ptr, i); } else { - cursors.emplace_back(Cursor<K,V,W>{nullptr, nullptr, 0, 0}); + cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0}); } } - size_t alloc_size = (attemp_reccnt * sizeof(Record<K, V, W>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Record<K, V, W>)) % CACHELINE_SIZE); + size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); - m_data = (Record<K, V, W>*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); while (pq.size()) { auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<K, V, W>{nullptr, 0}; + auto next = pq.size() > 1 ? 
pq.peek(1) : queue_record<R>{nullptr, 0}; if (!m_tagging && !now.data->is_tombstone() && next.data != nullptr && - now.data->match(next.data) && next.data->is_tombstone()) { + *now.data == *next.data && next.data->is_tombstone()) { pq.pop(); pq.pop(); auto& cursor1 = cursors[now.version]; auto& cursor2 = cursors[next.version]; - if (advance_cursor<K,V,W>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor<K,V,W>(cursor2)) pq.push(cursor2.ptr, next.version); + if (advance_cursor<R>(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor<R>(cursor2)) pq.push(cursor2.ptr, next.version); } else { auto& cursor = cursors[now.version]; - if (!m_tagging || !cursor.ptr->get_delete_status()) { + if (!m_tagging || !cursor.ptr->is_deleted()) { m_data[m_reccnt++] = *cursor.ptr; m_total_weight += cursor.ptr->weight; if (bf && cursor.ptr->is_tombstone()) { @@ -137,7 +145,7 @@ public: } pq.pop(); - if (advance_cursor<K,V,W>(cursor)) pq.push(cursor.ptr, now.version); + if (advance_cursor<R>(cursor)) pq.push(cursor.ptr, now.version); } } @@ -155,16 +163,16 @@ public: free_tree(m_root); } - bool delete_record(const K& key, const V& val) { - size_t idx = get_lower_bound(key); + bool delete_record(const R& rec) { + size_t idx = get_lower_bound(rec.key); if (idx >= m_reccnt) { return false; } - while (idx < m_reccnt && m_data[idx].lt(key, val)) ++idx; + while (idx < m_reccnt && m_data[idx] < rec) ++idx; - if (m_data[idx].match(key, val, false)) { - m_data[idx].set_delete_status(); + if (m_data[idx] == R {rec.key, rec.val} && !m_data[idx].is_tombstone()) { + m_data[idx].set_delete(); m_deleted_cnt++; return true; } @@ -172,7 +180,7 @@ public: return false; } - void free_tree(struct wirs_node* node) { + void free_tree(struct wirs_node<R>* node) { if (node) { delete node->alias; free_tree(node->left); @@ -181,7 +189,7 @@ public: } } - Record<K, V, W>* sorted_output() const { + R* sorted_output() const { return m_data; } @@ -193,19 +201,19 @@ public: return m_tombstone_cnt; } - const Record<K, V, W>* get_record_at(size_t idx) const { + const R* get_record_at(size_t idx) const { if (idx >= m_reccnt) return nullptr; return m_data + idx; } // low - high -> decompose to a set of nodes. // Build Alias across the decomposed nodes. - WIRSState* get_sample_shard_state(const K& lower_key, const K& upper_key) { - WIRSState* res = new WIRSState(); + WIRSState<R>* get_sample_shard_state(const K& lower_key, const K& upper_key) { + auto res = new WIRSState(); // Simulate a stack to unfold recursion. double tot_weight = 0.0; - struct wirs_node* st[64] = {0}; + struct wirs_node<R>* st[64] = {0}; st[0] = m_root; size_t top = 1; while(top > 0) { @@ -231,15 +239,15 @@ public: } static void delete_state(void *state) { - auto s = (WIRSState *) state; + WIRSState<R> *s = (WIRSState<R> *) state; delete s; } // returns the number of records sampled // NOTE: This operation returns records strictly between the lower and upper bounds, not // including them. 
- size_t get_samples(void* shard_state, std::vector<Record<K, V, W>> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) { - WIRSState *state = (WIRSState *) shard_state; + size_t get_samples(void* shard_state, std::vector<R> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) { + WIRSState<R> *state = (WIRSState<R> *) shard_state; if (sample_sz == 0) { return 0; } @@ -295,30 +303,30 @@ public: auto ptr = m_data + get_lower_bound(key); - while (ptr < m_data + m_reccnt && ptr->lt(key, val)) { + while (ptr < m_data + m_reccnt && *ptr < R {key, val}) { ptr ++; } - bool result = (m_tagging) ? ptr->get_delete_status() - : ptr->match(key, val, true); + bool result = (m_tagging) ? ptr->is_deleted() + : *ptr == R {key, val} && ptr->is_tombstone(); m_rejection_cnt += result; return result; } - bool check_tombstone(const K& key, const V& val) { + bool check_tombstone(const R& rec) { m_ts_check_cnt++; - size_t idx = get_lower_bound(key); + size_t idx = get_lower_bound(rec.key); if (idx >= m_reccnt) { return false; } - auto ptr = m_data + get_lower_bound(key); + auto ptr = m_data + get_lower_bound(rec.key); - while (ptr < m_data + m_reccnt && ptr->lt(key, val)) { + while (ptr < m_data + m_reccnt && *ptr < rec) { ptr ++; } - bool result = ptr->match(key, val, true); + bool result = *ptr == rec && ptr->is_tombstone(); m_rejection_cnt += result; return result; @@ -340,21 +348,21 @@ public: private: - bool covered_by(struct wirs_node* node, const K& lower_key, const K& upper_key) { + bool covered_by(struct wirs_node<R>* node, const K& lower_key, const K& upper_key) { auto low_index = node->low * m_group_size; auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1); return lower_key < m_data[low_index].key && m_data[high_index].key < upper_key; } - bool intersects(struct wirs_node* node, const K& lower_key, const K& upper_key) { + bool intersects(struct wirs_node<R>* node, const K& lower_key, const K& upper_key) { auto low_index = node->low * m_group_size; auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1); return lower_key < m_data[high_index].key && m_data[low_index].key < upper_key; } - struct wirs_node* construct_wirs_node(const std::vector<W>& weights, size_t low, size_t high) { + struct wirs_node<R>* construct_wirs_node(const std::vector<W>& weights, size_t low, size_t high) { if (low == high) { - return new wirs_node{nullptr, nullptr, low, high, weights[low], new Alias({1.0})}; + return new wirs_node<R>{nullptr, nullptr, low, high, weights[low], new Alias({1.0})}; } else if (low > high) return nullptr; std::vector<double> node_weights; @@ -370,9 +378,9 @@ private: size_t mid = (low + high) / 2; - return new wirs_node{construct_wirs_node(weights, low, mid), - construct_wirs_node(weights, mid + 1, high), - low, high, sum, new Alias(node_weights)}; + return new wirs_node<R>{construct_wirs_node(weights, low, mid), + construct_wirs_node(weights, mid + 1, high), + low, high, sum, new Alias(node_weights)}; } @@ -410,9 +418,9 @@ private: m_root = construct_wirs_node(weights, 0, n_groups-1); } - Record<K, V, W>* m_data; + R* m_data; std::vector<Alias *> m_alias; - wirs_node* m_root; + wirs_node<R>* m_root; bool m_tagging; W m_total_weight; size_t m_reccnt; diff --git a/include/util/Cursor.h b/include/util/Cursor.h index 2339800..2609ae5 100644 --- a/include/util/Cursor.h +++ b/include/util/Cursor.h @@ -14,10 +14,10 @@ #include "io/PagedFile.h" namespace de { -template<typename K, typename V, 
typename W=void> +template<typename R> struct Cursor { - Record<K,V,W> *ptr; - const Record<K,V,W> *end; + R *ptr; + R *end; size_t cur_rec_idx; size_t rec_cnt; @@ -36,8 +36,8 @@ struct Cursor { * be updated to be equal to end, and false will be returned. Iterators will * not be closed. */ -template<typename K, typename V, typename W> -inline static bool advance_cursor(Cursor<K,V,W> &cur, PagedFileIterator *iter = nullptr) { +template<typename R> +inline static bool advance_cursor(Cursor<R> &cur, PagedFileIterator *iter = nullptr) { cur.ptr++; cur.cur_rec_idx++; @@ -45,8 +45,8 @@ inline static bool advance_cursor(Cursor<K,V,W> &cur, PagedFileIterator *iter = if (cur.ptr >= cur.end) { if (iter && iter->next()) { - cur.ptr = (Record<K,V,W>*)iter->get_item(); - cur.end = cur.ptr + (PAGE_SIZE / sizeof(Record<K,V,W>)); + cur.ptr = (R*)iter->get_item(); + cur.end = cur.ptr + (PAGE_SIZE / sizeof(R)); return true; } @@ -62,14 +62,14 @@ inline static bool advance_cursor(Cursor<K,V,W> &cur, PagedFileIterator *iter = * This allows for "peaking" at the next largest element after the current * largest is processed. */ -template <typename K, typename V, typename W> -inline static Cursor<K,V,W> *get_next(std::vector<Cursor<K,V,W>> &cursors, Cursor<K,V,W> *current=nullptr) { - const Record<K,V,W> *min_rec = nullptr; - Cursor<K,V,W> *result = nullptr; +template <typename R> +inline static Cursor<R> *get_next(std::vector<Cursor<R>> &cursors, Cursor<R> *current=nullptr) { + const R *min_rec = nullptr; + Cursor<R> *result = nullptr; for (size_t i=0; i< cursors.size(); i++) { - if (cursors[i] == (Cursor<K,V,W>) {0} ) continue; + if (cursors[i] == (Cursor<R>) {0} ) continue; - const Record<K,V,W> *rec = (&cursors[i] == current) ? cursors[i].ptr + 1 : cursors[i].ptr; + const R *rec = (&cursors[i] == current) ? 
cursors[i].ptr + 1 : cursors[i].ptr; if (rec >= cursors[i].end) continue; if (min_rec == nullptr) { diff --git a/include/util/Record.h b/include/util/Record.h index 687e745..ce101f4 100644 --- a/include/util/Record.h +++ b/include/util/Record.h @@ -10,50 +10,110 @@ #pragma once #include <cstring> +#include <concepts> #include "util/base.h" namespace de { -template <typename K, typename V, typename W=void> +template<typename R> +concept RecordInterface = requires(R r, R s) { + r.key; + r.value; + r.header; + + {r.is_tombstone()} -> std::convertible_to<bool>; + {r.is_deleted()} -> std::convertible_to<bool>; + r.set_delete(); + r.set_tombstone(std::declval<bool>); + { r < s } ->std::convertible_to<bool>; + { r == s } ->std::convertible_to<bool>; + { r.header < s.header } -> std::convertible_to<bool>; +}; + +template <typename R> +concept WeightedRecordInterface = RecordInterface<R> && requires(R r) { + {r.weight} -> std::convertible_to<double>; +}; + +template <typename K, typename V> struct Record { K key; V value; - typename std::conditional<!std::is_same<W, void>::value, W, std::false_type>::type weight; - uint32_t header; + uint32_t header = 0; + + inline void set_delete() { + header |= 2; + } + + inline bool is_deleted() const { + return header & 2; + } + + inline void set_tombstone(bool val=true) { + if (val) { + header |= val; + } else { + header &= 0; + } + } + + inline bool is_tombstone() const { + return header & 1; + } + + inline bool operator<(const Record& other) const { + return key < other.key || (key == other.key && value < other.value); + } - inline bool match(K k, V v, bool is_tombstone) const { - return (key == k) && (value == v) && ((header & 1) == is_tombstone); + inline bool operator==(const Record& other) const { + return key == other.key && value == other.value; } +}; + +template <typename K, typename V, typename W> +struct WeightedRecord { + K key; + V value; + W weight = 1; + uint32_t header = 0; - inline void set_delete_status() { + inline void set_delete() { header |= 2; } - inline bool get_delete_status() const { + inline bool is_deleted() const { return header & 2; } + inline void set_tombstone(bool val=true) { + if (val) { + header |= val; + } else { + header &= 0; + } + } + inline bool is_tombstone() const { return header & 1; } - inline int match(const Record* other) const { + inline int match(const WeightedRecord* other) const { return key == other->key && value == other->value; } - inline bool operator<(const Record& other) const { + inline bool operator<(const WeightedRecord& other) const { return key < other.key || (key == other.key && value < other.value); } - inline bool lt(const K& k, const V& v) const { - return key < k || (key == k && value < v); + inline bool operator==(const WeightedRecord& other) const { + return key == other.key && value == other.value; } }; -template <typename K, typename V, typename W=void> -static bool memtable_record_cmp(const Record<K, V, W>& a, const Record<K, V, W>& b) { +template <RecordInterface R> +static bool memtable_record_cmp(const R& a, const R& b) { return (a.key < b.key) || (a.key == b.key && a.value < b.value) || (a.key == b.key && a.value == b.value && a.header < b.header); } diff --git a/tests/dynamic_extension_tests.cpp b/tests/dynamic_extension_tests.cpp index ce2abdb..3638b76 100644 --- a/tests/dynamic_extension_tests.cpp +++ b/tests/dynamic_extension_tests.cpp @@ -19,11 +19,9 @@ #include <check.h> using namespace de; -typedef DynamicExtension<uint64_t, uint32_t, uint64_t> DE_WIRS; - START_TEST(t_create) { - 
auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng); ck_assert_ptr_nonnull(ext_wirs); @@ -37,12 +35,13 @@ END_TEST START_TEST(t_append) { - auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng); uint64_t key = 0; uint32_t val = 0; for (size_t i=0; i<100; i++) { - ck_assert_int_eq(ext_wirs->append(key, val, 1, false, g_rng), 1); + WRec r = {key, val, 1}; + ck_assert_int_eq(ext_wirs->append(r, g_rng), 1); key++; val++; } @@ -57,12 +56,13 @@ END_TEST START_TEST(t_append_with_mem_merges) { - auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng); uint64_t key = 0; uint32_t val = 0; for (size_t i=0; i<300; i++) { - ck_assert_int_eq(ext_wirs->append(key, val, 1, false, g_rng), 1); + WRec r = {key, val, 1}; + ck_assert_int_eq(ext_wirs->append(r, g_rng), 1); key++; val++; } @@ -77,12 +77,13 @@ END_TEST START_TEST(t_range_sample_memtable) { - auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng); uint64_t key = 0; uint32_t val = 0; for (size_t i=0; i<100; i++) { - ck_assert_int_eq(ext_wirs->append(key, val, 1, false, g_rng), 1); + WRec r = {key, val, 1}; + ck_assert_int_eq(ext_wirs->append(r, g_rng), 1); key++; val++; } @@ -92,7 +93,7 @@ START_TEST(t_range_sample_memtable) char *buf = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE); char *util_buf = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE); - WeightedRec sample_set[100]; + WRec sample_set[100]; ext_wirs->range_sample(sample_set, lower_bound, upper_bound, 100, g_rng); @@ -111,12 +112,13 @@ END_TEST START_TEST(t_range_sample_memlevels) { - auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng); uint64_t key = 0; uint32_t val = 0; for (size_t i=0; i<300; i++) { - ck_assert_int_eq(ext_wirs->append(key, val, 1, false, g_rng), 1); + WRec r = {key, val, 1}; + ck_assert_int_eq(ext_wirs->append(r, g_rng), 1); key++; val++; } @@ -127,7 +129,7 @@ START_TEST(t_range_sample_memlevels) char *buf = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE); char *util_buf = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE); - WeightedRec sample_set[100]; + WRec sample_set[100]; ext_wirs->range_sample(sample_set, lower_bound, upper_bound, 100, g_rng); for(size_t i=0; i<100; i++) { @@ -144,7 +146,7 @@ END_TEST START_TEST(t_range_sample_weighted) { - auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng); size_t n = 10000; std::vector<uint64_t> keys; @@ -171,22 +173,23 @@ START_TEST(t_range_sample_weighted) std::shuffle(keys.begin(), keys.end(), gen); for (size_t i=0; i<keys.size(); i++) { - double weight; + uint64_t weight; if (keys[i] == 1) { - weight = 2.0; + weight = 2; } else if (keys[i] == 2) { - weight = 4.0; + weight = 4; } else { - weight = 8.0; + weight = 8; } - ext_wirs->append(keys[i], i, weight, false, g_rng); + WRec r = {keys[i], (uint32_t) i, weight, 0}; + ext_wirs->append(r, g_rng); } size_t k = 1000; uint64_t lower_key = 0; uint64_t upper_key = 5; - WeightedRec* buffer = new WeightedRec[k](); + WRec* buffer = new WRec[k](); char *buffer1 = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE); char *buffer2 = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE); @@ -214,7 +217,7 @@ END_TEST 
START_TEST(t_tombstone_merging_01) { size_t reccnt = 100000; - auto ext_wirs = new DE_WIRS(100, 100, 2, .01, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, .01, 1, g_rng); std::set<std::pair<uint64_t, uint32_t>> records; std::set<std::pair<uint64_t, uint32_t>> to_delete; @@ -232,14 +235,17 @@ START_TEST(t_tombstone_merging_01) size_t deletes = 0; size_t cnt=0; for (auto rec : records) { - ck_assert_int_eq(ext_wirs->append(rec.first, rec.second, 1, false, g_rng), 1); + WRec r = {rec.first, rec.second, 1, 0}; + ck_assert_int_eq(ext_wirs->append(r, g_rng), 1); if (gsl_rng_uniform(g_rng) < 0.05 && !to_delete.empty()) { std::vector<std::pair<uint64_t, uint32_t>> del_vec; std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); for (size_t i=0; i<del_vec.size(); i++) { - ext_wirs->append(del_vec[i].first, del_vec[i].second, 1, true, g_rng); + WRec dr = {del_vec[i].first, del_vec[i].second, 1}; + dr.set_tombstone(); + ext_wirs->append(dr, g_rng); deletes++; to_delete.erase(del_vec[i]); deleted.insert(del_vec[i]); @@ -259,12 +265,12 @@ START_TEST(t_tombstone_merging_01) } END_TEST -DE_WIRS *create_test_tree(size_t reccnt, size_t memlevel_cnt) { - auto ext_wirs = new DE_WIRS(1000, 1000, 2, 1, 1, g_rng); +DynamicExtension<WRec> *create_test_tree(size_t reccnt, size_t memlevel_cnt) { + auto ext_wirs = new DynamicExtension<WRec>(1000, 1000, 2, 1, 1, g_rng); - std::set<std::pair<uint64_t, uint32_t>> records; - std::set<std::pair<uint64_t, uint32_t>> to_delete; - std::set<std::pair<uint64_t, uint32_t>> deleted; + std::set<WRec> records; + std::set<WRec> to_delete; + std::set<WRec> deleted; while (records.size() < reccnt) { uint64_t key = rand(); @@ -277,14 +283,15 @@ DE_WIRS *create_test_tree(size_t reccnt, size_t memlevel_cnt) { size_t deletes = 0; for (auto rec : records) { - ck_assert_int_eq(ext_wirs->append(rec.first, rec.second, 1, 0, g_rng), 1); + ck_assert_int_eq(ext_wirs->append(rec, g_rng), 1); if (gsl_rng_uniform(g_rng) < 0.05 && !to_delete.empty()) { - std::vector<std::pair<uint64_t, uint32_t>> del_vec; + std::vector<WRec> del_vec; std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); for (size_t i=0; i<del_vec.size(); i++) { - ext_wirs->append(del_vec[i].first, del_vec[i].second, 1, true, g_rng); + del_vec[i].set_tombstone(); + ext_wirs->append(del_vec[i], g_rng); deletes++; to_delete.erase(del_vec[i]); deleted.insert(del_vec[i]); @@ -302,7 +309,7 @@ DE_WIRS *create_test_tree(size_t reccnt, size_t memlevel_cnt) { START_TEST(t_sorted_array) { size_t reccnt = 100000; - auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng); + auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng); std::set<std::pair<uint64_t, uint32_t>> records; std::set<std::pair<uint64_t, uint32_t>> to_delete; @@ -319,14 +326,18 @@ START_TEST(t_sorted_array) size_t deletes = 0; for (auto rec : records) { - ck_assert_int_eq(ext_wirs->append(rec.first, rec.second, 1, 0, g_rng), 1); + WRec r = {rec.first, rec.second, 1}; + ck_assert_int_eq(ext_wirs->append(r, g_rng), 1); if (gsl_rng_uniform(g_rng) < 0.05 && !to_delete.empty()) { std::vector<std::pair<uint64_t, uint32_t>> del_vec; std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); for (size_t i=0; i<del_vec.size(); i++) { - ext_wirs->append(del_vec[i].first, del_vec[i].second, 1, true, g_rng); + WRec dr = {del_vec[i].first, del_vec[i].second, 1}; + 
dr.set_tombstone(); + + ext_wirs->append(dr , g_rng); deletes++; to_delete.erase(del_vec[i]); deleted.insert(del_vec[i]); diff --git a/tests/internal_level_tests.cpp b/tests/internal_level_tests.cpp index 7e542e6..7842b01 100644 --- a/tests/internal_level_tests.cpp +++ b/tests/internal_level_tests.cpp @@ -20,19 +20,19 @@ using namespace de; START_TEST(t_memlevel_merge) { - auto tbl1 = create_test_mbuffer<uint64_t, uint32_t, uint64_t>(100); - auto tbl2 = create_test_mbuffer<uint64_t, uint32_t, uint64_t>(100); + auto tbl1 = create_test_mbuffer<WRec>(100); + auto tbl2 = create_test_mbuffer<WRec>(100); - auto base_level = new WeightedLevel(1, 1, false); + auto base_level = new InternalLevel<WRec>(1, 1, false); base_level->append_mem_table(tbl1, g_rng); ck_assert_int_eq(base_level->get_record_cnt(), 100); - auto merging_level = new WeightedLevel(0, 1, false); + auto merging_level = new InternalLevel<WRec>(0, 1, false); merging_level->append_mem_table(tbl2, g_rng); ck_assert_int_eq(merging_level->get_record_cnt(), 100); auto old_level = base_level; - base_level = WeightedLevel::merge_levels(old_level, merging_level, false, g_rng); + base_level = InternalLevel<WRec>::merge_levels(old_level, merging_level, false, g_rng); delete old_level; delete merging_level; @@ -44,11 +44,11 @@ START_TEST(t_memlevel_merge) } -WeightedLevel *create_test_memlevel(size_t reccnt) { - auto tbl1 = create_test_mbuffer<uint64_t, uint32_t, uint64_t>(reccnt/2); - auto tbl2 = create_test_mbuffer<uint64_t, uint32_t, uint64_t>(reccnt/2); +InternalLevel<WRec> *create_test_memlevel(size_t reccnt) { + auto tbl1 = create_test_mbuffer<WRec>(reccnt/2); + auto tbl2 = create_test_mbuffer<WRec>(reccnt/2); - auto base_level = new WeightedLevel(1, 2, false); + auto base_level = new InternalLevel<WRec>(1, 2, false); base_level->append_mem_table(tbl1, g_rng); base_level->append_mem_table(tbl2, g_rng); diff --git a/tests/memisam_tests.cpp b/tests/memisam_tests.cpp index f3b80b3..1609edf 100644 --- a/tests/memisam_tests.cpp +++ b/tests/memisam_tests.cpp @@ -8,24 +8,25 @@ using namespace de; -typedef MemISAM<uint64_t, uint32_t> M_ISAM; +typedef MemISAM<Rec> M_ISAM; START_TEST(t_memtable_init) { - auto buffer = new UnweightedMBuffer(1024, true, 512, g_rng); + auto buffer = new MutableBuffer<Rec>(1024, true, 512, g_rng); for (uint64_t i = 512; i > 0; i--) { - uint32_t v = i; - buffer->append(i, v); + buffer->append(Rec {i, (uint32_t)i}); } + Rec r; + r.set_tombstone(); for (uint64_t i = 1; i <= 256; ++i) { - uint32_t v = i; - buffer->append(i, v, true); + r.key = i; + r.value = i; + buffer->append(r); } for (uint64_t i = 257; i <= 512; ++i) { - uint32_t v = i + 1; - buffer->append(i, v); + buffer->append({i, (uint32_t) i+1}); } BloomFilter* bf = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, g_rng); @@ -40,9 +41,9 @@ START_TEST(t_memtable_init) START_TEST(t_inmemrun_init) { size_t n = 512; - auto memtable1 = create_test_mbuffer<uint64_t, uint32_t>(n); - auto memtable2 = create_test_mbuffer<uint64_t, uint32_t>(n); - auto memtable3 = create_test_mbuffer<uint64_t, uint32_t>(n); + auto memtable1 = create_test_mbuffer<Rec>(n); + auto memtable2 = create_test_mbuffer<Rec>(n); + auto memtable3 = create_test_mbuffer<Rec>(n); BloomFilter* bf1 = new BloomFilter(100, BF_HASH_FUNCS, g_rng); BloomFilter* bf2 = new BloomFilter(100, BF_HASH_FUNCS, g_rng); @@ -70,11 +71,11 @@ START_TEST(t_inmemrun_init) auto cur_rec = run4->get_record_at(i); - if (run1_idx < n && cur_rec->match(rec1)) { + if (run1_idx < n && cur_rec == rec1) { 
diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp
index 2112036..75cbeec 100644
--- a/tests/mutable_buffer_tests.cpp
+++ b/tests/mutable_buffer_tests.cpp
@@ -20,12 +20,14 @@
 
 #include <check.h>
 
+#define DE_MT_TEST 0
+
 using namespace de;
 
 START_TEST(t_create)
 {
     auto rng = gsl_rng_alloc(gsl_rng_mt19937);
-    auto buffer = new WeightedMBuffer(100, true, 50, rng);
+    auto buffer = new MutableBuffer<Rec>(100, true, 50, rng);
 
     ck_assert_ptr_nonnull(buffer);
     ck_assert_int_eq(buffer->get_capacity(), 100);
@@ -44,30 +46,32 @@ END_TEST
 START_TEST(t_insert)
 {
     auto rng = gsl_rng_alloc(gsl_rng_mt19937);
-    auto buffer = new WeightedMBuffer(100, true, 50, rng);
+    auto buffer = new MutableBuffer<WRec>(100, true, 50, rng);
 
     uint64_t key = 0;
     uint32_t val = 5;
 
+    WRec rec = {0, 5, 1};
+
     for (size_t i=0; i<99; i++) {
-        ck_assert_int_eq(buffer->append(key, val, 1, false), 1);
-        ck_assert_int_eq(buffer->check_tombstone(key, val), 0);
+        ck_assert_int_eq(buffer->append(rec), 1);
+        ck_assert_int_eq(buffer->check_tombstone(rec), 0);
 
-        key++;
-        val++;
+        rec.key++;
+        rec.value++;
 
         ck_assert_int_eq(buffer->get_record_count(), i+1);
         ck_assert_int_eq(buffer->get_tombstone_count(), 0);
         ck_assert_int_eq(buffer->is_full(), 0);
     }
 
-    ck_assert_int_eq(buffer->append(key, val, 1.0, false), 1);
+    ck_assert_int_eq(buffer->append(rec), 1);
 
-    key++;
-    val++;
+    rec.key++;
+    rec.value++;
 
     ck_assert_int_eq(buffer->is_full(), 1);
 
-    ck_assert_int_eq(buffer->append(key, val, 1.0, false), 0);
+    ck_assert_int_eq(buffer->append(rec), 0);
 
     delete buffer;
     gsl_rng_free(rng);
@@ -79,12 +83,12 @@ END_TEST
 START_TEST(t_insert_tombstones)
 {
     auto rng = gsl_rng_alloc(gsl_rng_mt19937);
-    auto buffer = new WeightedMBuffer(100, true, 50, rng);
+    auto buffer = new MutableBuffer<Rec>(100, true, 50, rng);
 
-    uint64_t key = 0;
-    uint32_t val = 5;
     size_t ts_cnt = 0;
 
+    Rec rec = {0, 5};
+
     for (size_t i=0; i<99; i++) {
         bool ts = false;
         if (i % 2 == 0) {
@@ -92,11 +96,13 @@ START_TEST(t_insert_tombstones)
             ts=true;
         }
 
-        ck_assert_int_eq(buffer->append(key, val, 1.0, ts), 1);
-        ck_assert_int_eq(buffer->check_tombstone(key, val), ts);
+        rec.set_tombstone(ts);
 
-        key++;
-        val++;
+        ck_assert_int_eq(buffer->append(rec), 1);
+        ck_assert_int_eq(buffer->check_tombstone(rec), ts);
+
+        rec.key++;
+        rec.value++;
 
         ck_assert_int_eq(buffer->get_record_count(), i+1);
         ck_assert_int_eq(buffer->get_tombstone_count(), ts_cnt);
@@ -104,16 +110,18 @@ START_TEST(t_insert_tombstones)
 
     // inserting one more tombstone should not be possible
-    ck_assert_int_eq(buffer->append(key, val, 1.0, true), 0);
+    rec.set_tombstone();
+    ck_assert_int_eq(buffer->append(rec), 0);
 
-    ck_assert_int_eq(buffer->append(key, val, 1.0, false), 1);
+    rec.set_tombstone(false);
+    ck_assert_int_eq(buffer->append(rec), 1);
 
-    key++;
-    val++;
+    rec.key++;
+    rec.value++;
 
     ck_assert_int_eq(buffer->is_full(), 1);
 
-    ck_assert_int_eq(buffer->append(key, val, 1.0, false), 0);
+    ck_assert_int_eq(buffer->append(rec), 0);
 
     delete buffer;
     gsl_rng_free(rng);
@@ -124,11 +132,10 @@ END_TEST
 START_TEST(t_truncate)
 {
     auto rng = gsl_rng_alloc(gsl_rng_mt19937);
-    auto buffer = new WeightedMBuffer(100, true, 100, rng);
+    auto buffer = new MutableBuffer<Rec>(100, true, 100, rng);
 
-    uint64_t key = 0;
-    uint32_t val = 5;
     size_t ts_cnt = 0;
+    Rec rec = {0, 5};
 
     for (size_t i=0; i<100; i++) {
         bool ts = false;
@@ -137,25 +144,28 @@ START_TEST(t_truncate)
             ts=true;
         }
 
-        ck_assert_int_eq(buffer->append(key, val, 1.0, ts), 1);
-        ck_assert_int_eq(buffer->check_tombstone(key, val), ts);
+        rec.set_tombstone(ts);
 
-        key++;
-        val++;
+        ck_assert_int_eq(buffer->append(rec), 1);
+        ck_assert_int_eq(buffer->check_tombstone(rec), ts);
+
+        rec.key++;
+        rec.value++;
 
         ck_assert_int_eq(buffer->get_record_count(), i+1);
         ck_assert_int_eq(buffer->get_tombstone_count(), ts_cnt);
     }
 
     ck_assert_int_eq(buffer->is_full(), 1);
-    ck_assert_int_eq(buffer->append(key, val, 1.0, false), 0);
+    rec.set_tombstone(false);
+    ck_assert_int_eq(buffer->append(rec), 0);
 
     ck_assert_int_eq(buffer->truncate(), 1);
 
     ck_assert_int_eq(buffer->is_full(), 0);
     ck_assert_int_eq(buffer->get_record_count(), 0);
     ck_assert_int_eq(buffer->get_tombstone_count(), 0);
 
-    ck_assert_int_eq(buffer->append(key, val, 1.0, false), 1);
+    ck_assert_int_eq(buffer->append(rec), 1);
 
     delete buffer;
     gsl_rng_free(rng);
@@ -169,7 +179,7 @@ START_TEST(t_sorted_output)
     size_t cnt = 100;
 
     auto rng = gsl_rng_alloc(gsl_rng_mt19937);
-    auto buffer = new WeightedMBuffer(cnt, true, cnt/2, rng);
+    auto buffer = new MutableBuffer<Rec>(cnt, true, cnt/2, rng);
 
     std::vector<uint64_t> keys(cnt);
 
@@ -184,14 +194,19 @@ START_TEST(t_sorted_output)
     uint32_t val = 12345;
     for (size_t i=0; i<cnt-2; i++) {
-        buffer->append(keys[i], val, 1.0, false);
+        buffer->append(Rec {keys[i], val});
     }
 
-    buffer->append(keys[cnt-2], val, 1.0, true);
-    buffer->append(keys[cnt-1], val, 1.0, true);
+    Rec r1 = {keys[cnt-2], val};
+    r1.set_tombstone();
+    buffer->append(r1);
+
+    Rec r2 = {keys[cnt-1], val};
+    r2.set_tombstone();
+    buffer->append(r2);
 
-    WeightedRec *sorted_records = buffer->sorted_output();
+    auto *sorted_records = buffer->sorted_output();
     std::sort(keys.begin(), keys.end());
     for (size_t i=0; i<cnt; i++) {
@@ -204,23 +219,24 @@ START_TEST(t_sorted_output)
 END_TEST
 
-void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t start, size_t stop, WeightedMBuffer *buffer)
+void insert_records(std::vector<Rec> *values, size_t start, size_t stop, MutableBuffer<Rec> *buffer)
 {
     for (size_t i=start; i<stop; i++) {
-        buffer->append((*values)[i].first, (*values)[i].second, 1.0);
+        buffer->append((*values)[i]);
     }
 }
 
+#if DE_MT_TEST
 START_TEST(t_multithreaded_insert)
 {
     size_t cnt = 10000;
     auto rng = gsl_rng_alloc(gsl_rng_mt19937);
-    auto buffer = new WeightedMBuffer(cnt, true, cnt/2, rng);
+    auto buffer = new MutableBuffer<Rec>(cnt, true, cnt/2, rng);
 
-    std::vector<std::pair<uint64_t, uint32_t>> records(cnt);
+    std::vector<Rec> records(cnt);
     for (size_t i=0; i<cnt; i++) {
-        records[i] = {rand(), rand()};
+        records[i] = Rec {(uint64_t) rand(), (uint32_t) rand()};
     }
 
     // perform a t_multithreaded insertion
@@ -245,15 +261,16 @@ START_TEST(t_multithreaded_insert)
     ck_assert_int_eq(buffer->get_record_count(), cnt);
 
     std::sort(records.begin(), records.end());
-    WeightedRec *sorted_records = buffer->sorted_output();
+    auto *sorted_records = buffer->sorted_output();
     for (size_t i=0; i<cnt; i++) {
-        ck_assert_int_eq(sorted_records[i].key, records[i].first);
+        ck_assert_int_eq(sorted_records[i].key, records[i].key);
     }
 
     delete buffer;
     gsl_rng_free(rng);
 }
 END_TEST
+#endif
 
 
 Suite *unit_testing()
@@ -268,7 +285,9 @@ Suite *unit_testing()
     TCase *append = tcase_create("de::MutableBuffer::append Testing");
     tcase_add_test(append, t_insert);
     tcase_add_test(append, t_insert_tombstones);
-    tcase_add_test(append, t_multithreaded_insert);
+    #if DE_MT_TEST
+    tcase_add_test(append, t_multithreaded_insert);
+    #endif
 
     suite_add_tcase(unit, append);
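t_insert_tombstones and t_truncate pin down the buffer's two independent limits: a record capacity and a (smaller) tombstone capacity, with append() returning 0 once the relevant one is exhausted. A condensed sketch of that behavior, using only the calls exercised in the tests above:

    #include "framework/MutableBuffer.h"
    #include "util/Record.h"
    #include <gsl/gsl_rng.h>
    #include <cassert>

    typedef de::Record<uint64_t, uint32_t> Rec;

    int main() {
        gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937);
        // record capacity 100, rejection sampling on, tombstone capacity 50
        auto buf = new de::MutableBuffer<Rec>(100, true, 50, rng);

        Rec ts = {0, 0};
        ts.set_tombstone();
        for (size_t i = 0; i < 50; i++) {    // up to the tombstone cap...
            assert(buf->append(ts) == 1);
            ts.key++;
            ts.value++;
        }
        assert(buf->append(ts) == 0);        // ...then tombstone appends are rejected

        Rec rec = {1000, 0};
        while (!buf->is_full()) {            // ordinary records still fit, up to capacity
            assert(buf->append(rec) == 1);
            rec.key++;
        }
        assert(buf->append(rec) == 0);       // a full buffer rejects everything

        delete buf;
        gsl_rng_free(rng);
        return 0;
    }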
diff --git a/tests/testing.h b/tests/testing.h
index 062e930..eb5c095 100644
--- a/tests/testing.h
+++ b/tests/testing.h
@@ -16,18 +16,14 @@
 #include <unistd.h>
 #include <fcntl.h>
 
+#include "util/Record.h"
 #include "util/types.h"
 #include "util/base.h"
 #include "framework/MutableBuffer.h"
-#include "framework/InternalLevel.h"
+//#include "framework/InternalLevel.h"
 
-typedef de::Record<uint64_t, uint32_t, uint64_t> WeightedRec;
-typedef de::MutableBuffer<uint64_t, uint32_t, uint64_t> WeightedMBuffer;
-typedef de::InternalLevel<uint64_t, uint32_t, uint64_t> WeightedLevel;
-
-typedef de::Record<uint64_t, uint32_t> UnweightedRec;
-typedef de::MutableBuffer<uint64_t, uint32_t> UnweightedMBuffer;
-typedef de::InternalLevel<uint64_t, uint32_t> UnweightedLevel;
+typedef de::WeightedRecord<uint64_t, uint32_t, uint64_t> WRec;
+typedef de::Record<uint64_t, uint32_t> Rec;
 
 static gsl_rng *g_rng = gsl_rng_alloc(gsl_rng_mt19937);
 
@@ -72,90 +68,101 @@ static bool roughly_equal(int n1, int n2, size_t mag, double epsilon) {
     return ((double) std::abs(n1 - n2) / (double) mag) < epsilon;
 }
 
-template <typename K, typename V, typename W=void>
-static de::MutableBuffer<K,V,W> *create_test_mbuffer(size_t cnt)
+template <de::RecordInterface R>
+static de::MutableBuffer<R> *create_test_mbuffer(size_t cnt)
 {
-    auto buffer = new de::MutableBuffer<K,V,W>(cnt, true, cnt, g_rng);
+    auto buffer = new de::MutableBuffer<R>(cnt, true, cnt, g_rng);
 
+    R rec;
     for (size_t i = 0; i < cnt; i++) {
-        uint64_t key = rand();
-        uint32_t val = rand();
+        rec.key = rand();
+        rec.value = rand();
+
+        if constexpr (de::WeightedRecordInterface<R>) {
+            rec.weight = 1;
+        }
 
-        buffer->append(key, val);
+        buffer->append(rec);
     }
 
     return buffer;
 }
 
-template <typename K, typename V, typename W=void>
-static de::MutableBuffer<K,V,W> *create_test_mbuffer_tombstones(size_t cnt, size_t ts_cnt)
+template <de::RecordInterface R>
+static de::MutableBuffer<R> *create_test_mbuffer_tombstones(size_t cnt, size_t ts_cnt)
 {
-    auto buffer = new de::MutableBuffer<K,V,W>(cnt, true, ts_cnt, g_rng);
+    auto buffer = new de::MutableBuffer<R>(cnt, true, ts_cnt, g_rng);
 
     std::vector<std::pair<uint64_t, uint32_t>> tombstones;
 
+    R rec;
     for (size_t i = 0; i < cnt; i++) {
-        uint64_t key = rand();
-        uint32_t val = rand();
+        rec.key = rand();
+        rec.value = rand();
+
+        if constexpr (de::WeightedRecordInterface<R>) {
+            rec.weight = 1;
+        }
 
         if (i < ts_cnt) {
-            tombstones.push_back({key, val});
+            tombstones.push_back({rec.key, rec.value});
         }
 
-        buffer->append(key, val);
+        buffer->append(rec);
     }
 
+    rec.set_tombstone();
     for (size_t i=0; i<ts_cnt; i++) {
-        buffer->append(tombstones[i].first, tombstones[i].second, true);
+        rec.key = tombstones[i].first;
+        rec.value = tombstones[i].second;
+        buffer->append(rec);
     }
 
     return buffer;
 }
 
-template <typename K, typename V, typename W=void>
-static de::MutableBuffer<K,V,W> *create_weighted_mbuffer(size_t cnt)
+template <de::WeightedRecordInterface R>
+static de::MutableBuffer<R> *create_weighted_mbuffer(size_t cnt)
 {
-    static_assert(!std::is_same<W, void>::value);
-    auto buffer = new de::MutableBuffer<K,V,W>(cnt, true, cnt, g_rng);
+    auto buffer = new de::MutableBuffer<R>(cnt, true, cnt, g_rng);
 
-    // Put in half of the count with weight one.
-    uint64_t key = 1;
-    for (size_t i=0; i< cnt / 2; i++) {
-        buffer->append(key, i, 2);
+    // Put in half of the count with weight two.
+    for (uint32_t i=0; i< cnt / 2; i++) {
+        buffer->append(R {1, i, 2});
     }
 
-    // put in a quarter of the count with weight two.
-    key = 2;
-    for (size_t i=0; i< cnt / 4; i++) {
-        buffer->append(key, i, 4);
+    // put in a quarter of the count with weight four.
+    for (uint32_t i=0; i< cnt / 4; i++) {
+        buffer->append(R {2, i, 4});
    }
 
-    // the remaining quarter with weight four.
-    key = 3;
-    for (size_t i=0; i< cnt / 4; i++) {
-        buffer->append(key, i, 8);
+    // the remaining quarter with weight eight.
+    for (uint32_t i=0; i< cnt / 4; i++) {
+        buffer->append(R {3, i, 8});
    }
 
     return buffer;
 }
 
-template <typename K, typename V, typename W=void>
-static de::MutableBuffer<K,V,W> *create_double_seq_mbuffer(size_t cnt, bool ts=false)
+template <de::RecordInterface R>
+static de::MutableBuffer<R> *create_double_seq_mbuffer(size_t cnt, bool ts=false)
 {
-    auto buffer = new de::MutableBuffer<K,V,W>(cnt, true, cnt, g_rng);
+    auto buffer = new de::MutableBuffer<R>(cnt, true, cnt, g_rng);
 
     for (size_t i = 0; i < cnt / 2; i++) {
-        uint64_t key = i;
-        uint32_t val = i;
+        R rec;
+        rec.key = i;
+        rec.value = i;
+        if constexpr (de::WeightedRecordInterface<R>) {
+            rec.weight = 1;
+        }
+        if (ts) rec.set_tombstone();
 
-        buffer->append(key, val, ts);
+        buffer->append(rec);
     }
 
     for (size_t i = 0; i < cnt / 2; i++) {
-        uint64_t key = i;
-        uint32_t val = i + 1;
+        R rec;
+        rec.key = i;
+        rec.value = i + 1;
+        if constexpr (de::WeightedRecordInterface<R>) {
+            rec.weight = 1;
+        }
+        if (ts) rec.set_tombstone();
 
-        buffer->append(key, val, ts);
+        buffer->append(rec);
     }
 
     return buffer;
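With the helpers templated on the record type, a single implementation now serves both record layouts; the if constexpr branch that assigns a weight compiles away entirely when R does not satisfy de::WeightedRecordInterface. Usage reduces to:

    auto ubuf = create_test_mbuffer<Rec>(512);   // unweighted: the weight branch compiles out
    auto wbuf = create_test_mbuffer<WRec>(512);  // weighted: every record gets weight 1

Constraining create_weighted_mbuffer with de::WeightedRecordInterface also lets the concept replace the old static_assert on W != void, moving the check from instantiation time to overload resolution.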
diff --git a/tests/wirs_tests.cpp b/tests/wirs_tests.cpp
index ed83d40..673bdca 100644
--- a/tests/wirs_tests.cpp
+++ b/tests/wirs_tests.cpp
@@ -19,24 +19,24 @@
 
 using namespace de;
 
-typedef WIRS<uint64_t, uint32_t, uint64_t> Shard;
+typedef WIRS<WRec> Shard;
 
 START_TEST(t_mbuffer_init)
 {
-    auto mem_table = new WeightedMBuffer(1024, true, 1024, g_rng);
+    auto mem_table = new MutableBuffer<WRec>(1024, true, 1024, g_rng);
     for (uint64_t i = 512; i > 0; i--) {
         uint32_t v = i;
-        mem_table->append(i, v);
+        mem_table->append({i, v, 1});
     }
 
     for (uint64_t i = 1; i <= 256; ++i) {
         uint32_t v = i;
-        mem_table->append(i, v, 1.0, true);
+        mem_table->append({i, v, 1, 1});
     }
 
     for (uint64_t i = 257; i <= 512; ++i) {
         uint32_t v = i + 1;
-        mem_table->append(i, v);
+        mem_table->append({i, v, 1});
     }
 
     BloomFilter* bf = new BloomFilter(BF_FPR, mem_table->get_tombstone_count(), BF_HASH_FUNCS, g_rng);
@@ -51,9 +51,9 @@ START_TEST(t_mbuffer_init)
 START_TEST(t_wirs_init)
 {
     size_t n = 512;
-    auto mbuffer1 = create_test_mbuffer<uint64_t, uint32_t, uint64_t>(n);
-    auto mbuffer2 = create_test_mbuffer<uint64_t, uint32_t, uint64_t>(n);
-    auto mbuffer3 = create_test_mbuffer<uint64_t, uint32_t, uint64_t>(n);
+    auto mbuffer1 = create_test_mbuffer<WRec>(n);
+    auto mbuffer2 = create_test_mbuffer<WRec>(n);
+    auto mbuffer3 = create_test_mbuffer<WRec>(n);
 
     BloomFilter* bf1 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
     BloomFilter* bf2 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
@@ -81,11 +81,11 @@ START_TEST(t_wirs_init)
 
         auto cur_rec = shard4->get_record_at(i);
 
-        if (shard1_idx < n && cur_rec->match(rec1)) {
+        if (shard1_idx < n && *cur_rec == *rec1) {
             ++shard1_idx;
-        } else if (shard2_idx < n && cur_rec->match(rec2)) {
+        } else if (shard2_idx < n && *cur_rec == *rec2) {
             ++shard2_idx;
-        } else if (shard3_idx < n && cur_rec->match(rec3)) {
+        } else if (shard3_idx < n && *cur_rec == *rec3) {
             ++shard3_idx;
         } else {
             assert(false);
@@ -109,7 +109,7 @@ START_TEST(t_wirs_init)
 START_TEST(t_get_lower_bound_index)
 {
     size_t n = 10000;
-    auto mbuffer = create_double_seq_mbuffer<uint64_t, uint32_t, uint64_t>(n);
+    auto mbuffer = create_double_seq_mbuffer<WRec>(n);
 
     ck_assert_ptr_nonnull(mbuffer);
     BloomFilter* bf = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
@@ -120,7 +120,7 @@ START_TEST(t_get_lower_bound_index)
     auto tbl_records = mbuffer->sorted_output();
     for (size_t i=0; i<n; i++) {
-        const WeightedRec *tbl_rec = mbuffer->get_record_at(i);
+        const WRec *tbl_rec = mbuffer->get_record_at(i);
         auto pos = shard->get_lower_bound(tbl_rec->key);
         ck_assert_int_eq(shard->get_record_at(pos)->key, tbl_rec->key);
         ck_assert_int_le(pos, i);
@@ -135,8 +135,8 @@ START_TEST(t_get_lower_bound_index)
 START_TEST(t_full_cancelation)
 {
     size_t n = 100;
-    auto buffer = create_double_seq_mbuffer<uint64_t, uint32_t, uint64_t>(n, false);
-    auto buffer_ts = create_double_seq_mbuffer<uint64_t, uint32_t, uint64_t>(n, true);
+    auto buffer = create_double_seq_mbuffer<WRec>(n, false);
+    auto buffer_ts = create_double_seq_mbuffer<WRec>(n, true);
     BloomFilter* bf1 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
     BloomFilter* bf2 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
     BloomFilter* bf3 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
@@ -171,7 +171,7 @@ END_TEST
 START_TEST(t_weighted_sampling)
 {
     size_t n=1000;
-    auto buffer = create_weighted_mbuffer<uint64_t, uint32_t, uint64_t>(n);
+    auto buffer = create_weighted_mbuffer<WRec>(n);
 
     BloomFilter* bf = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
     Shard* shard = new Shard(buffer, bf, false);
@@ -181,7 +181,7 @@ START_TEST(t_weighted_sampling)
 
     size_t k = 1000;
 
-    std::vector<WeightedRec> results;
+    std::vector<WRec> results;
     results.reserve(k);
     size_t cnt[3] = {0};
     for (size_t i=0; i<1000; i++) {
@@ -193,7 +193,7 @@ START_TEST(t_weighted_sampling)
             cnt[results[j].key - 1]++;
         }
 
-        WIRS<uint64_t, uint32_t, uint64_t>::delete_state(state);
+        WIRS<WRec>::delete_state(state);
     }
 
     ck_assert(roughly_equal(cnt[0] / 1000, (double) k/4.0, k, .05));
@@ -211,14 +211,14 @@ START_TEST(t_tombstone_check)
 START_TEST(t_tombstone_check)
 {
     size_t cnt = 1024;
     size_t ts_cnt = 256;
-    auto buffer = new WeightedMBuffer(cnt + ts_cnt, true, ts_cnt, g_rng);
+    auto buffer = new MutableBuffer<WRec>(cnt + ts_cnt, true, ts_cnt, g_rng);
 
     std::vector<std::pair<uint64_t, uint32_t>> tombstones;
 
     uint64_t key = 1000;
     uint32_t val = 101;
     for (size_t i = 0; i < cnt; i++) {
-        buffer->append(key, val);
+        buffer->append({key, val, 1});
         key++;
         val++;
     }
@@ -230,14 +230,14 @@ START_TEST(t_tombstone_check)
     }
 
     for (size_t i=0; i<ts_cnt; i++) {
-        buffer->append(tombstones[i].first, tombstones[i].second, 1.0, true);
+        buffer->append({tombstones[i].first, tombstones[i].second, 1, 1});
     }
 
     BloomFilter* bf1 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
     auto shard = new Shard(buffer, bf1, false);
 
     for (size_t i=0; i<tombstones.size(); i++) {
-        ck_assert(shard->check_tombstone(tombstones[i].first, tombstones[i].second));
+        ck_assert(shard->check_tombstone({tombstones[i].first, tombstones[i].second}));
         ck_assert_int_eq(shard->get_rejection_count(), i+1);
     }
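A condensed sketch of the shard-level pattern these wirs_tests exercise, assuming only the interfaces used above (MutableBuffer, BloomFilter, and the WIRS constructor taking a buffer, a Bloom filter, and a tombstone-lookup flag); the key and value below are placeholders:

    typedef de::WIRS<WRec> Shard;

    auto buffer = new de::MutableBuffer<WRec>(1024, true, 1024, g_rng);
    buffer->append({1, 1, 1});     // {key, value, weight}
    buffer->append({1, 1, 1, 1});  // fourth field set: a tombstone for the record above

    BloomFilter *bf = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
    auto shard = new Shard(buffer, bf, false);

    // tombstone checks now take a record rather than a (key, value) pair
    bool hit = shard->check_tombstone({1, 1});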