diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-22 14:58:22 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-22 14:58:22 -0400 |
| commit | 0cf160ee68d37be93665e665ef22ae6e211a157d (patch) | |
| tree | badaca4c5654e7abbe9291b18b08748aeeadc518 /include | |
| parent | 08d6c84b9d69b500c964a8ff66e726e1f01f2095 (diff) | |
| download | dynamic-extension-0cf160ee68d37be93665e665ef22ae6e211a157d.tar.gz | |
More updates/restructuring
Diffstat (limited to 'include')
| -rw-r--r-- | include/ds/PriorityQueue.h | 2 | ||||
| -rw-r--r-- | include/framework/DynamicExtension.h | 46 | ||||
| -rw-r--r-- | include/framework/InternalLevel.h | 39 | ||||
| -rw-r--r-- | include/framework/MutableBuffer.h | 2 | ||||
| -rw-r--r-- | include/framework/RecordInterface.h (renamed from include/util/Record.h) | 2 | ||||
| -rw-r--r-- | include/framework/ShardInterface.h | 12 | ||||
| -rw-r--r-- | include/shard/MemISAM.h | 22 | ||||
| -rw-r--r-- | include/shard/WIRS.h | 113 | ||||
| -rw-r--r-- | include/util/Cursor.h | 2 |
9 files changed, 104 insertions, 136 deletions
diff --git a/include/ds/PriorityQueue.h b/include/ds/PriorityQueue.h index 22582da..0468fa4 100644 --- a/include/ds/PriorityQueue.h +++ b/include/ds/PriorityQueue.h @@ -12,7 +12,7 @@ #include <vector> #include <cassert> -#include "util/Record.h" +#include "framework/RecordInterface.h" namespace de { diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 2009344..a70dda4 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -18,6 +18,7 @@ #include "framework/InternalLevel.h" #include "framework/ShardInterface.h" #include "framework/QueryInterface.h" +#include "framework/RecordInterface.h" #include "shard/WIRS.h" #include "ds/Alias.h" @@ -74,12 +75,15 @@ typedef ssize_t level_index; template <RecordInterface R, ShardInterface S, QueryInterface Q, typename FQ=void> class DynamicExtension { + //typedef typename S<WrappedRecord<R>> Shard; + typedef S Shard; + typedef MutableBuffer<R> Buffer; public: DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop) : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop), - m_buffer(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_cap * max_delete_prop)) - {} + m_buffer(new Buffer(buffer_cap, LSM_REJ_SAMPLE, buffer_cap * max_delete_prop)) + { } ~DynamicExtension() { delete m_buffer; @@ -94,7 +98,7 @@ public: } int erase(const R &rec) { - MutableBuffer<R> *buffer; + Buffer *buffer; if constexpr (DELETE_TAGGING) { auto buffer = get_buffer(); @@ -131,7 +135,7 @@ public: auto buffer_state = Q::get_buffer_query_state(buffer, parms); // Get the shard query states - std::vector<std::pair<ShardID, S*>> shards; + std::vector<std::pair<ShardID, Shard*>> shards; std::vector<void*> states; for (auto &level : m_levels) { @@ -218,8 +222,8 @@ public: return m_buffer->get_capacity(); } - S *create_ssi() { - std::vector<S *> shards; + Shard *create_ssi() { + std::vector<Shard *> shards; if (m_levels.size() > 0) { for (int i=m_levels.size() - 1; i>= 0; i--) { @@ -231,7 +235,7 @@ public: shards.emplace_back(new S(get_buffer(), nullptr)); - S *shards_array[shards.size()]; + Shard *shards_array[shards.size()]; size_t j = 0; for (size_t i=0; i<shards.size(); i++) { @@ -240,7 +244,7 @@ public: } } - S *flattened = new S(shards_array, j, nullptr); + Shard *flattened = new S(shards_array, j, nullptr); for (auto shard : shards) { delete shard; @@ -250,19 +254,19 @@ public: } private: - MutableBuffer<R> *m_buffer; + Buffer *m_buffer; size_t m_scale_factor; double m_max_delete_prop; std::vector<InternalLevel<R, S, Q> *> m_levels; - MutableBuffer<R> *get_buffer() { + Buffer *get_buffer() { return m_buffer; } int internal_append(R &rec, bool ts) { - MutableBuffer<R> *buffer; + Buffer *buffer; while (!(buffer = get_buffer())) ; @@ -273,7 +277,7 @@ private: return buffer->append(rec, ts); } - std::vector<R> post_process(std::vector<R> records, ShardID shid, MutableBuffer<R> *buffer) { + std::vector<R> post_process(std::vector<R> records, ShardID shid, Buffer *buffer) { std::vector<R> processed_records; processed_records.reserve(records.size()); @@ -331,7 +335,7 @@ private: if (new_idx > 0) { assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); } - m_levels.emplace_back(new InternalLevel<R, S, Q>(new_idx, new_shard_cnt)); + m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)); return new_idx; } @@ -380,7 +384,7 @@ private: * returns -1 if idx==0, and no such level exists, to simplify * the logic of the first merge. */ - inline level_index find_mergable_level(level_index idx, MutableBuffer<R> *buffer=nullptr) { + inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) { if (idx == 0 && m_levels.size() == 0) return -1; @@ -410,25 +414,25 @@ private: // merging two memory levels if (LSM_LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel<R, S, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); + m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); mark_as_unused(tmp); } else { m_levels[base_level]->append_merged_shards(m_levels[incoming_level]); } mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel<R, S, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); + m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); } - inline void merge_buffer_into_l0(MutableBuffer<R> *buffer) { + inline void merge_buffer_into_l0(Buffer *buffer) { assert(m_levels[0]); if (LSM_LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0]; - auto temp_level = new InternalLevel<R, S, Q>(0, 1); + auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); temp_level->append_buffer(buffer); - auto new_level = InternalLevel<R, S, Q>::merge_levels(old_level, temp_level); + auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level); m_levels[0] = new_level; delete temp_level; @@ -444,7 +448,7 @@ private: * level may not be able to immediately be deleted, depending upon who * else is using it. */ - inline void mark_as_unused(InternalLevel<R, S, Q> *level) { + inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) { delete level; } @@ -477,7 +481,7 @@ private: * a pointer to the memory table to use, if desired. Otherwise, there are * no guarantees about which buffer will be accessed if level_index is -1. */ - inline size_t get_level_record_count(level_index idx, MutableBuffer<R> *buffer=nullptr) { + inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) { assert(idx >= -1); if (idx == -1) { diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index 70da821..6986a61 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -15,18 +15,20 @@ #include "util/types.h" #include "util/bf_config.h" #include "framework/ShardInterface.h" -#include "framework/MutableBuffer.h" #include "framework/QueryInterface.h" +#include "framework/RecordInterface.h" +#include "framework/MutableBuffer.h" #include "ds/BloomFilter.h" namespace de { template <RecordInterface R, ShardInterface S, QueryInterface Q> class InternalLevel { - + typedef S Shard; + typedef MutableBuffer<R> Buffer; public: InternalLevel(ssize_t level_no, size_t shard_cap) - : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<S>(shard_cap, nullptr)) + : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard>(shard_cap, nullptr)) {} // Create a new memory level sharing the shards and repurposing it as previous level_no + 1 @@ -45,7 +47,7 @@ public: assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); auto res = new InternalLevel(base_level->m_level_no, 1); res->m_shard_cnt = 1; - S* shards[2]; + Shard* shards[2]; shards[0] = base_level->m_shards[0]; shards[1] = new_level->m_shards[0]; @@ -53,7 +55,7 @@ public: return res; } - void append_buffer(MutableBuffer<R>* buffer) { + void append_buffer(Buffer* buffer) { assert(m_shard_cnt < m_shards.size()); m_shards[m_shard_cnt] = new S(buffer); ++m_shard_cnt; @@ -65,8 +67,8 @@ public: ++m_shard_cnt; } - S *get_merged_shard() { - S *shards[m_shard_cnt]; + Shard *get_merged_shard() { + Shard *shards[m_shard_cnt]; for (size_t i=0; i<m_shard_cnt; i++) { shards[i] = m_shards[i]; @@ -76,7 +78,7 @@ public: } // Append the sample range in-order..... - void get_query_states(std::vector<std::pair<ShardID, S *>> &shards, std::vector<void*>& shard_states, void *query_parms) { + void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) { for (size_t i=0; i<m_shard_cnt; i++) { if (m_shards[i]) { auto shard_state = Q::get_query_state(m_shards[i], query_parms); @@ -90,23 +92,32 @@ public: if (m_shard_cnt == 0) return false; for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) { - if (m_shards[i] && m_shards[i]->check_tombstone(rec)) - return true; + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec, true); + if (res && res->is_tombstone()) { + return true; + } + } } return false; } bool delete_record(const R &rec) { + if (m_shard_cnt == 0) return false; + for (size_t i = 0; i < m_shards.size(); ++i) { - if (m_shards[i] && m_shards[i]->delete_record(rec)) { - return true; + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec); + if (res) { + res->set_delete(); + } } } return false; } - S* get_shard(size_t idx) { + Shard* get_shard(size_t idx) { return m_shards[idx]; } @@ -170,7 +181,7 @@ private: size_t m_shard_cnt; size_t m_shard_size_cap; - std::shared_ptr<std::vector<S*>> m_shards; + std::shared_ptr<std::vector<Shard*>> m_shards; }; } diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h index 4b45f20..3643a89 100644 --- a/include/framework/MutableBuffer.h +++ b/include/framework/MutableBuffer.h @@ -19,9 +19,9 @@ #include "util/base.h" #include "util/bf_config.h" #include "ds/BloomFilter.h" -#include "util/Record.h" #include "ds/Alias.h" #include "util/timer.h" +#include "framework/RecordInterface.h" namespace de { diff --git a/include/util/Record.h b/include/framework/RecordInterface.h index fc543ed..8afd90a 100644 --- a/include/util/Record.h +++ b/include/framework/RecordInterface.h @@ -1,5 +1,5 @@ /* - * include/util/record.h + * include/framework/RecordInterface.h * * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> * Dong Xie <dongx@psu.edu> diff --git a/include/framework/ShardInterface.h b/include/framework/ShardInterface.h index 1f48a45..3aa62df 100644 --- a/include/framework/ShardInterface.h +++ b/include/framework/ShardInterface.h @@ -11,12 +11,16 @@ #include <concepts> #include "util/types.h" +#include "framework/RecordInterface.h" -template <typename S> -concept ShardInterface = requires(S s, void *p) { - s.point_lookup(); +namespace de { +//template <template<typename> typename S, typename R> +template <typename S> +concept ShardInterface = requires(S s, void *p, bool b) { + //{s.point_lookup(r, b) } -> std::same_as<R*>; {s.get_record_count()} -> std::convertible_to<size_t>; - {s.get_tombstone_count()} -> std::convertible_to<size_t>; {s.get_memory_usage()} -> std::convertible_to<size_t>; }; + +} diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h index 55699be..8ac17e4 100644 --- a/include/shard/MemISAM.h +++ b/include/shard/MemISAM.h @@ -27,6 +27,8 @@ thread_local size_t mrun_cancelations = 0; template <RecordInterface R> class MemISAM { private: + friend class InternalLevel; + typedef decltype(R::key) K; typedef decltype(R::value) V; @@ -178,6 +180,26 @@ public: return m_tombstone_cnt; } + R *point_lookup(R &rec, bool filter) { + + if (filter && !m_bf->lookup(rec.key)) { + return nullptr; + } + + size_t idx = get_lower_bound(rec.key); + if (idx >= m_reccnt) { + return false; + } + + while (idx < m_reccnt && m_data[idx] < rec) ++idx; + + if (m_data[idx] == rec) { + return m_data + idx; + } + + return nullptr; + } + bool delete_record(const K& key, const V& val) { size_t idx = get_lower_bound(key); if (idx >= m_reccnt) { diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h index 42dbcfd..7dee496 100644 --- a/include/shard/WIRS.h +++ b/include/shard/WIRS.h @@ -1,4 +1,5 @@ /* + {s.get_tombstone_count()} -> std::convertible_to<size_t>; * include/shard/WIRS.h * * Copyright (C) 2023 Dong Xie <dongx@psu.edu> @@ -8,6 +9,7 @@ */ #pragma once + #include <vector> #include <cassert> #include <queue> @@ -19,8 +21,8 @@ #include "ds/Alias.h" #include "ds/BloomFilter.h" #include "util/bf_config.h" -#include "util/Record.h" #include "framework/MutableBuffer.h" +#include "framework/RecordInterface.h" namespace de { @@ -32,8 +34,11 @@ struct wirs_query_parms { decltype(R::key) upper_bound; }; +class InternalLevel; + template <WeightedRecordInterface R> class WIRS { + friend class InternalLevel; private: typedef decltype(R::key) K; @@ -62,8 +67,7 @@ private: public: WIRS(MutableBuffer<R>* buffer) - : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), - m_ts_check_cnt(0), m_root(nullptr) { + : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) { size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); @@ -106,8 +110,7 @@ public: } WIRS(WIRS** shards, size_t len) - : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0), - m_root(nullptr) { + : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) { std::vector<Cursor<R>> cursors; cursors.reserve(len); @@ -177,24 +180,25 @@ public: free_tree(m_root); } - bool delete_record(const R& rec) { + R *point_lookup(R &rec, bool filter=false) { + if (filter && !m_bf.lookup(rec.key)) { + return nullptr; + } + size_t idx = get_lower_bound(rec.key); if (idx >= m_reccnt) { - return false; + return nullptr; } while (idx < m_reccnt && m_data[idx] < rec) ++idx; - if (m_data[idx] == R {rec.key, rec.val} && !m_data[idx].is_tombstone()) { - m_data[idx].set_delete(); - m_deleted_cnt++; - return true; + if (m_data[idx] == rec) { + return m_data + idx; } - return false; + return nullptr; } - R* sorted_output() const { return m_data; } @@ -212,46 +216,12 @@ public: return m_data + idx; } - /* - // low - high -> decompose to a set of nodes. - // Build Alias across the decomposed nodes. - WIRSState<R>* get_query_state(void *query_parameters) { - auto res = new WIRSState(); - K lower_key = ((wirs_query_parms<R> *) query_parameters)->lower_bound; - K upper_key = ((wirs_query_parms<R> *) query_parameters)->upper_bound; - - // Simulate a stack to unfold recursion. - double tot_weight = 0.0; - struct wirs_node<R>* st[64] = {0}; - st[0] = m_root; - size_t top = 1; - while(top > 0) { - auto now = st[--top]; - if (covered_by(now, lower_key, upper_key) || - (now->left == nullptr && now->right == nullptr && intersects(now, lower_key, upper_key))) { - res->nodes.emplace_back(now); - tot_weight += now->weight; - } else { - if (now->left && intersects(now->left, lower_key, upper_key)) st[top++] = now->left; - if (now->right && intersects(now->right, lower_key, upper_key)) st[top++] = now->right; - } - } - - std::vector<double> weights; - for (const auto& node: res->nodes) { - weights.emplace_back(node->weight / tot_weight); - } - res->tot_weight = tot_weight; - res->top_level_alias = new Alias(weights); - return res; + size_t get_memory_usage() { + return 0; } - static void delete_query_state(void *state) { - WIRSState<R> *s = (WIRSState<R> *) state; - delete s; - } - */ +private: size_t get_lower_bound(const K& key) const { size_t min = 0; @@ -271,43 +241,6 @@ public: return min; } - bool check_tombstone(const R& rec) { - if(!m_bf.lookup(rec.key)) { - return false; - } - - m_ts_check_cnt++; - size_t idx = get_lower_bound(rec.key); - if (idx >= m_reccnt) { - return false; - } - - auto ptr = m_data + get_lower_bound(rec.key); - - while (ptr < m_data + m_reccnt && *ptr < rec) { - ptr ++; - } - - bool result = *ptr == rec && ptr->is_tombstone(); - m_rejection_cnt += result; - - return result; - } - - size_t get_memory_usage() { - return 0; - } - - size_t get_rejection_count() { - return m_rejection_cnt; - } - - size_t get_ts_check_count() { - return m_ts_check_cnt; - } - -private: - bool covered_by(struct wirs_node<R>* node, const K& lower_key, const K& upper_key) { auto low_index = node->low * m_group_size; auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1); @@ -394,13 +327,7 @@ private: size_t m_reccnt; size_t m_tombstone_cnt; size_t m_group_size; - size_t m_ts_check_cnt; - size_t m_deleted_cnt; BloomFilter<K> m_bf; - - // The number of rejections caused by tombstones - // in this WIRS. - size_t m_rejection_cnt; }; } diff --git a/include/util/Cursor.h b/include/util/Cursor.h index 2609ae5..815458c 100644 --- a/include/util/Cursor.h +++ b/include/util/Cursor.h @@ -10,7 +10,7 @@ #pragma once #include "util/base.h" -#include "util/Record.h" +#include "framework/RecordInterface.h" #include "io/PagedFile.h" namespace de { |