diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-22 14:58:22 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-22 14:58:22 -0400 |
| commit | 0cf160ee68d37be93665e665ef22ae6e211a157d (patch) | |
| tree | badaca4c5654e7abbe9291b18b08748aeeadc518 /include/framework | |
| parent | 08d6c84b9d69b500c964a8ff66e726e1f01f2095 (diff) | |
| download | dynamic-extension-0cf160ee68d37be93665e665ef22ae6e211a157d.tar.gz | |
More updates/restructuring
Diffstat (limited to 'include/framework')
| -rw-r--r-- | include/framework/DynamicExtension.h | 46 | ||||
| -rw-r--r-- | include/framework/InternalLevel.h | 39 | ||||
| -rw-r--r-- | include/framework/MutableBuffer.h | 2 | ||||
| -rw-r--r-- | include/framework/RecordInterface.h | 121 | ||||
| -rw-r--r-- | include/framework/ShardInterface.h | 12 |
5 files changed, 180 insertions, 40 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 2009344..a70dda4 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -18,6 +18,7 @@ #include "framework/InternalLevel.h" #include "framework/ShardInterface.h" #include "framework/QueryInterface.h" +#include "framework/RecordInterface.h" #include "shard/WIRS.h" #include "ds/Alias.h" @@ -74,12 +75,15 @@ typedef ssize_t level_index; template <RecordInterface R, ShardInterface S, QueryInterface Q, typename FQ=void> class DynamicExtension { + //typedef typename S<WrappedRecord<R>> Shard; + typedef S Shard; + typedef MutableBuffer<R> Buffer; public: DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop) : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop), - m_buffer(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_cap * max_delete_prop)) - {} + m_buffer(new Buffer(buffer_cap, LSM_REJ_SAMPLE, buffer_cap * max_delete_prop)) + { } ~DynamicExtension() { delete m_buffer; @@ -94,7 +98,7 @@ public: } int erase(const R &rec) { - MutableBuffer<R> *buffer; + Buffer *buffer; if constexpr (DELETE_TAGGING) { auto buffer = get_buffer(); @@ -131,7 +135,7 @@ public: auto buffer_state = Q::get_buffer_query_state(buffer, parms); // Get the shard query states - std::vector<std::pair<ShardID, S*>> shards; + std::vector<std::pair<ShardID, Shard*>> shards; std::vector<void*> states; for (auto &level : m_levels) { @@ -218,8 +222,8 @@ public: return m_buffer->get_capacity(); } - S *create_ssi() { - std::vector<S *> shards; + Shard *create_ssi() { + std::vector<Shard *> shards; if (m_levels.size() > 0) { for (int i=m_levels.size() - 1; i>= 0; i--) { @@ -231,7 +235,7 @@ public: shards.emplace_back(new S(get_buffer(), nullptr)); - S *shards_array[shards.size()]; + Shard *shards_array[shards.size()]; size_t j = 0; for (size_t i=0; i<shards.size(); i++) { @@ -240,7 +244,7 @@ public: } } - S *flattened = new S(shards_array, j, nullptr); + Shard *flattened = new S(shards_array, j, nullptr); for (auto shard : shards) { delete shard; @@ -250,19 +254,19 @@ public: } private: - MutableBuffer<R> *m_buffer; + Buffer *m_buffer; size_t m_scale_factor; double m_max_delete_prop; std::vector<InternalLevel<R, S, Q> *> m_levels; - MutableBuffer<R> *get_buffer() { + Buffer *get_buffer() { return m_buffer; } int internal_append(R &rec, bool ts) { - MutableBuffer<R> *buffer; + Buffer *buffer; while (!(buffer = get_buffer())) ; @@ -273,7 +277,7 @@ private: return buffer->append(rec, ts); } - std::vector<R> post_process(std::vector<R> records, ShardID shid, MutableBuffer<R> *buffer) { + std::vector<R> post_process(std::vector<R> records, ShardID shid, Buffer *buffer) { std::vector<R> processed_records; processed_records.reserve(records.size()); @@ -331,7 +335,7 @@ private: if (new_idx > 0) { assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); } - m_levels.emplace_back(new InternalLevel<R, S, Q>(new_idx, new_shard_cnt)); + m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)); return new_idx; } @@ -380,7 +384,7 @@ private: * returns -1 if idx==0, and no such level exists, to simplify * the logic of the first merge. */ - inline level_index find_mergable_level(level_index idx, MutableBuffer<R> *buffer=nullptr) { + inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) { if (idx == 0 && m_levels.size() == 0) return -1; @@ -410,25 +414,25 @@ private: // merging two memory levels if (LSM_LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel<R, S, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); + m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); mark_as_unused(tmp); } else { m_levels[base_level]->append_merged_shards(m_levels[incoming_level]); } mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel<R, S, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); + m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); } - inline void merge_buffer_into_l0(MutableBuffer<R> *buffer) { + inline void merge_buffer_into_l0(Buffer *buffer) { assert(m_levels[0]); if (LSM_LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0]; - auto temp_level = new InternalLevel<R, S, Q>(0, 1); + auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); temp_level->append_buffer(buffer); - auto new_level = InternalLevel<R, S, Q>::merge_levels(old_level, temp_level); + auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level); m_levels[0] = new_level; delete temp_level; @@ -444,7 +448,7 @@ private: * level may not be able to immediately be deleted, depending upon who * else is using it. */ - inline void mark_as_unused(InternalLevel<R, S, Q> *level) { + inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) { delete level; } @@ -477,7 +481,7 @@ private: * a pointer to the memory table to use, if desired. Otherwise, there are * no guarantees about which buffer will be accessed if level_index is -1. */ - inline size_t get_level_record_count(level_index idx, MutableBuffer<R> *buffer=nullptr) { + inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) { assert(idx >= -1); if (idx == -1) { diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index 70da821..6986a61 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -15,18 +15,20 @@ #include "util/types.h" #include "util/bf_config.h" #include "framework/ShardInterface.h" -#include "framework/MutableBuffer.h" #include "framework/QueryInterface.h" +#include "framework/RecordInterface.h" +#include "framework/MutableBuffer.h" #include "ds/BloomFilter.h" namespace de { template <RecordInterface R, ShardInterface S, QueryInterface Q> class InternalLevel { - + typedef S Shard; + typedef MutableBuffer<R> Buffer; public: InternalLevel(ssize_t level_no, size_t shard_cap) - : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<S>(shard_cap, nullptr)) + : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard>(shard_cap, nullptr)) {} // Create a new memory level sharing the shards and repurposing it as previous level_no + 1 @@ -45,7 +47,7 @@ public: assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); auto res = new InternalLevel(base_level->m_level_no, 1); res->m_shard_cnt = 1; - S* shards[2]; + Shard* shards[2]; shards[0] = base_level->m_shards[0]; shards[1] = new_level->m_shards[0]; @@ -53,7 +55,7 @@ public: return res; } - void append_buffer(MutableBuffer<R>* buffer) { + void append_buffer(Buffer* buffer) { assert(m_shard_cnt < m_shards.size()); m_shards[m_shard_cnt] = new S(buffer); ++m_shard_cnt; @@ -65,8 +67,8 @@ public: ++m_shard_cnt; } - S *get_merged_shard() { - S *shards[m_shard_cnt]; + Shard *get_merged_shard() { + Shard *shards[m_shard_cnt]; for (size_t i=0; i<m_shard_cnt; i++) { shards[i] = m_shards[i]; @@ -76,7 +78,7 @@ public: } // Append the sample range in-order..... - void get_query_states(std::vector<std::pair<ShardID, S *>> &shards, std::vector<void*>& shard_states, void *query_parms) { + void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) { for (size_t i=0; i<m_shard_cnt; i++) { if (m_shards[i]) { auto shard_state = Q::get_query_state(m_shards[i], query_parms); @@ -90,23 +92,32 @@ public: if (m_shard_cnt == 0) return false; for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) { - if (m_shards[i] && m_shards[i]->check_tombstone(rec)) - return true; + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec, true); + if (res && res->is_tombstone()) { + return true; + } + } } return false; } bool delete_record(const R &rec) { + if (m_shard_cnt == 0) return false; + for (size_t i = 0; i < m_shards.size(); ++i) { - if (m_shards[i] && m_shards[i]->delete_record(rec)) { - return true; + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec); + if (res) { + res->set_delete(); + } } } return false; } - S* get_shard(size_t idx) { + Shard* get_shard(size_t idx) { return m_shards[idx]; } @@ -170,7 +181,7 @@ private: size_t m_shard_cnt; size_t m_shard_size_cap; - std::shared_ptr<std::vector<S*>> m_shards; + std::shared_ptr<std::vector<Shard*>> m_shards; }; } diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h index 4b45f20..3643a89 100644 --- a/include/framework/MutableBuffer.h +++ b/include/framework/MutableBuffer.h @@ -19,9 +19,9 @@ #include "util/base.h" #include "util/bf_config.h" #include "ds/BloomFilter.h" -#include "util/Record.h" #include "ds/Alias.h" #include "util/timer.h" +#include "framework/RecordInterface.h" namespace de { diff --git a/include/framework/RecordInterface.h b/include/framework/RecordInterface.h new file mode 100644 index 0000000..8afd90a --- /dev/null +++ b/include/framework/RecordInterface.h @@ -0,0 +1,121 @@ +/* + * include/framework/RecordInterface.h + * + * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include <cstring> +#include <concepts> + +#include "util/base.h" + +namespace de { + +template<typename R> +concept RecordInterface = requires(R r, R s) { + r.key; + r.value; + + { r < s } ->std::convertible_to<bool>; + { r == s } ->std::convertible_to<bool>; +}; + +template<RecordInterface R> +struct WrappedRecord { + R rec; + uint32_t header; + + inline void set_delete() { + header |= 2; + } + + inline bool is_deleted() const { + return header & 2; + } + + inline void set_tombstone(bool val=true) { + if (val) { + header |= val; + } else { + header &= 0; + } + } + + inline bool is_tombstone() const { + return header & 1; + } +}; + +template <typename R> +concept WeightedRecordInterface = RecordInterface<R> && requires(R r) { + {r.weight} -> std::convertible_to<double>; +}; + +template <typename K, typename V> +struct Record { + K key; + V value; + uint32_t header = 0; + + inline bool operator<(const Record& other) const { + return key < other.key || (key == other.key && value < other.value); + } + + inline bool operator==(const Record& other) const { + return key == other.key && value == other.value; + } +}; + +template <typename K, typename V, typename W> +struct WeightedRecord { + K key; + V value; + W weight = 1; + uint32_t header = 0; + + inline void set_delete() { + header |= 2; + } + + inline bool is_deleted() const { + return header & 2; + } + + inline void set_tombstone(bool val=true) { + if (val) { + header |= val; + } else { + header &= 0; + } + } + + inline bool is_tombstone() const { + return header & 1; + } + + inline int match(const WeightedRecord* other) const { + return key == other->key && value == other->value; + } + + inline bool operator<(const WeightedRecord& other) const { + return key < other.key || (key == other.key && value < other.value); + } + + inline bool operator==(const WeightedRecord& other) const { + return key == other.key && value == other.value; + } +}; + + +template <RecordInterface R> +static bool memtable_record_cmp(const R& a, const R& b) { + return (a.key < b.key) || (a.key == b.key && a.value < b.value) + || (a.key == b.key && a.value == b.value && a.header < b.header); +} + +} diff --git a/include/framework/ShardInterface.h b/include/framework/ShardInterface.h index 1f48a45..3aa62df 100644 --- a/include/framework/ShardInterface.h +++ b/include/framework/ShardInterface.h @@ -11,12 +11,16 @@ #include <concepts> #include "util/types.h" +#include "framework/RecordInterface.h" -template <typename S> -concept ShardInterface = requires(S s, void *p) { - s.point_lookup(); +namespace de { +//template <template<typename> typename S, typename R> +template <typename S> +concept ShardInterface = requires(S s, void *p, bool b) { + //{s.point_lookup(r, b) } -> std::same_as<R*>; {s.get_record_count()} -> std::convertible_to<size_t>; - {s.get_tombstone_count()} -> std::convertible_to<size_t>; {s.get_memory_usage()} -> std::convertible_to<size_t>; }; + +} |