path: root/include/framework/structure/InternalLevel.h
Diffstat (limited to 'include/framework/structure/InternalLevel.h')
-rw-r--r--  include/framework/structure/InternalLevel.h  271
1 file changed, 271 insertions, 0 deletions
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
new file mode 100644
index 0000000..db38946
--- /dev/null
+++ b/include/framework/structure/InternalLevel.h
@@ -0,0 +1,271 @@
+/*
+ * include/framework/structure/InternalLevel.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * The word `Internal` in this class's name refers to memory. The current
+ * model, inherited from the framework in Practical Dynamic Extension for
+ * Sampling Indexes, would use a separate ExternalLevel for shards stored
+ * on external storage. This distinction could probably be eliminated with
+ * more careful interface design.
+ *
+ */
+#pragma once
+
+#include <vector>
+#include <memory>
+
+#include "util/types.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/BufferView.h"
+
+namespace de {
+
+template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q>
+class InternalLevel {
+    typedef S Shard;
+    typedef BufferView<R> BuffView;
+
+public:
+    InternalLevel(ssize_t level_no, size_t shard_cap)
+        : m_level_no(level_no)
+        , m_shard_cnt(0)
+        , m_shard_size_cap(shard_cap)
+        , m_shards(shard_cap, nullptr)
+        , m_pending_shard(nullptr)
+    {}
+
+    ~InternalLevel() {
+        delete m_pending_shard;
+    }
+
+    /*
+     * Create a new shard combining the records from base_level and
+     * new_level, and return a shared_ptr to a new level containing this
+     * shard. This is used for reconstructions under the leveling layout
+     * policy.
+     *
+     * No changes are made to the levels provided as arguments.
+     */
+    static std::shared_ptr<InternalLevel> reconstruction(InternalLevel* base_level, InternalLevel* new_level) {
+        assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
+        auto res = new InternalLevel(base_level->m_level_no, 1);
+        res->m_shard_cnt = 1;
+        std::vector<Shard *> shards = {base_level->m_shards[0].get(),
+                                       new_level->m_shards[0].get()};
+
+        res->m_shards[0] = std::make_shared<S>(shards);
+        return std::shared_ptr<InternalLevel>(res);
+    }
+
+    /*
+     * Create a new shard combining the records from all of the shards
+     * in level, and append this new shard to this level. This is used
+     * for reconstructions under the tiering layout policy.
+     *
+     * No changes are made to the level provided as an argument.
+     */
+    void append_level(InternalLevel* level) {
+        // FIXME: that this is happening probably means that something
+        // is going terribly wrong earlier in the reconstruction logic.
+        if (level->get_shard_count() == 0) {
+            return;
+        }
+
+        std::vector<Shard *> shards;
+        for (auto shard : level->m_shards) {
+            if (shard) shards.emplace_back(shard.get());
+        }
+
+        if (m_shard_cnt == m_shards.size()) {
+            assert(m_pending_shard == nullptr);
+            m_pending_shard = new S(shards);
+            return;
+        }
+
+        auto tmp = new S(shards);
+        m_shards[m_shard_cnt] = std::shared_ptr<S>(tmp);
+
+        ++m_shard_cnt;
+    }
+
+    /*
+     * Create a new shard using the records in the provided buffer,
+     * and append this new shard to this level. This is used for
+     * buffer flushes under the tiering layout policy.
+     */
+    void append_buffer(BuffView buffer) {
+        if (m_shard_cnt == m_shards.size()) {
+            assert(m_pending_shard == nullptr);
+            m_pending_shard = new S(std::move(buffer));
+            return;
+        }
+
+        m_shards[m_shard_cnt] = std::make_shared<S>(std::move(buffer));
+        ++m_shard_cnt;
+    }
+
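+    /*
+     * If an append operation found this level full and stashed its
+     * result in m_pending_shard, drop the level's existing shards and
+     * install the pending shard as the level's sole shard. Otherwise,
+     * this is a no-op.
+     */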
+    void finalize() {
+        if (m_pending_shard) {
+            for (size_t i = 0; i < m_shards.size(); i++) {
+                m_shards[i] = nullptr;
+            }
+
+            m_shards[0] = std::shared_ptr<S>(m_pending_shard);
+            m_pending_shard = nullptr;
+            m_shard_cnt = 1;
+        }
+    }
+
+    /*
+     * Create a new shard containing the combined records from all
+     * shards on this level and return it.
+     *
+     * No changes are made to this level.
+     */
+    Shard *get_combined_shard() {
+        if (m_shard_cnt == 0) {
+            return nullptr;
+        }
+
+        std::vector<Shard *> shards;
+        for (auto shard : m_shards) {
+            if (shard) shards.emplace_back(shard.get());
+        }
+
+        return new S(shards);
+    }
+
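+    /*
+     * Build a query state for each shard on this level, appending it
+     * to shard_states and recording the matching (ShardID, Shard *)
+     * pair in shards at the corresponding index.
+     */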
+    void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void *> &shard_states, void *query_parms) {
+        for (size_t i = 0; i < m_shard_cnt; i++) {
+            if (m_shards[i]) {
+                auto shard_state = Q::get_query_state(m_shards[i].get(), query_parms);
+                shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i].get()});
+                shard_states.emplace_back(shard_state);
+            }
+        }
+    }
+
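+    /*
+     * Search this level's shards for a tombstone matching rec,
+     * scanning from the most recently appended shard down to index
+     * shard_stop (inclusive). Returns true if one is found.
+     */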
+    bool check_tombstone(size_t shard_stop, const R &rec) {
+        if (m_shard_cnt == 0) return false;
+
+        for (ssize_t i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
+            if (m_shards[i]) {
+                auto res = m_shards[i]->point_lookup(rec, true);
+                if (res && res->is_tombstone()) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
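+    /*
+     * Perform a point lookup for rec against each shard on this
+     * level; if a match is found, mark it deleted and return true.
+     * Returns false if no shard contains the record.
+     */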
+    bool delete_record(const R &rec) {
+        if (m_shard_cnt == 0) return false;
+
+        for (size_t i = 0; i < m_shards.size(); ++i) {
+            if (m_shards[i]) {
+                auto res = m_shards[i]->point_lookup(rec);
+                if (res) {
+                    res->set_delete();
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
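+    /*
+     * Return a raw pointer to the shard at index idx, or nullptr if
+     * that slot is empty. No bounds checking is performed, and the
+     * level retains ownership of the shard.
+     */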
+    Shard *get_shard(size_t idx) {
+        return m_shards[idx].get();
+    }
+
+    size_t get_shard_count() {
+        return m_shard_cnt;
+    }
+
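+    /* The total number of records stored in this level's shards. */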
+    size_t get_record_count() {
+        size_t cnt = 0;
+        for (size_t i = 0; i < m_shard_cnt; i++) {
+            if (m_shards[i]) {
+                cnt += m_shards[i]->get_record_count();
+            }
+        }
+
+        return cnt;
+    }
+
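+    /* The total number of tombstones stored in this level's shards. */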
+    size_t get_tombstone_count() {
+        size_t res = 0;
+        for (size_t i = 0; i < m_shard_cnt; ++i) {
+            if (m_shards[i]) {
+                res += m_shards[i]->get_tombstone_count();
+            }
+        }
+        return res;
+    }
+
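+    /* The combined auxiliary memory usage of this level's shards. */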
+    size_t get_aux_memory_usage() {
+        size_t cnt = 0;
+        for (size_t i = 0; i < m_shard_cnt; i++) {
+            if (m_shards[i]) {
+                cnt += m_shards[i]->get_aux_memory_usage();
+            }
+        }
+
+        return cnt;
+    }
+
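+    /* The combined memory usage reported by this level's shards. */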
+    size_t get_memory_usage() {
+        size_t cnt = 0;
+        for (size_t i = 0; i < m_shard_cnt; i++) {
+            if (m_shards[i]) {
+                cnt += m_shards[i]->get_memory_usage();
+            }
+        }
+
+        return cnt;
+    }
+
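+    /*
+     * The proportion of tombstones among all records on this level,
+     * computed as tombstones / (tombstones + records). Returns 0 for
+     * a level containing no records.
+     */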
+    double get_tombstone_prop() {
+        size_t tscnt = 0;
+        size_t reccnt = 0;
+        for (size_t i = 0; i < m_shard_cnt; i++) {
+            if (m_shards[i]) {
+                tscnt += m_shards[i]->get_tombstone_count();
+                reccnt += m_shards[i]->get_record_count();
+            }
+        }
+
+        // guard against a 0/0 division on an empty level
+        if (tscnt + reccnt == 0) return 0.0;
+
+        return (double) tscnt / (double) (tscnt + reccnt);
+    }
+
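+    /*
+     * Return a new level with the same level number and shard
+     * capacity as this one, sharing ownership of this level's shards.
+     * The pending shard, if any, is not copied.
+     */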
+    std::shared_ptr<InternalLevel> clone() {
+        auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size());
+        for (size_t i = 0; i < m_shard_cnt; i++) {
+            new_level->m_shards[i] = m_shards[i];
+        }
+        new_level->m_shard_cnt = m_shard_cnt;
+
+        return new_level;
+    }
+
+private:
+    ssize_t m_level_no;
+
+    size_t m_shard_cnt;
+    size_t m_shard_size_cap;
+
+    std::vector<std::shared_ptr<Shard>> m_shards;
+    Shard *m_pending_shard;
+};
+
+} // namespace de