/* * include/framework/structure/InternalLevel.h * * Copyright (C) 2023-2024 Douglas B. Rumbaugh * Dong Xie * * Distributed under the Modified BSD License. * * The word `Internal` in this class's name refers to memory. The current * model, inherited from the framework in Practical Dynamic Extension for * Sampling Indexes, would use a different ExternalLevel for shards stored * on external storage. This is a distinction that can probably be avoided * with some more thought being put into interface design. * */ #pragma once #include #include #include #include #include #include "framework/interface/Query.h" #include "framework/interface/Shard.h" #include "util/types.h" namespace de { template QueryType> class InternalLevel { typedef typename ShardType::RECORD RecordType; typedef BufferView BuffView; typedef std::pair, size_t> shard_ptr; public: InternalLevel(ssize_t level_no) : m_level_no(level_no) {} ~InternalLevel() = default; /* * Create a new shard containing the combined records * from all shards on this level and return it. * * No changes are made to this level. */ ShardType *get_combined_shard() const { if (m_shards.size() == 0) { return nullptr; } std::vector shards; for (auto shard : m_shards) { if (shard.first) shards.emplace_back(shard.first.get()); } return new ShardType(shards); } void get_local_queries( std::vector> &shards, std::vector &local_queries, typename QueryType::Parameters *query_parms) const { for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i].first) { auto local_query = QueryType::local_preproc(m_shards[i].first.get(), query_parms); shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].first.get()}); local_queries.emplace_back(local_query); } } } bool check_tombstone(size_t shard_stop, const RecordType &rec) const { if (m_shards.size() == 0) return false; for (int i = m_shards.size() - 1; i >= (ssize_t)shard_stop; i--) { if (m_shards[i]) { auto res = m_shards[i].first->point_lookup(rec, true); if (res && res->is_tombstone()) { return true; } } } return false; } bool delete_record(const RecordType &rec) { if (m_shards.size() == 0) return false; for (size_t i = 0; i < m_shards.size(); ++i) { if (m_shards[i].first) { auto res = m_shards[i].first->point_lookup(rec); if (res) { res->set_delete(); return true; } } } return false; } const ShardType *get_shard(ssize_t idx) const { if (idx == all_shards_idx) { idx = 0; } if (idx >= (ssize_t)m_shards.size()) { return nullptr; } return m_shards[idx].first.get(); } const size_t get_shard_version(size_t idx) const { if (idx >= m_shards.size()) { return 0; } return m_shards[idx].second; } size_t get_shard_count() const { return m_shards.size(); } size_t get_record_count() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i].first) { cnt += m_shards[i].first->get_record_count(); } } return cnt; } size_t get_tombstone_count() const { size_t res = 0; for (size_t i = 0; i < m_shards.size(); ++i) { if (m_shards[i].first) { res += m_shards[i].first->get_tombstone_count(); } } return res; } size_t get_aux_memory_usage() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i].first) { cnt += m_shards[i].first->get_aux_memory_usage(); } } return cnt; } size_t get_memory_usage() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { cnt += m_shards[i].first->get_memory_usage(); } } return cnt; } double get_tombstone_prop() const { size_t tscnt = 0; size_t reccnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { tscnt += m_shards[i].first->get_tombstone_count(); reccnt += m_shards[i].first->get_record_count(); } } return (double)tscnt / (double)(tscnt + reccnt); } size_t get_nonempty_shard_count() const { size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i].first && m_shards[i].first->get_record_count() > 0) { cnt += 1; } } return cnt; } std::shared_ptr clone() const { auto new_level = std::make_shared(m_level_no); for (size_t i = 0; i < m_shards.size(); i++) { new_level->append(m_shards[i].first, m_shards[i].second); } for (auto itr = m_rt_window.begin(); itr < m_rt_window.end(); itr++) { new_level->m_rt_window.push_front(*itr); } return new_level; } void truncate() { m_shards.erase(m_shards.begin(), m_shards.end()); } void delete_shard(shard_index shard, bool log_delete = true) { size_t before = m_shards.size(); m_shards.erase(m_shards.begin() + shard); size_t after = m_shards.size(); assert(before > after); } void append(std::shared_ptr shard, size_t version = 0) { m_shards.push_back({shard, version}); } void append(shard_ptr shard) { m_shards.push_back(shard); } const shard_ptr get_shard_ptr(ssize_t idx) const { if (idx >= 0 && idx < (ssize_t)m_shards.size()) { return m_shards[idx]; } else if (idx == all_shards_idx && m_shards.size() == 1) { return m_shards[0]; } return {nullptr, 0}; } void set_shard_version(size_t idx, size_t version) { if (idx < m_shards.size()) { m_shards[idx].second = version; } } long predict_reconstruction_time(size_t reccnt) { if (m_rt_window.size() == 0) { return 0; } size_t total = 0; for (auto rt : m_rt_window) { total += rt; } return total / m_rt_window.size(); } void update_reconstruction_model(reconstruction_results &recon) { if (m_rt_window.size() >= m_window_size) { m_rt_window.pop_back(); } m_rt_window.push_front(recon.runtime); } private: ssize_t m_level_no; std::vector m_shards; const size_t m_window_size = 15; std::deque m_rt_window; }; } // namespace de