diff options
| -rw-r--r-- | include/framework/DynamicExtension.h | 51 | ||||
| -rw-r--r-- | include/framework/InternalLevel.h | 10 | ||||
| -rw-r--r-- | include/framework/MutableBuffer.h | 20 | ||||
| -rw-r--r-- | include/framework/RecordInterface.h | 48 | ||||
| -rw-r--r-- | include/shard/WIRS.h | 90 | ||||
| -rw-r--r-- | tests/dynamic_extension_tests.cpp | 36 | ||||
| -rw-r--r-- | tests/internal_level_tests.cpp | 10 | ||||
| -rw-r--r-- | tests/mutable_buffer_tests.cpp | 58 | ||||
| -rw-r--r-- | tests/testing.h | 8 | ||||
| -rw-r--r-- | tests/wirs_tests.cpp | 30 |
10 files changed, 172 insertions, 189 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 68f85e2..422ca10 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -75,9 +75,9 @@ typedef ssize_t level_index; template <RecordInterface R, ShardInterface S, QueryInterface Q, typename FQ=void> class DynamicExtension { - //typedef typename S<WrappedRecord<R>> Shard; + //typedef typename S<R> Shard; typedef S Shard; - typedef MutableBuffer<WrappedRecord<R>> Buffer; + typedef MutableBuffer<R> Buffer; public: DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop) @@ -142,18 +142,23 @@ public: level->get_query_states(shards, states, parms); } - std::vector<std::vector<R>> query_results(shards.size() + 1); + std::vector<std::vector<R>*> query_results(shards.size() + 1, nullptr); // Execute the query for the buffer - query_results[0] = Q::buffer_query(buffer, buffer_state, parms); + query_results[0] = filter_deletes(Q::buffer_query(buffer, buffer_state, parms), {-1, -1}, buffer); // Execute the query for each shard for (size_t i=0; i<shards.size(); i++) { - query_results[i] = filter_deletes(Q::query(shards[i].second, states[i], parms)); + query_results[i] = filter_deletes(Q::query(shards[i].second, states[i], parms), shards[i].first, buffer); } // Merge the results together - auto result = Q::merge(query_results); + auto result = Q::merge(&query_results); + + for (size_t i=0; i<query_results.size(); i++) { + delete query_results[i]; + Q::delete_query_state(states[i]); + } return result; } @@ -259,7 +264,7 @@ private: size_t m_scale_factor; double m_max_delete_prop; - std::vector<InternalLevel<WrappedRecord<R>, S, Q> *> m_levels; + std::vector<InternalLevel<R, S, Q> *> m_levels; Buffer *get_buffer() { return m_buffer; @@ -274,14 +279,12 @@ private: merge_buffer(); } - WrappedRecord<R> wrec = static_cast<WrappedRecord<R>>(rec); - - return buffer->append(wrec, ts); + return buffer->append(rec, ts); } - std::vector<R> filter_deletes(std::vector<R> records, ShardID shid, Buffer *buffer) { - std::vector<R> processed_records; - processed_records.reserve(records.size()); + std::vector<R> *filter_deletes(std::vector<R> *records, ShardID shid, Buffer *buffer) { + std::vector<R> *processed_records = new std::vector<R>(); + processed_records->reserve(records->size()); // For delete tagging, we just need to check the delete bit on each // record. @@ -291,9 +294,10 @@ private: continue; } - processed_records.emplace_back(rec.rec); + processed_records->emplace_back(static_cast<R>(rec.rec)); } + delete records; return processed_records; } @@ -320,8 +324,10 @@ private: } } - processed_records.emplace_back(rec.rec); + processed_records->emplace_back(static_cast<R>(rec.rec)); } + + delete records; } /* @@ -337,7 +343,7 @@ private: if (new_idx > 0) { assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); } - m_levels.emplace_back(new InternalLevel<WrappedRecord<R>, Shard, Q>(new_idx, new_shard_cnt)); + m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)); return new_idx; } @@ -416,14 +422,14 @@ private: // merging two memory levels if (LSM_LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); + m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); mark_as_unused(tmp); } else { m_levels[base_level]->append_merged_shards(m_levels[incoming_level]); } mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel<WrappedRecord<R>, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); + m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); } @@ -432,9 +438,9 @@ private: if (LSM_LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0]; - auto temp_level = new InternalLevel<WrappedRecord<R>, Shard, Q>(0, 1); + auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); temp_level->append_buffer(buffer); - auto new_level = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(old_level, temp_level); + auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level); m_levels[0] = new_level; delete temp_level; @@ -450,7 +456,7 @@ private: * level may not be able to immediately be deleted, depending upon who * else is using it. */ - inline void mark_as_unused(InternalLevel<WrappedRecord<R>, Shard, Q> *level) { + inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) { delete level; } @@ -460,6 +466,9 @@ private: * levels below idx are below the limit. */ inline void enforce_delete_maximum(level_index idx) { + // FIXME: currently broken due to tombstone cancellation issues + return; + long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx); if (ts_prop > (long double) m_max_delete_prop) { diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index c489063..d28ba5f 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -28,18 +28,18 @@ class InternalLevel { typedef MutableBuffer<R> Buffer; public: InternalLevel(ssize_t level_no, size_t shard_cap) - : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard*>(shard_cap, nullptr)) + : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard*>(shard_cap, nullptr), free_shards) {} // Create a new memory level sharing the shards and repurposing it as previous level_no + 1 // WARNING: for leveling only. InternalLevel(InternalLevel* level) : m_level_no(level->m_level_no + 1), m_shard_cnt(level->m_shard_cnt) - , m_shards(level->m_shards) { + , m_shards(level->m_shards, free_shards) { assert(m_shard_cnt == 1 && m_shards->size() == 1); } - ~InternalLevel() {} + ~InternalLevel() { } // WARNING: for leveling only. // assuming the base level is the level new level is merging into. (base_level is larger.) @@ -181,6 +181,10 @@ private: size_t m_shard_cnt; size_t m_shard_size_cap; + static void free_shards(std::vector<Shard*>* vec) { + for (size_t i=0; i<vec->size(); i++) delete (*vec)[i]; + } + std::shared_ptr<std::vector<Shard*>> m_shards; }; diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h index c154001..bc80922 100644 --- a/include/framework/MutableBuffer.h +++ b/include/framework/MutableBuffer.h @@ -28,14 +28,13 @@ namespace de { template <RecordInterface R> class MutableBuffer { - //typedef WrappedRecord<R> R; public: MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap) : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0) , m_tombstonecnt(0), m_weight(0), m_max_weight(0) { - auto len = capacity * sizeof(R); + auto len = capacity * sizeof(Wrapped<R>); size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE)); - m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); + m_data = (Wrapped<R>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); m_tombstone_filter = nullptr; if (max_tombstone_cap > 0) { m_tombstone_filter = new BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS); @@ -54,10 +53,11 @@ public: int32_t pos = 0; if ((pos = try_advance_tail()) == -1) return 0; - R new_rec = rec; - if (tombstone) new_rec.set_tombstone(); + Wrapped<R> wrec; + wrec.rec = rec; + if (tombstone) wrec.set_tombstone(); - m_data[pos] = new_rec; + m_data[pos] = wrec; m_data[pos].header |= (pos << 2); if (tombstone) { @@ -66,7 +66,7 @@ public: } if constexpr (WeightedRecordInterface<R_>) { - m_weight.fetch_add(new_rec.weight); + m_weight.fetch_add(rec.weight); double old = m_max_weight.load(); while (old < rec.weight) { m_max_weight.compare_exchange_strong(old, rec.weight); @@ -123,7 +123,7 @@ public: auto offset = 0; while (offset < m_reccnt.load()) { - if (m_data[offset] == rec && m_data[offset].is_tombstone()) { + if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) { return true; } offset++;; @@ -147,7 +147,7 @@ public: return m_weight.load(); } - R *get_data() { + Wrapped<R> *get_data() { return m_data; } @@ -162,7 +162,7 @@ private: size_t m_cap; size_t m_tombstone_cap; - R* m_data; + Wrapped<R>* m_data; BloomFilter<R>* m_tombstone_filter; alignas(64) std::atomic<size_t> m_tombstonecnt; diff --git a/include/framework/RecordInterface.h b/include/framework/RecordInterface.h index c8e622f..70c8e01 100644 --- a/include/framework/RecordInterface.h +++ b/include/framework/RecordInterface.h @@ -26,9 +26,9 @@ concept RecordInterface = requires(R r, R s) { }; template<RecordInterface R> -struct WrappedRecord : R { - //R rec; +struct Wrapped { uint32_t header; + R rec; inline void set_delete() { header |= 2; @@ -49,8 +49,14 @@ struct WrappedRecord : R { inline bool is_tombstone() const { return header & 1; } + + inline bool operator<(const Wrapped& other) const { + return (rec.key < other.rec.key) || (rec.key == other.rec.key && rec.value < other.rec.value) + || (rec.key == other.rec.key && rec.value == other.rec.value && header < other.header); + } }; + template <typename R> concept WeightedRecordInterface = RecordInterface<R> && requires(R r) { {r.weight} -> std::convertible_to<double>; @@ -76,46 +82,14 @@ struct WeightedRecord { K key; V value; W weight = 1; - uint32_t header = 0; - inline void set_delete() { - header |= 2; - } - - inline bool is_deleted() const { - return header & 2; - } - - inline void set_tombstone(bool val=true) { - if (val) { - header |= val; - } else { - header &= 0; - } - } - - inline bool is_tombstone() const { - return header & 1; - } - - inline int match(const WeightedRecord* other) const { - return key == other->key && value == other->value; + inline bool operator==(const WeightedRecord& other) const { + return key == other.key && value == other.value; } - inline bool operator<(const WeightedRecord& other) const { + inline bool operator<(const WeightedRecord& other) const { return key < other.key || (key == other.key && value < other.value); } - - inline bool operator==(const WeightedRecord& other) const { - return key == other.key && value == other.value; - } }; - -template <RecordInterface R> -static bool memtable_record_cmp(const R& a, const R& b) { - return (a.key < b.key) || (a.key == b.key && a.value < b.value) - || (a.key == b.key && a.value == b.value && a.header < b.header); -} - } diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h index 5b610c7..9e4d911 100644 --- a/include/shard/WIRS.h +++ b/include/shard/WIRS.h @@ -34,6 +34,7 @@ template <WeightedRecordInterface R> struct wirs_query_parms { decltype(R::key) lower_bound; decltype(R::key) upper_bound; + size_t sample_size; }; template <WeightedRecordInterface R> @@ -73,9 +74,9 @@ public: WIRS(MutableBuffer<R>* buffer) : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) { - size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE); + size_t alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); - m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); m_bf = new BloomFilter<K>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS); @@ -84,11 +85,11 @@ public: auto base = buffer->get_data(); auto stop = base + buffer->get_record_count(); - std::sort(base, stop, memtable_record_cmp<R>); + std::sort(base, stop, std::less<Wrapped<R>>()); while (base < stop) { if (!(base->is_tombstone()) && (base + 1) < stop) { - if (*base == *(base + 1) && (base + 1)->is_tombstone()) { + if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { base += 2; wirs_cancelations++; continue; @@ -100,11 +101,11 @@ public: base->header &= 1; m_data[m_reccnt++] = *base; - m_total_weight+= base->weight; + m_total_weight+= base->rec.weight; if (m_bf && base->is_tombstone()) { m_tombstone_cnt++; - m_bf->insert(base->key); + m_bf->insert(base->rec.key); } base++; @@ -117,56 +118,56 @@ public: WIRS(WIRS** shards, size_t len) : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) { - std::vector<Cursor<R>> cursors; + std::vector<Cursor<Wrapped<R>>> cursors; cursors.reserve(len); - PriorityQueue<R> pq(len); + PriorityQueue<Wrapped<R>> pq(len); size_t attemp_reccnt = 0; size_t tombstone_count = 0; for (size_t i = 0; i < len; ++i) { if (shards[i]) { - auto base = shards[i]->sorted_output(); + auto base = shards[i]->get_data(); cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); attemp_reccnt += shards[i]->get_record_count(); tombstone_count += shards[i]->get_tombstone_count(); pq.push(cursors[i].ptr, i); } else { - cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0}); + cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); } } m_bf = new BloomFilter<K>(BF_FPR, tombstone_count, BF_HASH_FUNCS); - size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE); + size_t alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); - m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); while (pq.size()) { auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<R>{nullptr, 0}; + auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; if (!now.data->is_tombstone() && next.data != nullptr && - *now.data == *next.data && next.data->is_tombstone()) { + now.data->rec == next.data->rec && next.data->is_tombstone()) { pq.pop(); pq.pop(); auto& cursor1 = cursors[now.version]; auto& cursor2 = cursors[next.version]; - if (advance_cursor<R>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor<R>(cursor2)) pq.push(cursor2.ptr, next.version); + if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version); } else { auto& cursor = cursors[now.version]; if (!cursor.ptr->is_deleted()) { m_data[m_reccnt++] = *cursor.ptr; - m_total_weight += cursor.ptr->weight; + m_total_weight += cursor.ptr->rec.weight; if (m_bf && cursor.ptr->is_tombstone()) { ++m_tombstone_cnt; - if (m_bf) m_bf->insert(cursor.ptr->key); + if (m_bf) m_bf->insert(cursor.ptr->rec.key); } } pq.pop(); - if (advance_cursor<R>(cursor)) pq.push(cursor.ptr, now.version); + if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version); } } @@ -186,7 +187,7 @@ public: free_tree(m_root); } - R *point_lookup(R &rec, bool filter=false) { + Wrapped<R> *point_lookup(R &rec, bool filter=false) { if (filter && !m_bf->lookup(rec.key)) { return nullptr; } @@ -205,7 +206,7 @@ public: return nullptr; } - R* sorted_output() const { + Wrapped<R>* get_data() const { return m_data; } @@ -217,7 +218,7 @@ public: return m_tombstone_cnt; } - const R* get_record_at(size_t idx) const { + const Wrapped<R>* get_record_at(size_t idx) const { if (idx >= m_reccnt) return nullptr; return m_data + idx; } @@ -273,7 +274,7 @@ private: double group_weight = 0.0; group_norm_weight.clear(); for (size_t k = 0; k < m_group_size && i < m_reccnt; ++k, ++i) { - auto w = m_data[i].weight; + auto w = m_data[i].rec.weight; group_norm_weight.emplace_back(w); group_weight += w; sum_weight += w; @@ -325,7 +326,7 @@ private: } } - R* m_data; + Wrapped<R>* m_data; std::vector<Alias *> m_alias; wirs_node<R>* m_root; W m_total_weight; @@ -339,10 +340,10 @@ private: template <WeightedRecordInterface R> class WIRSQuery { public: - static void *get_query_state(wirs_query_parms<R> *parameters, WIRS<R> *wirs) { + static void *get_query_state(WIRS<R> *wirs, void *parms) { auto res = new WIRSState<R>(); - decltype(R::key) lower_key = ((wirs_query_parms<R> *) parameters)->lower_bound; - decltype(R::key) upper_key = ((wirs_query_parms<R> *) parameters)->upper_bound; + decltype(R::key) lower_key = ((wirs_query_parms<R> *) parms)->lower_bound; + decltype(R::key) upper_key = ((wirs_query_parms<R> *) parms)->upper_bound; // Simulate a stack to unfold recursion. double tot_weight = 0.0; @@ -351,13 +352,13 @@ public: size_t top = 1; while(top > 0) { auto now = st[--top]; - if (covered_by(now, lower_key, upper_key) || - (now->left == nullptr && now->right == nullptr && intersects(now, lower_key, upper_key))) { + if (wirs->covered_by(now, lower_key, upper_key) || + (now->left == nullptr && now->right == nullptr && wirs->intersects(now, lower_key, upper_key))) { res->nodes.emplace_back(now); tot_weight += now->weight; } else { - if (now->left && intersects(now->left, lower_key, upper_key)) st[top++] = now->left; - if (now->right && intersects(now->right, lower_key, upper_key)) st[top++] = now->right; + if (now->left && wirs->intersects(now->left, lower_key, upper_key)) st[top++] = now->left; + if (now->right && wirs->intersects(now->right, lower_key, upper_key)) st[top++] = now->right; } } @@ -371,13 +372,19 @@ public: return res; } - static std::vector<R> *query(wirs_query_parms<R> *parameters, WIRSState<R> *state, WIRS<R> *wirs) { - auto sample_sz = parameters->sample_size; - auto lower_key = parameters->lower_bound; - auto upper_key = parameters->upper_bound; - auto rng = parameters->rng; + static void* get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) { + return nullptr; + } + + static std::vector<Wrapped<R>> *query(WIRS<R> *wirs, void *q_state, void *parms) { + auto sample_sz = ((wirs_query_parms<R> *) parms)->sample_size; + auto lower_key = ((wirs_query_parms<R> *) parms)->lower_bound; + auto upper_key = ((wirs_query_parms<R> *) parms)->upper_bound; + auto rng = ((wirs_query_parms<R> *) parms)->rng; + + auto state = (WIRSState<R> *) q_state; - std::vector<R> *result_set = new std::vector<R>(); + std::vector<Wrapped<R>> *result_set = new std::vector<Wrapped<R>>(); if (sample_sz == 0) { return 0; @@ -407,14 +414,19 @@ public: return result_set; } - static std::vector<R> *merge(std::vector<std::vector<R>> *results) { - std::vector<R> *output = new std::vector<R>(); + static std::vector<Wrapped<R>> *buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) { + return new std::vector<Wrapped<R>>(); + } + + static std::vector<R> merge(std::vector<std::vector<R>*> *results) { + std::vector<R> output; for (size_t i=0; i<results->size(); i++) { for (size_t j=0; j<(*results)[i]->size(); j++) { output->emplace_back(*((*results)[i])[j]); } } + return output; } diff --git a/tests/dynamic_extension_tests.cpp b/tests/dynamic_extension_tests.cpp index b74ab38..9fdd5f9 100644 --- a/tests/dynamic_extension_tests.cpp +++ b/tests/dynamic_extension_tests.cpp @@ -20,7 +20,7 @@ #include <check.h> using namespace de; -typedef DynamicExtension<WRec, WIRS<WrappedRecord<WRec>>, WIRSQuery<WrappedRecord<WRec>>> DE; +typedef DynamicExtension<WRec, WIRS<WRec>, WIRSQuery<WRec>> DE; START_TEST(t_create) { @@ -147,7 +147,9 @@ START_TEST(t_range_sample_memlevels) delete ext_wirs; } END_TEST +*/ +/* START_TEST(t_range_sample_weighted) { auto ext_wirs = new DE(100, 2, 1); @@ -186,23 +188,24 @@ START_TEST(t_range_sample_weighted) weight = 8; } - WRec r = {keys[i], (uint32_t) i, weight, 0}; + WRec r = {keys[i], (uint32_t) i, weight}; ext_wirs->insert(r); } size_t k = 1000; uint64_t lower_key = 0; uint64_t upper_key = 5; - WRec* buffer = new WRec[k](); - char *buffer1 = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE); - char *buffer2 = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE); - size_t cnt[3] = {0}; for (size_t i=0; i<1000; i++) { - ext_wirs->range_sample(buffer, lower_key, upper_key, k); + wirs_query_parms<WRec> p; + p.lower_bound = lower_key; + p.upper_bound = upper_key; + p.sample_size = k; + + auto result = ext_wirs->query(&p); for (size_t j=0; j<k; j++) { - cnt[buffer[j].key - 1]++; + cnt[result[j].key - 1]++; } } @@ -211,9 +214,6 @@ START_TEST(t_range_sample_weighted) ck_assert(roughly_equal(cnt[2] / 1000, (double) k/2.0, k, .05)); delete ext_wirs; - delete[] buffer; - free(buffer1); - free(buffer2); } END_TEST */ @@ -242,7 +242,7 @@ START_TEST(t_tombstone_merging_01) size_t deletes = 0; size_t cnt=0; for (auto rec : records) { - WRec r = {rec.first, rec.second, 1, 0}; + WRec r = {rec.first, rec.second, 1}; ck_assert_int_eq(ext_wirs->insert(r), 1); if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) { @@ -251,8 +251,7 @@ START_TEST(t_tombstone_merging_01) for (size_t i=0; i<del_vec.size(); i++) { WRec dr = {del_vec[i].first, del_vec[i].second, 1}; - dr.set_tombstone(); - ext_wirs->insert(dr); + ext_wirs->erase(dr); deletes++; to_delete.erase(del_vec[i]); deleted.insert(del_vec[i]); @@ -300,8 +299,7 @@ DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) { std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); for (size_t i=0; i<del_vec.size(); i++) { - del_vec[i].set_tombstone(); - ext_wirs->insert(del_vec[i]); + ext_wirs->erase(del_vec[i]); deletes++; to_delete.erase(del_vec[i]); deleted.insert(del_vec[i]); @@ -349,9 +347,7 @@ START_TEST(t_sorted_array) for (size_t i=0; i<del_vec.size(); i++) { WRec dr = {del_vec[i].first, del_vec[i].second, 1}; - dr.set_tombstone(); - - ext_wirs->insert(dr ); + ext_wirs->erase(dr); deletes++; to_delete.erase(del_vec[i]); deleted.insert(del_vec[i]); @@ -368,7 +364,7 @@ START_TEST(t_sorted_array) uint64_t prev_key = 0; for (size_t i=0; i<flat->get_record_count(); i++) { - auto k = flat->get_record_at(i)->key; + auto k = flat->get_record_at(i)->rec.key; ck_assert_int_ge(k, prev_key); prev_key = k; } diff --git a/tests/internal_level_tests.cpp b/tests/internal_level_tests.cpp index 42c9044..1bbc458 100644 --- a/tests/internal_level_tests.cpp +++ b/tests/internal_level_tests.cpp @@ -21,12 +21,12 @@ using namespace de; -typedef InternalLevel<WrappedRecord<WRec>, WIRS<WrappedRecord<WRec>>, WIRSQuery<WrappedRecord<WRec>>> ILevel; +typedef InternalLevel<WRec, WIRS<WRec>, WIRSQuery<WRec>> ILevel; START_TEST(t_memlevel_merge) { - auto tbl1 = create_test_mbuffer<WrappedWRec>(100); - auto tbl2 = create_test_mbuffer<WrappedWRec>(100); + auto tbl1 = create_test_mbuffer<WRec>(100); + auto tbl2 = create_test_mbuffer<WRec>(100); auto base_level = new ILevel(1, 1); base_level->append_buffer(tbl1); @@ -50,8 +50,8 @@ START_TEST(t_memlevel_merge) ILevel *create_test_memlevel(size_t reccnt) { - auto tbl1 = create_test_mbuffer<WrappedWRec>(reccnt/2); - auto tbl2 = create_test_mbuffer<WrappedWRec>(reccnt/2); + auto tbl1 = create_test_mbuffer<WRec>(reccnt/2); + auto tbl2 = create_test_mbuffer<WRec>(reccnt/2); auto base_level = new ILevel(1, 2); base_level->append_buffer(tbl1); diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp index fc8b511..bb8e2c2 100644 --- a/tests/mutable_buffer_tests.cpp +++ b/tests/mutable_buffer_tests.cpp @@ -25,7 +25,7 @@ using namespace de; START_TEST(t_create) { - auto buffer = new MutableBuffer<WrappedRec>(100, true, 50); + auto buffer = new MutableBuffer<Rec>(100, true, 50); ck_assert_ptr_nonnull(buffer); ck_assert_int_eq(buffer->get_capacity(), 100); @@ -42,12 +42,12 @@ END_TEST START_TEST(t_insert) { - auto buffer = new MutableBuffer<WrappedWRec>(100, true, 50); + auto buffer = new MutableBuffer<WRec>(100, true, 50); uint64_t key = 0; uint32_t val = 5; - WrappedWRec rec = {0, 5, 1}; + WRec rec = {0, 5, 1}; for (size_t i=0; i<99; i++) { ck_assert_int_eq(buffer->append(rec), 1); @@ -77,11 +77,11 @@ END_TEST START_TEST(t_insert_tombstones) { - auto buffer = new MutableBuffer<WrappedRec>(100, true, 50); + auto buffer = new MutableBuffer<Rec>(100, true, 50); size_t ts_cnt = 0; - WrappedRec rec = {0, 5}; + Rec rec = {0, 5}; for (size_t i=0; i<99; i++) { bool ts = false; @@ -90,9 +90,7 @@ START_TEST(t_insert_tombstones) ts=true; } - rec.set_tombstone(ts); - - ck_assert_int_eq(buffer->append(rec), 1); + ck_assert_int_eq(buffer->append(rec, ts), 1); ck_assert_int_eq(buffer->check_tombstone(rec), ts); rec.key++; @@ -104,11 +102,9 @@ START_TEST(t_insert_tombstones) } // inserting one more tombstone should not be possible - rec.set_tombstone(); - ck_assert_int_eq(buffer->append(rec), 0); + ck_assert_int_eq(buffer->append(rec, true), 0); - rec.set_tombstone(false); ck_assert_int_eq(buffer->append(rec), 1); rec.key++; @@ -124,10 +120,10 @@ END_TEST START_TEST(t_truncate) { - auto buffer = new MutableBuffer<WrappedRec>(100, true, 100); + auto buffer = new MutableBuffer<Rec>(100, true, 100); size_t ts_cnt = 0; - WrappedRec rec = {0, 5}; + Rec rec = {0, 5}; for (size_t i=0; i<100; i++) { bool ts = false; @@ -136,9 +132,7 @@ START_TEST(t_truncate) ts=true; } - rec.set_tombstone(ts); - - ck_assert_int_eq(buffer->append(rec), 1); + ck_assert_int_eq(buffer->append(rec, ts), 1); ck_assert_int_eq(buffer->check_tombstone(rec), ts); rec.key++; @@ -149,7 +143,6 @@ START_TEST(t_truncate) } ck_assert_int_eq(buffer->is_full(), 1); - rec.set_tombstone(false); ck_assert_int_eq(buffer->append(rec), 0); ck_assert_int_eq(buffer->truncate(), 1); @@ -165,11 +158,11 @@ START_TEST(t_truncate) END_TEST -START_TEST(t_sorted_output) +START_TEST(t_get_data) { size_t cnt = 100; - auto buffer = new MutableBuffer<WrappedRec>(cnt, true, cnt/2); + auto buffer = new MutableBuffer<Rec>(cnt, true, cnt/2); std::vector<uint64_t> keys(cnt); @@ -184,23 +177,22 @@ START_TEST(t_sorted_output) uint32_t val = 12345; for (size_t i=0; i<cnt-2; i++) { - buffer->append(WrappedRec {keys[i], val}); + buffer->append(Rec {keys[i], val}); } - WrappedRec r1 = {keys[cnt-2], val}; - r1.set_tombstone(); - buffer->append(r1); + Rec r1 = {keys[cnt-2], val}; + buffer->append(r1, true); - WrappedRec r2 = {keys[cnt-1], val}; - r2.set_tombstone(); - buffer->append(r2); + Rec r2 = {keys[cnt-1], val}; + buffer->append(r2, true); auto *sorted_records = buffer->get_data(); std::sort(keys.begin(), keys.end()); + std::sort(sorted_records, sorted_records + buffer->get_record_count(), std::less<Wrapped<Rec>>()); for (size_t i=0; i<cnt; i++) { - ck_assert_int_eq(sorted_records[i].key, keys[i]); + ck_assert_int_eq(sorted_records[i].rec.key, keys[i]); } delete buffer; @@ -208,7 +200,7 @@ START_TEST(t_sorted_output) END_TEST -void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t start, size_t stop, MutableBuffer<WrappedRec> *buffer) +void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t start, size_t stop, MutableBuffer<Rec> *buffer) { for (size_t i=start; i<stop; i++) { buffer->append({(*values)[i].first, (*values)[i].second}); @@ -220,11 +212,11 @@ void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t s START_TEST(t_multithreaded_insert) { size_t cnt = 10000; - auto buffer = new MutableBuffer<WrappedRec>(cnt, true, cnt/2); + auto buffer = new MutableBuffer<Rec>(cnt, true, cnt/2); - std::vector<WrappedRec> records(cnt); + std::vector<Rec> records(cnt); for (size_t i=0; i<cnt; i++) { - records[i] = WrappedRec {(uint64_t) rand(), (uint32_t) rand()}; + records[i] = Rec {(uint64_t) rand(), (uint32_t) rand()}; } // perform a t_multithreaded insertion @@ -285,8 +277,8 @@ Suite *unit_testing() suite_add_tcase(unit, truncate); - TCase *sorted_out = tcase_create("de::MutableBuffer::sorted_output"); - tcase_add_test(sorted_out, t_sorted_output); + TCase *sorted_out = tcase_create("de::MutableBuffer::get_data"); + tcase_add_test(sorted_out, t_get_data); suite_add_tcase(unit, sorted_out); diff --git a/tests/testing.h b/tests/testing.h index 3ffafcd..cd0045a 100644 --- a/tests/testing.h +++ b/tests/testing.h @@ -22,8 +22,6 @@ typedef de::WeightedRecord<uint64_t, uint32_t, uint64_t> WRec; typedef de::Record<uint64_t, uint32_t> Rec; -typedef de::WrappedRecord<WRec> WrappedWRec; -typedef de::WrappedRecord<Rec> WrappedRec; static bool initialize_test_file(std::string fname, size_t page_cnt) { @@ -149,18 +147,16 @@ static de::MutableBuffer<R> *create_double_seq_mbuffer(size_t cnt, bool ts=false R rec; rec.key = i; rec.value = i; - if (ts) rec.set_tombstone(); - buffer->append(rec); + buffer->append(rec, ts); } for (size_t i = 0; i < cnt / 2; i++) { R rec; rec.key = i; rec.value = i + 1; - if (ts) rec.set_tombstone(); - buffer->append(rec); + buffer->append(rec, ts); } return buffer; diff --git a/tests/wirs_tests.cpp b/tests/wirs_tests.cpp index 3a80a9e..ba4b754 100644 --- a/tests/wirs_tests.cpp +++ b/tests/wirs_tests.cpp @@ -17,39 +17,39 @@ using namespace de; -typedef WIRS<WrappedWRec> Shard; +typedef WIRS<WRec> Shard; START_TEST(t_mbuffer_init) { - auto mem_table = new MutableBuffer<WrappedWRec>(1024, true, 1024); + auto buffer = new MutableBuffer<WRec>(1024, true, 1024); for (uint64_t i = 512; i > 0; i--) { uint32_t v = i; - mem_table->append({i,v, 1}); + buffer->append({i,v, 1}); } for (uint64_t i = 1; i <= 256; ++i) { uint32_t v = i; - mem_table->append({i, v, 1, 1}); + buffer->append({i, v, 1}, true); } for (uint64_t i = 257; i <= 512; ++i) { uint32_t v = i + 1; - mem_table->append({i, v, 1}); + buffer->append({i, v, 1}); } - Shard* shard = new Shard(mem_table); + Shard* shard = new Shard(buffer); ck_assert_uint_eq(shard->get_record_count(), 512); - delete mem_table; + delete buffer; delete shard; } START_TEST(t_wirs_init) { size_t n = 512; - auto mbuffer1 = create_test_mbuffer<WrappedWRec>(n); - auto mbuffer2 = create_test_mbuffer<WrappedWRec>(n); - auto mbuffer3 = create_test_mbuffer<WrappedWRec>(n); + auto mbuffer1 = create_test_mbuffer<WRec>(n); + auto mbuffer2 = create_test_mbuffer<WRec>(n); + auto mbuffer3 = create_test_mbuffer<WRec>(n); auto shard1 = new Shard(mbuffer1); auto shard2 = new Shard(mbuffer2); @@ -73,11 +73,11 @@ START_TEST(t_wirs_init) auto cur_rec = shard4->get_record_at(i); - if (shard1_idx < n && *cur_rec == *rec1) { + if (shard1_idx < n && cur_rec->rec == rec1->rec) { ++shard1_idx; - } else if (shard2_idx < n && *cur_rec == *rec2) { + } else if (shard2_idx < n && cur_rec->rec == rec2->rec) { ++shard2_idx; - } else if (shard3_idx < n && *cur_rec == *rec3) { + } else if (shard3_idx < n && cur_rec->rec == rec3->rec) { ++shard3_idx; } else { assert(false); @@ -123,8 +123,8 @@ START_TEST(t_get_lower_bound_index) START_TEST(t_full_cancelation) { size_t n = 100; - auto buffer = create_double_seq_mbuffer<WrappedWRec>(n, false); - auto buffer_ts = create_double_seq_mbuffer<WrappedWRec>(n, true); + auto buffer = create_double_seq_mbuffer<WRec>(n, false); + auto buffer_ts = create_double_seq_mbuffer<WRec>(n, true); Shard* shard = new Shard(buffer); Shard* shard_ts = new Shard(buffer_ts); |