diff options
| author | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-12-22 13:00:19 -0500 |
|---|---|---|
| committer | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-12-22 13:00:19 -0500 |
| commit | 5617bed5257506d3dfda8537b16f44b3e40f1b42 (patch) | |
| tree | b1a4bb957929b20c884a4eed070f42065828fdb6 /include/framework/structure/InternalLevel.h | |
| parent | 9876d74e503df64eb9e82e540ca41fcf593ebf64 (diff) | |
| download | dynamic-extension-5617bed5257506d3dfda8537b16f44b3e40f1b42.tar.gz | |
Began overhauling reconstruction mechanism
Diffstat (limited to 'include/framework/structure/InternalLevel.h')
| -rw-r--r-- | include/framework/structure/InternalLevel.h | 154 |
1 files changed, 26 insertions, 128 deletions
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index a4cf94d..8cfcd49 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -19,14 +19,10 @@ #include <vector> #include "framework/interface/Query.h" -#include "framework/interface/Record.h" #include "framework/interface/Shard.h" -#include "framework/structure/BufferView.h" #include "util/types.h" namespace de { -template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> -class InternalLevel; template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> class InternalLevel { @@ -34,110 +30,9 @@ class InternalLevel { typedef BufferView<RecordType> BuffView; public: - InternalLevel(ssize_t level_no, size_t shard_cap) - : m_level_no(level_no), m_shard_cnt(0), m_shards(shard_cap, nullptr), - m_pending_shard(nullptr) {} - - ~InternalLevel() { delete m_pending_shard; } - - /* - * Create a new shard combining the records from base_level and new_level, - * and return a shared_ptr to a new level containing this shard. This is used - * for reconstructions under the leveling layout policy. - * - * No changes are made to the levels provided as arguments. - */ - static std::shared_ptr<InternalLevel> - reconstruction(InternalLevel *base_level, InternalLevel *new_level) { - assert(base_level->m_level_no > new_level->m_level_no || - (base_level->m_level_no == 0 && new_level->m_level_no == 0)); - auto res = new InternalLevel(base_level->m_level_no, 1); - res->m_shard_cnt = 1; - std::vector<ShardType *> shards = {base_level->m_shards[0].get(), - new_level->m_shards[0].get()}; - - res->m_shards[0] = std::make_shared<ShardType>(shards); - return std::shared_ptr<InternalLevel>(res); - } - - static std::shared_ptr<InternalLevel> - reconstruction(std::vector<InternalLevel *> levels, size_t level_idx) { - std::vector<ShardType *> shards; - for (auto level : levels) { - for (auto shard : level->m_shards) { - if (shard) - shards.emplace_back(shard.get()); - } - } - - auto res = new InternalLevel(level_idx, 1); - res->m_shard_cnt = 1; - res->m_shards[0] = std::make_shared<ShardType>(shards); + InternalLevel(ssize_t level_no) : m_level_no(level_no) {} - return std::shared_ptr<InternalLevel>(res); - } - - /* - * Create a new shard combining the records from all of - * the shards in level, and append this new shard into - * this level. This is used for reconstructions under - * the tiering layout policy. - * - * No changes are made to the level provided as an argument. - */ - void append_level(InternalLevel *level) { - // FIXME: that this is happening probably means that - // something is going terribly wrong earlier in the - // reconstruction logic. - if (level->get_shard_count() == 0) { - return; - } - - std::vector<ShardType *> shards; - for (auto shard : level->m_shards) { - if (shard) - shards.emplace_back(shard.get()); - } - - if (m_shard_cnt == m_shards.size()) { - m_pending_shard = new ShardType(shards); - return; - } - - auto tmp = new ShardType(shards); - m_shards[m_shard_cnt] = std::shared_ptr<ShardType>(tmp); - - ++m_shard_cnt; - } - - /* - * Create a new shard using the records in the - * provided buffer, and append this new shard - * into this level. This is used for buffer - * flushes under the tiering layout policy. - */ - void append_buffer(BuffView buffer) { - if (m_shard_cnt == m_shards.size()) { - assert(m_pending_shard == nullptr); - m_pending_shard = new ShardType(std::move(buffer)); - return; - } - - m_shards[m_shard_cnt] = std::make_shared<ShardType>(std::move(buffer)); - ++m_shard_cnt; - } - - void finalize() { - if (m_pending_shard) { - for (size_t i = 0; i < m_shards.size(); i++) { - m_shards[i] = nullptr; - } - - m_shards[0] = std::shared_ptr<ShardType>(m_pending_shard); - m_pending_shard = nullptr; - m_shard_cnt = 1; - } - } + ~InternalLevel() = default; /* * Create a new shard containing the combined records @@ -146,7 +41,7 @@ public: * No changes are made to this level. */ ShardType *get_combined_shard() { - if (m_shard_cnt == 0) { + if (m_shards.size() == 0) { return nullptr; } @@ -163,7 +58,7 @@ public: std::vector<std::pair<ShardID, ShardType *>> &shards, std::vector<typename QueryType::LocalQuery *> &local_queries, typename QueryType::Parameters *query_parms) { - for (size_t i = 0; i < m_shard_cnt; i++) { + for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { auto local_query = QueryType::local_preproc(m_shards[i].get(), query_parms); @@ -174,10 +69,10 @@ public: } bool check_tombstone(size_t shard_stop, const RecordType &rec) { - if (m_shard_cnt == 0) + if (m_shards.size() == 0) return false; - for (int i = m_shard_cnt - 1; i >= (ssize_t)shard_stop; i--) { + for (int i = m_shards.size() - 1; i >= (ssize_t)shard_stop; i--) { if (m_shards[i]) { auto res = m_shards[i]->point_lookup(rec, true); if (res && res->is_tombstone()) { @@ -189,7 +84,7 @@ public: } bool delete_record(const RecordType &rec) { - if (m_shard_cnt == 0) + if (m_shards.size() == 0) return false; for (size_t i = 0; i < m_shards.size(); ++i) { @@ -206,18 +101,18 @@ public: } ShardType *get_shard(size_t idx) { - if (idx >= m_shard_cnt) { + if (idx >= m_shards.size()) { return nullptr; } return m_shards[idx].get(); } - size_t get_shard_count() { return m_shard_cnt; } + size_t get_shard_count() { return m_shards.size(); } size_t get_record_count() { size_t cnt = 0; - for (size_t i = 0; i < m_shard_cnt; i++) { + for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { cnt += m_shards[i]->get_record_count(); } @@ -228,7 +123,7 @@ public: size_t get_tombstone_count() { size_t res = 0; - for (size_t i = 0; i < m_shard_cnt; ++i) { + for (size_t i = 0; i < m_shards.size(); ++i) { if (m_shards[i]) { res += m_shards[i]->get_tombstone_count(); } @@ -238,7 +133,7 @@ public: size_t get_aux_memory_usage() { size_t cnt = 0; - for (size_t i = 0; i < m_shard_cnt; i++) { + for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { cnt += m_shards[i]->get_aux_memory_usage(); } @@ -249,7 +144,7 @@ public: size_t get_memory_usage() { size_t cnt = 0; - for (size_t i = 0; i < m_shard_cnt; i++) { + for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { cnt += m_shards[i]->get_memory_usage(); } @@ -261,7 +156,7 @@ public: double get_tombstone_prop() { size_t tscnt = 0; size_t reccnt = 0; - for (size_t i = 0; i < m_shard_cnt; i++) { + for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i]) { tscnt += m_shards[i]->get_tombstone_count(); reccnt += m_shards[i]->get_record_count(); @@ -272,24 +167,27 @@ public: } std::shared_ptr<InternalLevel> clone() { - auto new_level = - std::make_shared<InternalLevel>(m_level_no, m_shards.size()); - for (size_t i = 0; i < m_shard_cnt; i++) { + auto new_level = std::make_shared<InternalLevel>(m_level_no); + for (size_t i = 0; i < m_shards.size(); i++) { new_level->m_shards[i] = m_shards[i]; } - new_level->m_shard_cnt = m_shard_cnt; return new_level; } -private: - ssize_t m_level_no; + void truncate() { m_shards.erase(m_shards.begin(), m_shards.end()); } - size_t m_shard_cnt; - size_t m_shard_size_cap; + void delete_shard(shard_index shard) { + m_shards.erase(m_shards.begin() + shard); + } + + bool append(std::shared_ptr<ShardType> shard) { + m_shards.emplace_back(shard); + } +private: + ssize_t m_level_no; std::vector<std::shared_ptr<ShardType>> m_shards; - ShardType *m_pending_shard; }; } // namespace de |