summary refs log tree commit diff stats
path: root/include
diff options
context:
space:
mode:
author Douglas Rumbaugh <dbr4@psu.edu> 2023-05-09 10:39:35 -0400
committer Douglas Rumbaugh <dbr4@psu.edu> 2023-05-09 10:40:45 -0400
commit 3e5df1ab4f581c795b7d3e1f1ba385efe47d6f15 (patch)
tree 5cfe94f3f4489522876ed7ac5f83bf601e3b74c7 /include
parent a104b9e9496ec20cbf0398d2606358efe06608d8 (diff)
download dynamic-extension-3e5df1ab4f581c795b7d3e1f1ba385efe47d6f15.tar.gz
Initial port of InternalLevel
Diffstat (limited to 'include')
-rw-r--r-- include/framework/InternalLevel.h 257
-rw-r--r-- include/framework/MutableBuffer.h 9
2 files changed, 266 insertions, 0 deletions
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
new file mode 100644
index 0000000..5134584
--- /dev/null
+++ b/include/framework/InternalLevel.h
@@ -0,0 +1,257 @@
+/*
+ * include/framework/InternalLevel.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <vector>
+#include <memory>
+
+#include "util/types.h"
+#include "util/bf_config.h"
+#include "shard/WIRS.h"
+#include "ds/BloomFilter.h"
+
+namespace de {
+
+/*
+ * One level of the framework: a fixed-capacity array of WIRS runs, each
+ * paired with a Bloom filter, plus bookkeeping (level number, number of
+ * occupied run slots, tagging flag).
+ *
+ * The run/filter arrays live in a reference-counted InternalLevelStructure
+ * so that the "repurposing" constructor can hand the same runs to a new
+ * level object without copying them.
+ */
+template <typename K, typename V, typename W=void>
+class InternalLevel {
+
+    // get_rejection_rate() reports 0 until the number of recorded
+    // rejections exceeds this absolute threshold.
+    static constexpr size_t REJECTION_TRIGGER_THRESHOLD = 1024;
+
+private:
+    // Owner of the per-slot run and Bloom filter pointers. Slot i of m_runs
+    // and slot i of m_bfs belong together; unused slots are null.
+    struct InternalLevelStructure {
+        explicit InternalLevelStructure(size_t cap)
+        : m_cap(cap)
+        // Value-initialize with () so every slot starts out null. A
+        // braced list such as {nullptr} would supply more initializers
+        // than elements when cap == 0 and make the new-expression fail.
+        , m_runs(new WIRS<K, V, W>*[cap]())
+        , m_bfs(new BloomFilter*[cap]()) {}
+
+        // The arrays are owned through raw pointers, so a copy would lead
+        // to a double free. Sharing is done via std::shared_ptr instead.
+        InternalLevelStructure(const InternalLevelStructure&) = delete;
+        InternalLevelStructure& operator=(const InternalLevelStructure&) = delete;
+
+        ~InternalLevelStructure() {
+            for (size_t i = 0; i < m_cap; ++i) {
+                // delete on a null pointer is a no-op, no guard needed.
+                delete m_runs[i];
+                delete m_bfs[i];
+            }
+
+            delete[] m_runs;
+            delete[] m_bfs;
+        }
+
+        size_t m_cap;            // number of slots in m_runs / m_bfs
+        WIRS<K, V, W>** m_runs;  // owned runs, null when the slot is empty
+        BloomFilter** m_bfs;     // owned filters, parallel to m_runs
+    };
+
+public:
+    // Create an empty level numbered level_no with room for run_cap runs.
+    // tagging is forwarded to every WIRS this level constructs.
+    InternalLevel(ssize_t level_no, size_t run_cap, bool tagging)
+    : m_level_no(level_no)
+    , m_run_cnt(0)
+    , m_tagging(tagging)
+    , m_structure(std::make_shared<InternalLevelStructure>(run_cap)) {}
+
+    // Create a new memory level sharing the runs of `level` and repurposing
+    // them as level number level->m_level_no + 1. The run storage is shared,
+    // not copied.
+    // WARNING: for leveling only (asserts a capacity of 1 holding 1 run).
+    InternalLevel(InternalLevel* level, bool tagging)
+    : m_level_no(level->m_level_no + 1)
+    , m_run_cnt(level->m_run_cnt)
+    , m_tagging(tagging)
+    , m_structure(level->m_structure) {
+        assert(m_structure->m_cap == 1 && m_run_cnt == 1);
+    }
+
+    ~InternalLevel() = default;
+
+    // Build a new single-run level, numbered like base_level, whose run is
+    // constructed from run 0 of base_level and run 0 of new_level. The
+    // Bloom filter is sized for the combined tombstone count of both inputs.
+    //
+    // WARNING: for leveling only.
+    // Assumes base_level is the level that new_level is merging into
+    // (base_level is larger).
+    //
+    // Returns a level allocated with new; nothing in this class frees it, so
+    // the caller is presumably responsible for deleting it. The input levels
+    // are not modified here.
+    static InternalLevel* merge_levels(InternalLevel* base_level, InternalLevel* new_level, bool tagging, const gsl_rng* rng) {
+        assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
+
+        auto res = new InternalLevel(base_level->m_level_no, 1, tagging);
+        res->m_run_cnt = 1;
+
+        const size_t tombstones = new_level->get_tombstone_count() + base_level->get_tombstone_count();
+        res->m_structure->m_bfs[0] = new BloomFilter(BF_FPR, tombstones, BF_HASH_FUNCS, rng);
+
+        // NOTE(review): either entry is null if the corresponding level is
+        // empty; presumably the WIRS constructor tolerates that -- confirm.
+        WIRS<K, V, W>* runs[2];
+        runs[0] = base_level->m_structure->m_runs[0];
+        runs[1] = new_level->m_structure->m_runs[0];
+
+        res->m_structure->m_runs[0] = new WIRS<K, V, W>(runs, 2, res->m_structure->m_bfs[0], tagging);
+        return res;
+    }
+
+    // Construct a new run from `buffer` and store it, with a Bloom filter
+    // sized for the buffer's tombstone count, in the next free slot.
+    // Requires a free slot (asserted).
+    void append_mem_table(MutableBuffer<K,V,W>* buffer, const gsl_rng* rng) {
+        assert(m_run_cnt < m_structure->m_cap);
+
+        auto bf = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng);
+        m_structure->m_bfs[m_run_cnt] = bf;
+        m_structure->m_runs[m_run_cnt] = new WIRS<K, V, W>(buffer, bf, m_tagging);
+        ++m_run_cnt;
+    }
+
+    // Construct a single new run from all runs of `level` and store it, with
+    // a Bloom filter sized for that level's tombstone count, in the next
+    // free slot. Requires a free slot (asserted). `level` is left untouched.
+    void append_merged_runs(InternalLevel* level, const gsl_rng* rng) {
+        assert(m_run_cnt < m_structure->m_cap);
+
+        auto bf = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng);
+        m_structure->m_bfs[m_run_cnt] = bf;
+        m_structure->m_runs[m_run_cnt] = new WIRS<K, V, W>(level->m_structure->m_runs, level->m_run_cnt, bf, m_tagging);
+        ++m_run_cnt;
+    }
+
+    // Construct and return a new run built from every occupied slot of this
+    // level. No Bloom filter is attached (nullptr is passed). The result is
+    // allocated with new and not stored in the level, so the caller is
+    // presumably expected to delete it.
+    WIRS<K, V, W> *get_merged_run() {
+        // A std::vector replaces the former variable-length array, which is
+        // a compiler extension and has zero length for an empty level.
+        std::vector<WIRS<K, V, W>*> runs(m_structure->m_runs, m_structure->m_runs + m_run_cnt);
+
+        return new WIRS<K, V, W>(runs.data(), m_run_cnt, nullptr, m_tagging);
+    }
+
+    // For every run of this level, in slot order, obtain its sample state
+    // for [low, high] via get_sample_run_state(). Runs whose state reports a
+    // total weight greater than zero are appended, in lock step, to
+    // `weights` (the total weight), `runs` (shard id {level, slot} plus the
+    // run pointer) and `run_states` (the state object). States with zero
+    // weight are deleted here; the appended ones are left for the caller.
+    void get_run_weights(std::vector<W>& weights, std::vector<std::pair<ShardID, WIRS<K, V, W> *>> &runs, std::vector<void*>& run_states, const K& low, const K& high) {
+        for (size_t i = 0; i < m_run_cnt; ++i) {
+            auto run = m_structure->m_runs[i];
+            if (!run) continue;
+
+            auto run_state = run->get_sample_run_state(low, high);
+            if (run_state->tot_weight > 0) {
+                runs.push_back({{m_level_no, static_cast<ssize_t>(i)}, run});
+                weights.push_back(run_state->tot_weight);
+                run_states.emplace_back(run_state);
+            } else {
+                delete run_state;
+            }
+        }
+    }
+
+    // Return true if the Bloom filter of any slot in [0, run_stop) reports
+    // `key`. Slots without a filter are skipped.
+    bool bf_rejection_check(size_t run_stop, const K& key) {
+        for (size_t i = 0; i < run_stop; ++i) {
+            auto bf = m_structure->m_bfs[i];
+            if (bf && bf->lookup(key)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    // Return true if any run in slots [run_stop, run count), visited from
+    // the highest slot downwards, reports a tombstone for (key, val). A
+    // run's Bloom filter is consulted first so that runs whose filter does
+    // not contain the key are skipped without searching them.
+    bool check_tombstone(size_t run_stop, const K& key, const V& val) {
+        // Count down with an unsigned index: i is one past the slot being
+        // examined, which avoids both wrap-around at zero and the int
+        // truncation of the former signed loop variable.
+        for (size_t i = m_run_cnt; i > run_stop; --i) {
+            const size_t slot = i - 1;
+
+            auto run = m_structure->m_runs[slot];
+            if (!run) continue;
+
+            // A missing filter cannot rule the key out, so fall through to
+            // the run itself in that case instead of dereferencing null.
+            auto bf = m_structure->m_bfs[slot];
+            if (bf && !bf->lookup(key)) continue;
+
+            if (run->check_tombstone(key, val)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    // Ask each run (over the full slot capacity) to delete (key, val) and
+    // stop at the first one that reports success. Returns whether any did.
+    bool delete_record(const K& key, const V& val) {
+        for (size_t i = 0; i < m_structure->m_cap; ++i) {
+            auto run = m_structure->m_runs[i];
+            if (run && run->delete_record(key, val)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    // Forward to get_record_at(idx) of the run in slot run_no. The slot is
+    // not validated: run_no must refer to an occupied slot.
+    const Record<K, V, W>* get_record_at(size_t run_no, size_t idx) {
+        return m_structure->m_runs[run_no]->get_record_at(idx);
+    }
+
+    // Return the run stored in slot idx (null for an empty slot). idx is
+    // not range-checked against the slot capacity.
+    WIRS<K, V, W>* get_run(size_t idx) {
+        return m_structure->m_runs[idx];
+    }
+
+    // Number of occupied run slots.
+    size_t get_run_count() const {
+        return m_run_cnt;
+    }
+
+    // Sum of get_record_count() over the level's runs.
+    size_t get_record_cnt() const {
+        size_t cnt = 0;
+        for (size_t i = 0; i < m_run_cnt; ++i) {
+            // Null guard for consistency with the other aggregates.
+            if (auto run = m_structure->m_runs[i]) {
+                cnt += run->get_record_count();
+            }
+        }
+
+        return cnt;
+    }
+
+    // Sum of get_tombstone_count() over the level's runs.
+    size_t get_tombstone_count() const {
+        size_t res = 0;
+        for (size_t i = 0; i < m_run_cnt; ++i) {
+            // Null guard for consistency with the other aggregates.
+            if (auto run = m_structure->m_runs[i]) {
+                res += run->get_tombstone_count();
+            }
+        }
+        return res;
+    }
+
+    // Sum of get_memory_utilization() over the level's Bloom filters.
+    size_t get_aux_memory_utilization() const {
+        size_t cnt = 0;
+        for (size_t i = 0; i < m_run_cnt; ++i) {
+            if (auto bf = m_structure->m_bfs[i]) {
+                cnt += bf->get_memory_utilization();
+            }
+        }
+
+        return cnt;
+    }
+
+    // Sum of get_memory_utilization() over the level's runs.
+    size_t get_memory_utilization() const {
+        size_t cnt = 0;
+        for (size_t i = 0; i < m_run_cnt; ++i) {
+            if (auto run = m_structure->m_runs[i]) {
+                cnt += run->get_memory_utilization();
+            }
+        }
+
+        return cnt;
+    }
+
+    // Return tombstones / (tombstones + records), summed over the level's
+    // runs. An empty level yields 0 rather than dividing by zero.
+    double get_tombstone_prop() const {
+        size_t tscnt = 0;
+        size_t reccnt = 0;
+        for (size_t i = 0; i < m_run_cnt; ++i) {
+            if (auto run = m_structure->m_runs[i]) {
+                tscnt += run->get_tombstone_count();
+                reccnt += run->get_record_count();
+            }
+        }
+
+        const size_t total = tscnt + reccnt;
+        if (total == 0) return 0.0;
+
+        return (double) tscnt / (double) total;
+    }
+
+    // Sum of get_rejection_count() over the level's runs.
+    size_t get_rejection_count() const {
+        size_t rej_cnt = 0;
+        for (size_t i = 0; i < m_run_cnt; ++i) {
+            if (auto run = m_structure->m_runs[i]) {
+                rej_cnt += run->get_rejection_count();
+            }
+        }
+
+        return rej_cnt;
+    }
+
+    // Return rejections / tombstone checks, summed over the level's runs.
+    // Yields 0 when no checks were recorded or while the rejection count
+    // has not exceeded REJECTION_TRIGGER_THRESHOLD.
+    double get_rejection_rate() const {
+        size_t rej_cnt = 0;
+        size_t attempt_cnt = 0;
+        for (size_t i = 0; i < m_run_cnt; ++i) {
+            if (auto run = m_structure->m_runs[i]) {
+                attempt_cnt += run->get_ts_check_count();
+                rej_cnt += run->get_rejection_count();
+            }
+        }
+
+        if (attempt_cnt == 0) return 0;
+
+        // the rejection rate is considered 0 until we exceed an
+        // absolute threshold of rejections.
+        if (rej_cnt <= REJECTION_TRIGGER_THRESHOLD) return 0;
+
+        return (double) rej_cnt / (double) attempt_cnt;
+    }
+
+private:
+    // Members are initialized in this declaration order; the constructors
+    // list their initializers in the same order.
+    ssize_t m_level_no;
+
+    size_t m_run_cnt;            // number of occupied slots in m_structure
+    size_t m_run_size_cap = 0;   // not referenced by this class; zeroed so it is never read uninitialized
+    bool m_tagging;              // forwarded to every WIRS built by this level
+    std::shared_ptr<InternalLevelStructure> m_structure;
+};
+
+}
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index 1a5c387..567b1d7 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -1,3 +1,12 @@
+/*
+ * include/framework/MutableBuffer.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
#pragma once
#include <cstdlib>