From 0cf160ee68d37be93665e665ef22ae6e211a157d Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 22 May 2023 14:58:22 -0400 Subject: More updates/restructuring --- include/ds/PriorityQueue.h | 2 +- include/framework/DynamicExtension.h | 46 +++++++------ include/framework/InternalLevel.h | 39 +++++++---- include/framework/MutableBuffer.h | 2 +- include/framework/RecordInterface.h | 121 +++++++++++++++++++++++++++++++++++ include/framework/ShardInterface.h | 12 ++-- include/shard/MemISAM.h | 22 +++++++ include/shard/WIRS.h | 113 ++++++-------------------------- include/util/Cursor.h | 2 +- include/util/Record.h | 121 ----------------------------------- 10 files changed, 224 insertions(+), 256 deletions(-) create mode 100644 include/framework/RecordInterface.h delete mode 100644 include/util/Record.h diff --git a/include/ds/PriorityQueue.h b/include/ds/PriorityQueue.h index 22582da..0468fa4 100644 --- a/include/ds/PriorityQueue.h +++ b/include/ds/PriorityQueue.h @@ -12,7 +12,7 @@ #include #include -#include "util/Record.h" +#include "framework/RecordInterface.h" namespace de { diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 2009344..a70dda4 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -18,6 +18,7 @@ #include "framework/InternalLevel.h" #include "framework/ShardInterface.h" #include "framework/QueryInterface.h" +#include "framework/RecordInterface.h" #include "shard/WIRS.h" #include "ds/Alias.h" @@ -74,12 +75,15 @@ typedef ssize_t level_index; template class DynamicExtension { + //typedef typename S> Shard; + typedef S Shard; + typedef MutableBuffer Buffer; public: DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop) : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop), - m_buffer(new MutableBuffer(buffer_cap, LSM_REJ_SAMPLE, buffer_cap * max_delete_prop)) - {} + m_buffer(new Buffer(buffer_cap, LSM_REJ_SAMPLE, buffer_cap * max_delete_prop)) + { } ~DynamicExtension() { delete m_buffer; @@ -94,7 +98,7 @@ public: } int erase(const R &rec) { - MutableBuffer *buffer; + Buffer *buffer; if constexpr (DELETE_TAGGING) { auto buffer = get_buffer(); @@ -131,7 +135,7 @@ public: auto buffer_state = Q::get_buffer_query_state(buffer, parms); // Get the shard query states - std::vector> shards; + std::vector> shards; std::vector states; for (auto &level : m_levels) { @@ -218,8 +222,8 @@ public: return m_buffer->get_capacity(); } - S *create_ssi() { - std::vector shards; + Shard *create_ssi() { + std::vector shards; if (m_levels.size() > 0) { for (int i=m_levels.size() - 1; i>= 0; i--) { @@ -231,7 +235,7 @@ public: shards.emplace_back(new S(get_buffer(), nullptr)); - S *shards_array[shards.size()]; + Shard *shards_array[shards.size()]; size_t j = 0; for (size_t i=0; i *m_buffer; + Buffer *m_buffer; size_t m_scale_factor; double m_max_delete_prop; std::vector *> m_levels; - MutableBuffer *get_buffer() { + Buffer *get_buffer() { return m_buffer; } int internal_append(R &rec, bool ts) { - MutableBuffer *buffer; + Buffer *buffer; while (!(buffer = get_buffer())) ; @@ -273,7 +277,7 @@ private: return buffer->append(rec, ts); } - std::vector post_process(std::vector records, ShardID shid, MutableBuffer *buffer) { + std::vector post_process(std::vector records, ShardID shid, Buffer *buffer) { std::vector processed_records; processed_records.reserve(records.size()); @@ -331,7 +335,7 @@ private: if (new_idx > 0) { assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); } - m_levels.emplace_back(new InternalLevel(new_idx, new_shard_cnt)); + m_levels.emplace_back(new InternalLevel(new_idx, new_shard_cnt)); return new_idx; } @@ -380,7 +384,7 @@ private: * returns -1 if idx==0, and no such level exists, to simplify * the logic of the first merge. */ - inline level_index find_mergable_level(level_index idx, MutableBuffer *buffer=nullptr) { + inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) { if (idx == 0 && m_levels.size() == 0) return -1; @@ -410,25 +414,25 @@ private: // merging two memory levels if (LSM_LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel::merge_levels(m_levels[base_level], m_levels[incoming_level]); + m_levels[base_level] = InternalLevel::merge_levels(m_levels[base_level], m_levels[incoming_level]); mark_as_unused(tmp); } else { m_levels[base_level]->append_merged_shards(m_levels[incoming_level]); } mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); + m_levels[incoming_level] = new InternalLevel(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); } - inline void merge_buffer_into_l0(MutableBuffer *buffer) { + inline void merge_buffer_into_l0(Buffer *buffer) { assert(m_levels[0]); if (LSM_LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0]; - auto temp_level = new InternalLevel(0, 1); + auto temp_level = new InternalLevel(0, 1); temp_level->append_buffer(buffer); - auto new_level = InternalLevel::merge_levels(old_level, temp_level); + auto new_level = InternalLevel::merge_levels(old_level, temp_level); m_levels[0] = new_level; delete temp_level; @@ -444,7 +448,7 @@ private: * level may not be able to immediately be deleted, depending upon who * else is using it. */ - inline void mark_as_unused(InternalLevel *level) { + inline void mark_as_unused(InternalLevel *level) { delete level; } @@ -477,7 +481,7 @@ private: * a pointer to the memory table to use, if desired. Otherwise, there are * no guarantees about which buffer will be accessed if level_index is -1. */ - inline size_t get_level_record_count(level_index idx, MutableBuffer *buffer=nullptr) { + inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) { assert(idx >= -1); if (idx == -1) { diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index 70da821..6986a61 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -15,18 +15,20 @@ #include "util/types.h" #include "util/bf_config.h" #include "framework/ShardInterface.h" -#include "framework/MutableBuffer.h" #include "framework/QueryInterface.h" +#include "framework/RecordInterface.h" +#include "framework/MutableBuffer.h" #include "ds/BloomFilter.h" namespace de { template class InternalLevel { - + typedef S Shard; + typedef MutableBuffer Buffer; public: InternalLevel(ssize_t level_no, size_t shard_cap) - : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector(shard_cap, nullptr)) + : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector(shard_cap, nullptr)) {} // Create a new memory level sharing the shards and repurposing it as previous level_no + 1 @@ -45,7 +47,7 @@ public: assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); auto res = new InternalLevel(base_level->m_level_no, 1); res->m_shard_cnt = 1; - S* shards[2]; + Shard* shards[2]; shards[0] = base_level->m_shards[0]; shards[1] = new_level->m_shards[0]; @@ -53,7 +55,7 @@ public: return res; } - void append_buffer(MutableBuffer* buffer) { + void append_buffer(Buffer* buffer) { assert(m_shard_cnt < m_shards.size()); m_shards[m_shard_cnt] = new S(buffer); ++m_shard_cnt; @@ -65,8 +67,8 @@ public: ++m_shard_cnt; } - S *get_merged_shard() { - S *shards[m_shard_cnt]; + Shard *get_merged_shard() { + Shard *shards[m_shard_cnt]; for (size_t i=0; i> &shards, std::vector& shard_states, void *query_parms) { + void get_query_states(std::vector> &shards, std::vector& shard_states, void *query_parms) { for (size_t i=0; i= (ssize_t) shard_stop; i--) { - if (m_shards[i] && m_shards[i]->check_tombstone(rec)) - return true; + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec, true); + if (res && res->is_tombstone()) { + return true; + } + } } return false; } bool delete_record(const R &rec) { + if (m_shard_cnt == 0) return false; + for (size_t i = 0; i < m_shards.size(); ++i) { - if (m_shards[i] && m_shards[i]->delete_record(rec)) { - return true; + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec); + if (res) { + res->set_delete(); + } } } return false; } - S* get_shard(size_t idx) { + Shard* get_shard(size_t idx) { return m_shards[idx]; } @@ -170,7 +181,7 @@ private: size_t m_shard_cnt; size_t m_shard_size_cap; - std::shared_ptr> m_shards; + std::shared_ptr> m_shards; }; } diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h index 4b45f20..3643a89 100644 --- a/include/framework/MutableBuffer.h +++ b/include/framework/MutableBuffer.h @@ -19,9 +19,9 @@ #include "util/base.h" #include "util/bf_config.h" #include "ds/BloomFilter.h" -#include "util/Record.h" #include "ds/Alias.h" #include "util/timer.h" +#include "framework/RecordInterface.h" namespace de { diff --git a/include/framework/RecordInterface.h b/include/framework/RecordInterface.h new file mode 100644 index 0000000..8afd90a --- /dev/null +++ b/include/framework/RecordInterface.h @@ -0,0 +1,121 @@ +/* + * include/framework/RecordInterface.h + * + * Copyright (C) 2023 Douglas Rumbaugh + * Dong Xie + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include +#include + +#include "util/base.h" + +namespace de { + +template +concept RecordInterface = requires(R r, R s) { + r.key; + r.value; + + { r < s } ->std::convertible_to; + { r == s } ->std::convertible_to; +}; + +template +struct WrappedRecord { + R rec; + uint32_t header; + + inline void set_delete() { + header |= 2; + } + + inline bool is_deleted() const { + return header & 2; + } + + inline void set_tombstone(bool val=true) { + if (val) { + header |= val; + } else { + header &= 0; + } + } + + inline bool is_tombstone() const { + return header & 1; + } +}; + +template +concept WeightedRecordInterface = RecordInterface && requires(R r) { + {r.weight} -> std::convertible_to; +}; + +template +struct Record { + K key; + V value; + uint32_t header = 0; + + inline bool operator<(const Record& other) const { + return key < other.key || (key == other.key && value < other.value); + } + + inline bool operator==(const Record& other) const { + return key == other.key && value == other.value; + } +}; + +template +struct WeightedRecord { + K key; + V value; + W weight = 1; + uint32_t header = 0; + + inline void set_delete() { + header |= 2; + } + + inline bool is_deleted() const { + return header & 2; + } + + inline void set_tombstone(bool val=true) { + if (val) { + header |= val; + } else { + header &= 0; + } + } + + inline bool is_tombstone() const { + return header & 1; + } + + inline int match(const WeightedRecord* other) const { + return key == other->key && value == other->value; + } + + inline bool operator<(const WeightedRecord& other) const { + return key < other.key || (key == other.key && value < other.value); + } + + inline bool operator==(const WeightedRecord& other) const { + return key == other.key && value == other.value; + } +}; + + +template +static bool memtable_record_cmp(const R& a, const R& b) { + return (a.key < b.key) || (a.key == b.key && a.value < b.value) + || (a.key == b.key && a.value == b.value && a.header < b.header); +} + +} diff --git a/include/framework/ShardInterface.h b/include/framework/ShardInterface.h index 1f48a45..3aa62df 100644 --- a/include/framework/ShardInterface.h +++ b/include/framework/ShardInterface.h @@ -11,12 +11,16 @@ #include #include "util/types.h" +#include "framework/RecordInterface.h" -template -concept ShardInterface = requires(S s, void *p) { - s.point_lookup(); +namespace de { +//template typename S, typename R> +template +concept ShardInterface = requires(S s, void *p, bool b) { + //{s.point_lookup(r, b) } -> std::same_as; {s.get_record_count()} -> std::convertible_to; - {s.get_tombstone_count()} -> std::convertible_to; {s.get_memory_usage()} -> std::convertible_to; }; + +} diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h index 55699be..8ac17e4 100644 --- a/include/shard/MemISAM.h +++ b/include/shard/MemISAM.h @@ -27,6 +27,8 @@ thread_local size_t mrun_cancelations = 0; template class MemISAM { private: + friend class InternalLevel; + typedef decltype(R::key) K; typedef decltype(R::value) V; @@ -178,6 +180,26 @@ public: return m_tombstone_cnt; } + R *point_lookup(R &rec, bool filter) { + + if (filter && !m_bf->lookup(rec.key)) { + return nullptr; + } + + size_t idx = get_lower_bound(rec.key); + if (idx >= m_reccnt) { + return false; + } + + while (idx < m_reccnt && m_data[idx] < rec) ++idx; + + if (m_data[idx] == rec) { + return m_data + idx; + } + + return nullptr; + } + bool delete_record(const K& key, const V& val) { size_t idx = get_lower_bound(key); if (idx >= m_reccnt) { diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h index 42dbcfd..7dee496 100644 --- a/include/shard/WIRS.h +++ b/include/shard/WIRS.h @@ -1,4 +1,5 @@ /* + {s.get_tombstone_count()} -> std::convertible_to; * include/shard/WIRS.h * * Copyright (C) 2023 Dong Xie @@ -8,6 +9,7 @@ */ #pragma once + #include #include #include @@ -19,8 +21,8 @@ #include "ds/Alias.h" #include "ds/BloomFilter.h" #include "util/bf_config.h" -#include "util/Record.h" #include "framework/MutableBuffer.h" +#include "framework/RecordInterface.h" namespace de { @@ -32,8 +34,11 @@ struct wirs_query_parms { decltype(R::key) upper_bound; }; +class InternalLevel; + template class WIRS { + friend class InternalLevel; private: typedef decltype(R::key) K; @@ -62,8 +67,7 @@ private: public: WIRS(MutableBuffer* buffer) - : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), - m_ts_check_cnt(0), m_root(nullptr) { + : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) { size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); @@ -106,8 +110,7 @@ public: } WIRS(WIRS** shards, size_t len) - : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0), - m_root(nullptr) { + : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) { std::vector> cursors; cursors.reserve(len); @@ -177,24 +180,25 @@ public: free_tree(m_root); } - bool delete_record(const R& rec) { + R *point_lookup(R &rec, bool filter=false) { + if (filter && !m_bf.lookup(rec.key)) { + return nullptr; + } + size_t idx = get_lower_bound(rec.key); if (idx >= m_reccnt) { - return false; + return nullptr; } while (idx < m_reccnt && m_data[idx] < rec) ++idx; - if (m_data[idx] == R {rec.key, rec.val} && !m_data[idx].is_tombstone()) { - m_data[idx].set_delete(); - m_deleted_cnt++; - return true; + if (m_data[idx] == rec) { + return m_data + idx; } - return false; + return nullptr; } - R* sorted_output() const { return m_data; } @@ -212,46 +216,12 @@ public: return m_data + idx; } - /* - // low - high -> decompose to a set of nodes. - // Build Alias across the decomposed nodes. - WIRSState* get_query_state(void *query_parameters) { - auto res = new WIRSState(); - K lower_key = ((wirs_query_parms *) query_parameters)->lower_bound; - K upper_key = ((wirs_query_parms *) query_parameters)->upper_bound; - - // Simulate a stack to unfold recursion. - double tot_weight = 0.0; - struct wirs_node* st[64] = {0}; - st[0] = m_root; - size_t top = 1; - while(top > 0) { - auto now = st[--top]; - if (covered_by(now, lower_key, upper_key) || - (now->left == nullptr && now->right == nullptr && intersects(now, lower_key, upper_key))) { - res->nodes.emplace_back(now); - tot_weight += now->weight; - } else { - if (now->left && intersects(now->left, lower_key, upper_key)) st[top++] = now->left; - if (now->right && intersects(now->right, lower_key, upper_key)) st[top++] = now->right; - } - } - - std::vector weights; - for (const auto& node: res->nodes) { - weights.emplace_back(node->weight / tot_weight); - } - res->tot_weight = tot_weight; - res->top_level_alias = new Alias(weights); - return res; + size_t get_memory_usage() { + return 0; } - static void delete_query_state(void *state) { - WIRSState *s = (WIRSState *) state; - delete s; - } - */ +private: size_t get_lower_bound(const K& key) const { size_t min = 0; @@ -271,43 +241,6 @@ public: return min; } - bool check_tombstone(const R& rec) { - if(!m_bf.lookup(rec.key)) { - return false; - } - - m_ts_check_cnt++; - size_t idx = get_lower_bound(rec.key); - if (idx >= m_reccnt) { - return false; - } - - auto ptr = m_data + get_lower_bound(rec.key); - - while (ptr < m_data + m_reccnt && *ptr < rec) { - ptr ++; - } - - bool result = *ptr == rec && ptr->is_tombstone(); - m_rejection_cnt += result; - - return result; - } - - size_t get_memory_usage() { - return 0; - } - - size_t get_rejection_count() { - return m_rejection_cnt; - } - - size_t get_ts_check_count() { - return m_ts_check_cnt; - } - -private: - bool covered_by(struct wirs_node* node, const K& lower_key, const K& upper_key) { auto low_index = node->low * m_group_size; auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1); @@ -394,13 +327,7 @@ private: size_t m_reccnt; size_t m_tombstone_cnt; size_t m_group_size; - size_t m_ts_check_cnt; - size_t m_deleted_cnt; BloomFilter m_bf; - - // The number of rejections caused by tombstones - // in this WIRS. - size_t m_rejection_cnt; }; } diff --git a/include/util/Cursor.h b/include/util/Cursor.h index 2609ae5..815458c 100644 --- a/include/util/Cursor.h +++ b/include/util/Cursor.h @@ -10,7 +10,7 @@ #pragma once #include "util/base.h" -#include "util/Record.h" +#include "framework/RecordInterface.h" #include "io/PagedFile.h" namespace de { diff --git a/include/util/Record.h b/include/util/Record.h deleted file mode 100644 index fc543ed..0000000 --- a/include/util/Record.h +++ /dev/null @@ -1,121 +0,0 @@ -/* - * include/util/record.h - * - * Copyright (C) 2023 Douglas Rumbaugh - * Dong Xie - * - * All rights reserved. Published under the Modified BSD License. - * - */ -#pragma once - -#include -#include - -#include "util/base.h" - -namespace de { - -template -concept RecordInterface = requires(R r, R s) { - r.key; - r.value; - - { r < s } ->std::convertible_to; - { r == s } ->std::convertible_to; -}; - -template -struct WrappedRecord { - R rec; - uint32_t header; - - inline void set_delete() { - header |= 2; - } - - inline bool is_deleted() const { - return header & 2; - } - - inline void set_tombstone(bool val=true) { - if (val) { - header |= val; - } else { - header &= 0; - } - } - - inline bool is_tombstone() const { - return header & 1; - } -}; - -template -concept WeightedRecordInterface = RecordInterface && requires(R r) { - {r.weight} -> std::convertible_to; -}; - -template -struct Record { - K key; - V value; - uint32_t header = 0; - - inline bool operator<(const Record& other) const { - return key < other.key || (key == other.key && value < other.value); - } - - inline bool operator==(const Record& other) const { - return key == other.key && value == other.value; - } -}; - -template -struct WeightedRecord { - K key; - V value; - W weight = 1; - uint32_t header = 0; - - inline void set_delete() { - header |= 2; - } - - inline bool is_deleted() const { - return header & 2; - } - - inline void set_tombstone(bool val=true) { - if (val) { - header |= val; - } else { - header &= 0; - } - } - - inline bool is_tombstone() const { - return header & 1; - } - - inline int match(const WeightedRecord* other) const { - return key == other->key && value == other->value; - } - - inline bool operator<(const WeightedRecord& other) const { - return key < other.key || (key == other.key && value < other.value); - } - - inline bool operator==(const WeightedRecord& other) const { - return key == other.key && value == other.value; - } -}; - - -template -static bool memtable_record_cmp(const R& a, const R& b) { - return (a.key < b.key) || (a.key == b.key && a.value < b.value) - || (a.key == b.key && a.value == b.value && a.header < b.header); -} - -} -- cgit v1.2.3