From e920fa57cf9c503e560055864e4de37912b239e1 Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Mon, 29 May 2023 14:30:08 -0400
Subject: Adjusted the way that Wrapping records works to clean up interfaces

---
 include/framework/DynamicExtension.h | 51 +++++++++++++++++++++---------------
 include/framework/InternalLevel.h    | 10 ++++---
 include/framework/MutableBuffer.h    | 20 +++++++-------
 include/framework/RecordInterface.h  | 48 ++++++++-------------------------
 4 files changed, 58 insertions(+), 71 deletions(-)

diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 68f85e2..422ca10 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -75,9 +75,9 @@ typedef ssize_t level_index;
 template
 class DynamicExtension {
-    //typedef typename S<WrappedRecord<R>> Shard;
+    //typedef typename S<R> Shard;
     typedef S Shard;
-    typedef MutableBuffer<WrappedRecord<R>> Buffer;
+    typedef MutableBuffer<R> Buffer;
 
 public:
     DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop)
@@ -142,18 +142,23 @@ public:
             level->get_query_states(shards, states, parms);
         }
 
-        std::vector<std::vector<R>> query_results(shards.size() + 1);
+        std::vector<std::vector<R>*> query_results(shards.size() + 1, nullptr);
 
         // Execute the query for the buffer
-        query_results[0] = Q::buffer_query(buffer, buffer_state, parms);
+        query_results[0] = filter_deletes(Q::buffer_query(buffer, buffer_state, parms), {-1, -1}, buffer);
 
         // Execute the query for each shard
         for (size_t i=0; i<shards.size(); i++) {
-    std::vector<InternalLevel<WrappedRecord<R>, S, Q> *> m_levels;
+    std::vector<InternalLevel<R, S, Q> *> m_levels;
 
     Buffer *get_buffer() {
         return m_buffer;
     }
@@ -274,14 +279,12 @@ private:
             merge_buffer();
         }
 
-        WrappedRecord<R> wrec = static_cast<WrappedRecord<R>>(rec);
-
-        return buffer->append(wrec, ts);
+        return buffer->append(rec, ts);
     }
 
-    std::vector<R> filter_deletes(std::vector<R> records, ShardID shid, Buffer *buffer) {
-        std::vector<R> processed_records;
-        processed_records.reserve(records.size());
+    std::vector<R> *filter_deletes(std::vector<R> *records, ShardID shid, Buffer *buffer) {
+        std::vector<R> *processed_records = new std::vector<R>();
+        processed_records->reserve(records->size());
 
         // For delete tagging, we just need to check the delete bit on each
         // record.
@@ -291,9 +294,10 @@ private:
                 continue;
             }
 
-            processed_records.emplace_back(rec.rec);
+            processed_records->emplace_back(static_cast<R>(rec.rec));
         }
 
+        delete records;
         return processed_records;
     }
 
@@ -320,8 +324,10 @@ private:
             }
         }
 
-            processed_records.emplace_back(rec.rec);
+            processed_records->emplace_back(static_cast<R>(rec.rec));
         }
+
+        delete records;
     }
 
     /*
@@ -337,7 +343,7 @@ private:
         if (new_idx > 0) {
            assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
        }
-        m_levels.emplace_back(new InternalLevel<WrappedRecord<R>, Shard, Q>(new_idx, new_shard_cnt));
+        m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt));
 
         return new_idx;
     }
@@ -416,14 +422,14 @@ private:
         // merging two memory levels
         if (LSM_LEVELING) {
             auto tmp = m_levels[base_level];
-            m_levels[base_level] = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
+            m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
             mark_as_unused(tmp);
         } else {
             m_levels[base_level]->append_merged_shards(m_levels[incoming_level]);
         }
 
         mark_as_unused(m_levels[incoming_level]);
-        m_levels[incoming_level] = new InternalLevel<WrappedRecord<R>, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
+        m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
     }
 
@@ -432,9 +438,9 @@ private:
         if (LSM_LEVELING) {
             // FIXME: Kludgey implementation due to interface constraints.
             auto old_level = m_levels[0];
-            auto temp_level = new InternalLevel<WrappedRecord<R>, Shard, Q>(0, 1);
+            auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
             temp_level->append_buffer(buffer);
-            auto new_level = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(old_level, temp_level);
+            auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level);
 
             m_levels[0] = new_level;
             delete temp_level;
@@ -450,7 +456,7 @@ private:
      * level may not be able to immediately be deleted, depending upon who
      * else is using it.
      */
-    inline void mark_as_unused(InternalLevel<WrappedRecord<R>, Shard, Q> *level) {
+    inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) {
         delete level;
     }
 
@@ -460,6 +466,9 @@ private:
      * levels below idx are below the limit.
      */
     inline void enforce_delete_maximum(level_index idx) {
+        // FIXME: currently broken due to tombstone cancellation issues
+        return;
+
         long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
         if (ts_prop > (long double) m_max_delete_prop) {
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index c489063..d28ba5f 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -28,18 +28,18 @@ class InternalLevel {
     typedef MutableBuffer<R> Buffer;
 public:
     InternalLevel(ssize_t level_no, size_t shard_cap)
-    : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard*>(shard_cap, nullptr))
+    : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard*>(shard_cap, nullptr), free_shards)
     {}
 
     // Create a new memory level sharing the shards and repurposing it as previous level_no + 1
     // WARNING: for leveling only.
     InternalLevel(InternalLevel* level)
     : m_level_no(level->m_level_no + 1), m_shard_cnt(level->m_shard_cnt)
-    , m_shards(level->m_shards) {
+    , m_shards(level->m_shards, free_shards) {
         assert(m_shard_cnt == 1 && m_shards->size() == 1);
     }
 
-    ~InternalLevel() {}
+    ~InternalLevel() { }
 
     // WARNING: for leveling only.
     // assuming the base level is the level new level is merging into. (base_level is larger.)
@@ -181,6 +181,10 @@ private:
     size_t m_shard_cnt;
     size_t m_shard_size_cap;
 
+    static void free_shards(std::vector<Shard*>* vec) {
+        for (size_t i=0; i<vec->size(); i++) delete (*vec)[i];
+    }
+
     std::shared_ptr<std::vector<Shard*>> m_shards;
 };
 
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index c154001..bc80922 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -28,14 +28,13 @@ namespace de {
 template
 class MutableBuffer {
-    //typedef WrappedRecord R;
 public:
     MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap)
     : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
     , m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
-        auto len = capacity * sizeof(R);
+        auto len = capacity * sizeof(Wrapped<R>);
         size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));
-        m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
+        m_data = (Wrapped<R>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
         m_tombstone_filter = nullptr;
         if (max_tombstone_cap > 0) {
             m_tombstone_filter = new BloomFilter(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
@@ -54,10 +53,11 @@ public:
         int32_t pos = 0;
         if ((pos = try_advance_tail()) == -1) return 0;
 
-        R new_rec = rec;
-        if (tombstone) new_rec.set_tombstone();
+        Wrapped<R> wrec;
+        wrec.rec = rec;
+        if (tombstone) wrec.set_tombstone();
 
-        m_data[pos] = new_rec;
+        m_data[pos] = wrec;
         m_data[pos].header |= (pos << 2);
 
         if (tombstone) {
@@ -66,7 +66,7 @@
         }
 
         if constexpr (WeightedRecordInterface<R>) {
-            m_weight.fetch_add(new_rec.weight);
+            m_weight.fetch_add(rec.weight);
             double old = m_max_weight.load();
             while (old < rec.weight) {
                 m_max_weight.compare_exchange_strong(old, rec.weight);
@@ -123,7 +123,7 @@ public:
         auto offset = 0;
         while (offset < m_reccnt.load()) {
-            if (m_data[offset] == rec && m_data[offset].is_tombstone()) {
+            if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) {
                 return true;
             }
             offset++;;
@@ -147,7 +147,7 @@ public:
         return m_weight.load();
     }
 
-    R *get_data() {
+    Wrapped<R> *get_data() {
         return m_data;
     }
 
@@ -162,7 +162,7 @@ private:
     size_t m_cap;
     size_t m_tombstone_cap;
 
-    R* m_data;
+    Wrapped<R>* m_data;
     BloomFilter* m_tombstone_filter;
 
     alignas(64) std::atomic m_tombstonecnt;
 
diff --git a/include/framework/RecordInterface.h b/include/framework/RecordInterface.h
index c8e622f..70c8e01 100644
--- a/include/framework/RecordInterface.h
+++ b/include/framework/RecordInterface.h
@@ -26,9 +26,9 @@ concept RecordInterface = requires(R r, R s) {
 };
 
 template
-struct WrappedRecord : R {
-    //R rec;
+struct Wrapped {
     uint32_t header;
+    R rec;
 
     inline void set_delete() {
         header |= 2;
@@ -49,8 +49,14 @@ struct WrappedRecord : R {
     inline bool is_tombstone() const {
         return header & 1;
     }
+
+    inline bool operator<(const Wrapped& other) const {
+        return (rec.key < other.rec.key) || (rec.key == other.rec.key && rec.value < other.rec.value)
+                || (rec.key == other.rec.key && rec.value == other.rec.value && header < other.header);
+    }
 };
+
 template
 concept WeightedRecordInterface = RecordInterface<R> && requires(R r) {
     {r.weight} -> std::convertible_to<double>;
@@ -76,46 +82,14 @@ struct WeightedRecord {
     K key;
     V value;
     W weight = 1;
-    uint32_t header = 0;
-    inline void set_delete() {
-        header |= 2;
-    }
-
-    inline bool is_deleted() const {
-        return header & 2;
-    }
-
-    inline void set_tombstone(bool val=true) {
-        if (val) {
-            header |= val;
-        } else {
-            header &= 0;
-        }
-    }
-
-    inline bool is_tombstone() const {
-        return header & 1;
-    }
-
-    inline int match(const WeightedRecord* other) const {
-        return key == other->key && value == other->value;
+    inline bool operator==(const WeightedRecord& other) const {
+        return key == other.key && value == other.value;
     }
 
-    inline bool operator<(const WeightedRecord& other) const {
+    inline bool operator<(const WeightedRecord& other) const {
         return key < other.key || (key == other.key && value < other.value);
     }
-
-    inline bool operator==(const WeightedRecord& other) const {
-        return key == other.key && value == other.value;
-    }
 };
-
-template
-static bool memtable_record_cmp(const R& a, const R& b) {
-    return (a.key < b.key) || (a.key == b.key && a.value < b.value)
-        || (a.key == b.key && a.value == b.value && a.header < b.header);
-}
-
 }
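
As a quick illustration of the record wrapping this patch settles on (not part of the commit itself; the Rec type and the main() driver below are made up for the example), the new Wrapped<R> type keeps the flag bits that previously lived on WrappedRecord: bit 0 marks a tombstone, bit 1 marks a tagged delete, and MutableBuffer::append packs the record's buffer slot index into the remaining bits with header |= (pos << 2). A minimal, self-contained sketch of that header layout:

#include <cassert>
#include <cstdint>

// Hypothetical record type standing in for anything satisfying RecordInterface.
struct Rec {
    uint64_t key;
    uint64_t value;

    bool operator==(const Rec &other) const {
        return key == other.key && value == other.value;
    }
};

// Mirrors the Wrapped<R> struct added to RecordInterface.h in this patch.
template <typename R>
struct Wrapped {
    uint32_t header;
    R rec;

    void set_delete()         { header |= 2; }          // bit 1: tagged delete
    bool is_deleted() const   { return header & 2; }
    void set_tombstone(bool v = true) {
        if (v) header |= v;                             // bit 0: tombstone
        else   header &= 0;                             // as in the patch: clearing resets the whole header
    }
    bool is_tombstone() const { return header & 1; }
};

int main() {
    // MutableBuffer::append now wraps the caller's plain record itself, rather
    // than requiring the caller to construct a wrapped record up front.
    Wrapped<Rec> w{0, {5, 10}};
    w.set_tombstone();       // a tombstone insert
    w.header |= (3 << 2);    // append() stores the buffer slot index above the two flag bits

    assert(w.is_tombstone());
    assert(!w.is_deleted());
    assert((w.header >> 2) == 3);   // flags and slot index coexist in one 32-bit header
    return 0;
}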