author    Douglas Rumbaugh <dbr4@psu.edu>    2023-05-09 15:13:35 -0400
committer Douglas Rumbaugh <dbr4@psu.edu>    2023-05-09 15:13:35 -0400
commit    c867d59aaf61bb1a0839d34b56935f52c0fc6e00 (patch)
tree      e01596ec9cd950d9ca6af8c4a0c469dfca1f8815 /include
parent    9b55600a3bb50d95d6c47e0339f861448ca18c30 (diff)
Dynamic Extension unit tests + bugfixes
Diffstat (limited to 'include')
-rw-r--r--    include/framework/DynamicExtension.h    30
-rw-r--r--    include/framework/InternalLevel.h       146
-rw-r--r--    include/shard/WIRS.h                    21
3 files changed, 100 insertions, 97 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 93de24f..ed01e47 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -3,7 +3,7 @@
#include <atomic>
#include <numeric>
#include <cstdio>
-#include<vector>
+#include <vector>
#include "framework/MutableBuffer.h"
#include "framework/InternalLevel.h"
@@ -42,7 +42,7 @@ static constexpr bool LSM_REJ_SAMPLE = false;
// True for leveling, false for tiering
static constexpr bool LSM_LEVELING = false;
-static constexpr bool DELETE_TAGGING = true;
+static constexpr bool DELETE_TAGGING = false;
// TODO: Replace the constexpr bools above
// with template parameters based on these
@@ -107,7 +107,7 @@ public:
int append(const K& key, const V& val, W weight, bool tombstone, gsl_rng *rng) {
// NOTE: single-threaded implementation only
MutableBuffer<K,V,W> *buffer;
- while (!(buffer = buffer()))
+ while (!(buffer = get_buffer()))
;
if (buffer->is_full()) {
@@ -138,8 +138,8 @@ public:
shards.push_back({{-1, -1}, nullptr});
states.push_back(nullptr);
- std::vector<double> shard_weights;
- shard_weights.push_back(buffer_weight);
+ std::vector<W> shard_weights;
+ shard_weights.push_back((double) buffer_weight);
for (auto &level : m_levels) {
level->get_shard_weights(shard_weights, shards, states, lower_key, upper_key);
@@ -153,10 +153,13 @@ public:
}
double tot_weight = std::accumulate(shard_weights.begin(), shard_weights.end(), 0);
- for (auto& w: shard_weights) w /= tot_weight;
+ std::vector<double> normalized_weights(shard_weights.size());
+ for (size_t i=0; i<shard_weights.size(); i++) {
+ normalized_weights[i] = ((double) shard_weights[i]) / tot_weight;
+ }
// Construct alias structure
- auto alias = Alias(shard_weights);
+ auto alias = Alias(normalized_weights);
std::vector<size_t> shard_samples(shard_weights.size(), 0);
@@ -214,13 +217,13 @@ public:
shards[i].second->get_samples(states[i], results, lower_key, upper_key, shard_samples[i], rng);
- for (size_t i=0; i<results.size(); i++) {
- if (results[i]->is_tombstone() || is_deleted(results[i], shards[i].first, buffer, buffer_cutoff)) {
+ for (size_t j=0; j<results.size(); j++) {
+ if (rejection(&results[j], shards[i].first, lower_key, upper_key, buffer, buffer_cutoff)) {
rejections++;
continue;
}
- sample_set[sample_idx++] = results[i];
+ sample_set[sample_idx++] = results[j];
}
shard_samples[i] = 0;
@@ -396,7 +399,7 @@ private:
} else if (record->key < lower_bound || record->key > upper_bound) {
bounds_rejections++;
return true;
- } else if (is_deleted(record, shid, buffer, buffer, buffer_cutoff)) {
+ } else if (is_deleted(record, shid, buffer, buffer_cutoff)) {
deletion_rejections++;
return true;
}
@@ -431,8 +434,7 @@ private:
size_t new_shard_cnt = (LSM_LEVELING) ? 1 : m_scale_factor;
new_idx = m_levels.size();
if (new_idx > 0) {
- auto res = m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count();
- assert(res == 0);
+ assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
}
m_levels.emplace_back(new InternalLevel<K,V,W>(new_idx, new_shard_cnt, DELETE_TAGGING));
@@ -453,7 +455,7 @@ private:
merge_buffer_into_l0(buffer, rng);
enforce_delete_maximum(0, rng);
- buffer->tshardcate();
+ buffer->truncate();
return;
}
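
For reference, a minimal self-contained sketch of the weight-normalization step touched in the sampling path above, using std::discrete_distribution in place of the framework's Alias structure (function and variable names here are illustrative only, not from the repository):

    #include <cstddef>
    #include <numeric>
    #include <random>
    #include <vector>

    // Split a sample budget across shards in proportion to their weights.
    std::vector<size_t> assign_samples(const std::vector<double> &shard_weights,
                                       size_t sample_sz, std::mt19937 &rng) {
        // 0.0 (not 0) keeps the accumulation in floating point; an integer
        // initial value would truncate each addition.
        double tot_weight =
            std::accumulate(shard_weights.begin(), shard_weights.end(), 0.0);

        std::vector<double> normalized(shard_weights.size());
        for (size_t i = 0; i < shard_weights.size(); i++) {
            normalized[i] = shard_weights[i] / tot_weight;
        }

        // Each draw picks a shard with probability proportional to its weight.
        std::discrete_distribution<size_t> pick(normalized.begin(), normalized.end());

        std::vector<size_t> shard_samples(shard_weights.size(), 0);
        for (size_t i = 0; i < sample_sz; i++) {
            shard_samples[pick(rng)]++;
        }
        return shard_samples;
    }
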
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index 5134584..727a382 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -28,37 +28,37 @@ private:
struct InternalLevelStructure {
InternalLevelStructure(size_t cap)
: m_cap(cap)
- , m_runs(new WIRS<K, V, W>*[cap]{nullptr})
+ , m_shards(new WIRS<K, V, W>*[cap]{nullptr})
, m_bfs(new BloomFilter*[cap]{nullptr}) {}
~InternalLevelStructure() {
for (size_t i = 0; i < m_cap; ++i) {
- if (m_runs[i]) delete m_runs[i];
+ if (m_shards[i]) delete m_shards[i];
if (m_bfs[i]) delete m_bfs[i];
}
- delete[] m_runs;
+ delete[] m_shards;
delete[] m_bfs;
}
size_t m_cap;
- WIRS<K, V, W>** m_runs;
+ WIRS<K, V, W>** m_shards;
BloomFilter** m_bfs;
};
public:
- InternalLevel(ssize_t level_no, size_t run_cap, bool tagging)
- : m_level_no(level_no), m_run_cnt(0)
- , m_structure(new InternalLevelStructure(run_cap))
+ InternalLevel(ssize_t level_no, size_t shard_cap, bool tagging)
+ : m_level_no(level_no), m_shard_cnt(0)
+ , m_structure(new InternalLevelStructure(shard_cap))
, m_tagging(tagging) {}
- // Create a new memory level sharing the runs and repurposing it as previous level_no + 1
+ // Create a new memory level sharing the shards and repurposing it as previous level_no + 1
// WARNING: for leveling only.
InternalLevel(InternalLevel* level, bool tagging)
- : m_level_no(level->m_level_no + 1), m_run_cnt(level->m_run_cnt)
+ : m_level_no(level->m_level_no + 1), m_shard_cnt(level->m_shard_cnt)
, m_structure(level->m_structure)
, m_tagging(tagging) {
- assert(m_structure->m_cap == 1 && m_run_cnt == 1);
+ assert(m_structure->m_cap == 1 && m_shard_cnt == 1);
}
@@ -69,73 +69,73 @@ public:
static InternalLevel* merge_levels(InternalLevel* base_level, InternalLevel* new_level, bool tagging, const gsl_rng* rng) {
assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
auto res = new InternalLevel(base_level->m_level_no, 1, tagging);
- res->m_run_cnt = 1;
+ res->m_shard_cnt = 1;
res->m_structure->m_bfs[0] =
new BloomFilter(BF_FPR,
new_level->get_tombstone_count() + base_level->get_tombstone_count(),
BF_HASH_FUNCS, rng);
- WIRS<K, V, W>* runs[2];
- runs[0] = base_level->m_structure->m_runs[0];
- runs[1] = new_level->m_structure->m_runs[0];
+ WIRS<K, V, W>* shards[2];
+ shards[0] = base_level->m_structure->m_shards[0];
+ shards[1] = new_level->m_structure->m_shards[0];
- res->m_structure->m_runs[0] = new WIRS<K, V, W>(runs, 2, res->m_structure->m_bfs[0], tagging);
+ res->m_structure->m_shards[0] = new WIRS<K, V, W>(shards, 2, res->m_structure->m_bfs[0], tagging);
return res;
}
void append_mem_table(MutableBuffer<K,V,W>* buffer, const gsl_rng* rng) {
- assert(m_run_cnt < m_structure->m_cap);
- m_structure->m_bfs[m_run_cnt] = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng);
- m_structure->m_runs[m_run_cnt] = new WIRS<K, V, W>(buffer, m_structure->m_bfs[m_run_cnt], m_tagging);
- ++m_run_cnt;
+ assert(m_shard_cnt < m_structure->m_cap);
+ m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng);
+ m_structure->m_shards[m_shard_cnt] = new WIRS<K, V, W>(buffer, m_structure->m_bfs[m_shard_cnt], m_tagging);
+ ++m_shard_cnt;
}
- void append_merged_runs(InternalLevel* level, const gsl_rng* rng) {
- assert(m_run_cnt < m_structure->m_cap);
- m_structure->m_bfs[m_run_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng);
- m_structure->m_runs[m_run_cnt] = new WIRS<K, V, W>(level->m_structure->m_runs, level->m_run_cnt, m_structure->m_bfs[m_run_cnt], m_tagging);
- ++m_run_cnt;
+ void append_merged_shards(InternalLevel* level, const gsl_rng* rng) {
+ assert(m_shard_cnt < m_structure->m_cap);
+ m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng);
+ m_structure->m_shards[m_shard_cnt] = new WIRS<K, V, W>(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt], m_tagging);
+ ++m_shard_cnt;
}
- WIRS<K, V, W> *get_merged_run() {
- WIRS<K, V, W> *runs[m_run_cnt];
+ WIRS<K, V, W> *get_merged_shard() {
+ WIRS<K, V, W> *shards[m_shard_cnt];
- for (size_t i=0; i<m_run_cnt; i++) {
- runs[i] = (m_structure->m_runs[i]) ? m_structure->m_runs[i] : nullptr;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ shards[i] = (m_structure->m_shards[i]) ? m_structure->m_shards[i] : nullptr;
}
- return new WIRS<K, V, W>(runs, m_run_cnt, nullptr, m_tagging);
+ return new WIRS<K, V, W>(shards, m_shard_cnt, nullptr, m_tagging);
}
// Append the sample range in-order.....
- void get_run_weights(std::vector<W>& weights, std::vector<std::pair<ShardID, WIRS<K, V, W> *>> &runs, std::vector<void*>& run_states, const K& low, const K& high) {
- for (size_t i=0; i<m_run_cnt; i++) {
- if (m_structure->m_runs[i]) {
- auto run_state = m_structure->m_runs[i]->get_sample_run_state(low, high);
- if (run_state->tot_weight > 0) {
- runs.push_back({{m_level_no, (ssize_t) i}, m_structure->m_runs[i]});
- weights.push_back(run_state->tot_weight);
- run_states.emplace_back(run_state);
+ void get_shard_weights(std::vector<W>& weights, std::vector<std::pair<ShardID, WIRS<K, V, W> *>> &shards, std::vector<void*>& shard_states, const K& low, const K& high) {
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_structure->m_shards[i]) {
+ auto shard_state = m_structure->m_shards[i]->get_sample_shard_state(low, high);
+ if (shard_state->tot_weight > 0) {
+ shards.push_back({{m_level_no, (ssize_t) i}, m_structure->m_shards[i]});
+ weights.push_back(shard_state->tot_weight);
+ shard_states.emplace_back(shard_state);
} else {
- delete run_state;
+ WIRS<K, V, W>::delete_state(shard_state);
}
}
}
}
- bool bf_rejection_check(size_t run_stop, const K& key) {
- for (size_t i = 0; i < run_stop; ++i) {
+ bool bf_rejection_check(size_t shard_stop, const K& key) {
+ for (size_t i = 0; i < shard_stop; ++i) {
if (m_structure->m_bfs[i] && m_structure->m_bfs[i]->lookup(key))
return true;
}
return false;
}
- bool check_tombstone(size_t run_stop, const K& key, const V& val) {
- if (m_run_cnt == 0) return false;
+ bool check_tombstone(size_t shard_stop, const K& key, const V& val) {
+ if (m_shard_cnt == 0) return false;
- for (int i = m_run_cnt - 1; i >= (ssize_t) run_stop; i--) {
- if (m_structure->m_runs[i] && (m_structure->m_bfs[i]->lookup(key))
- && m_structure->m_runs[i]->check_tombstone(key, val))
+ for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
+ if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(key))
+ && m_structure->m_shards[i]->check_tombstone(key, val))
return true;
}
return false;
@@ -143,7 +143,7 @@ public:
bool delete_record(const K& key, const V& val) {
for (size_t i = 0; i < m_structure->m_cap; ++i) {
- if (m_structure->m_runs[i] && m_structure->m_runs[i]->delete_record(key, val)) {
+ if (m_structure->m_shards[i] && m_structure->m_shards[i]->delete_record(key, val)) {
return true;
}
}
@@ -151,22 +151,22 @@ public:
return false;
}
- const Record<K, V, W>* get_record_at(size_t run_no, size_t idx) {
- return m_structure->m_runs[run_no]->get_record_at(idx);
+ const Record<K, V, W>* get_record_at(size_t shard_no, size_t idx) {
+ return m_structure->m_shards[shard_no]->get_record_at(idx);
}
- WIRS<K, V, W>* get_run(size_t idx) {
- return m_structure->m_runs[idx];
+ WIRS<K, V, W>* get_shard(size_t idx) {
+ return m_structure->m_shards[idx];
}
- size_t get_run_count() {
- return m_run_cnt;
+ size_t get_shard_count() {
+ return m_shard_cnt;
}
size_t get_record_cnt() {
size_t cnt = 0;
- for (size_t i=0; i<m_run_cnt; i++) {
- cnt += m_structure->m_runs[i]->get_record_count();
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ cnt += m_structure->m_shards[i]->get_record_count();
}
return cnt;
@@ -174,15 +174,15 @@ public:
size_t get_tombstone_count() {
size_t res = 0;
- for (size_t i = 0; i < m_run_cnt; ++i) {
- res += m_structure->m_runs[i]->get_tombstone_count();
+ for (size_t i = 0; i < m_shard_cnt; ++i) {
+ res += m_structure->m_shards[i]->get_tombstone_count();
}
return res;
}
size_t get_aux_memory_utilization() {
size_t cnt = 0;
- for (size_t i=0; i<m_run_cnt; i++) {
+ for (size_t i=0; i<m_shard_cnt; i++) {
if (m_structure->m_bfs[i]) {
cnt += m_structure->m_bfs[i]->get_memory_utilization();
}
@@ -193,9 +193,9 @@ public:
size_t get_memory_utilization() {
size_t cnt = 0;
- for (size_t i=0; i<m_run_cnt; i++) {
- if (m_structure->m_runs[i]) {
- cnt += m_structure->m_runs[i]->get_memory_utilization();
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_structure->m_shards[i]) {
+ cnt += m_structure->m_shards[i]->get_memory_utilization();
}
}
@@ -205,10 +205,10 @@ public:
double get_tombstone_prop() {
size_t tscnt = 0;
size_t reccnt = 0;
- for (size_t i=0; i<m_run_cnt; i++) {
- if (m_structure->m_runs[i]) {
- tscnt += m_structure->m_runs[i]->get_tombstone_count();
- reccnt += m_structure->m_runs[i]->get_record_count();
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_structure->m_shards[i]) {
+ tscnt += m_structure->m_shards[i]->get_tombstone_count();
+ reccnt += m_structure->m_shards[i]->get_record_count();
}
}
@@ -217,9 +217,9 @@ public:
size_t get_rejection_count() {
size_t rej_cnt = 0;
- for (size_t i=0; i<m_run_cnt; i++) {
- if (m_structure->m_runs[i]) {
- rej_cnt += m_structure->m_runs[i]->get_rejection_count();
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_structure->m_shards[i]) {
+ rej_cnt += m_structure->m_shards[i]->get_rejection_count();
}
}
@@ -229,10 +229,10 @@ public:
double get_rejection_rate() {
size_t rej_cnt = 0;
size_t attempt_cnt = 0;
- for (size_t i=0; i<m_run_cnt; i++) {
- if (m_structure->m_runs[i]) {
- attempt_cnt += m_structure->m_runs[i]->get_ts_check_count();
- rej_cnt += m_structure->m_runs[i]->get_rejection_count();
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_structure->m_shards[i]) {
+ attempt_cnt += m_structure->m_shards[i]->get_ts_check_count();
+ rej_cnt += m_structure->m_shards[i]->get_rejection_count();
}
}
@@ -248,8 +248,8 @@ public:
private:
ssize_t m_level_no;
- size_t m_run_cnt;
- size_t m_run_size_cap;
+ size_t m_shard_cnt;
+ size_t m_shard_size_cap;
bool m_tagging;
std::shared_ptr<InternalLevelStructure> m_structure;
};
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index c89a407..39337bf 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -22,7 +22,7 @@
namespace de {
-thread_local size_t m_wirsrun_cancelations = 0;
+thread_local size_t wirs_cancelations = 0;
template <typename K, typename V, typename W>
class WIRS {
@@ -63,7 +63,7 @@ public:
if (!(base->is_tombstone()) && (base + 1) < stop) {
if (base->match(base + 1) && (base + 1)->is_tombstone()) {
base += 2;
- m_wirsrun_cancelations++;
+ wirs_cancelations++;
continue;
}
}
@@ -89,7 +89,7 @@ public:
}
}
- WIRS(WIRS** runs, size_t len, BloomFilter* bf, bool tagging)
+ WIRS(WIRS** shards, size_t len, BloomFilter* bf, bool tagging)
: m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0),
m_tagging(tagging), m_root(nullptr) {
std::vector<Cursor<K,V,W>> cursors;
@@ -100,10 +100,10 @@ public:
size_t attemp_reccnt = 0;
for (size_t i = 0; i < len; ++i) {
- if (runs[i]) {
- auto base = runs[i]->sorted_output();
- cursors.emplace_back(Cursor{base, base + runs[i]->get_record_count(), 0, runs[i]->get_record_count()});
- attemp_reccnt += runs[i]->get_record_count();
+ if (shards[i]) {
+ auto base = shards[i]->sorted_output();
+ cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
+ attemp_reccnt += shards[i]->get_record_count();
pq.push(cursors[i].ptr, i);
} else {
cursors.emplace_back(Cursor<K,V,W>{nullptr, nullptr, 0, 0});
@@ -200,7 +200,7 @@ public:
// low - high -> decompose to a set of nodes.
// Build Alias across the decomposed nodes.
- WIRSState* get_sample_run_state(const K& lower_key, const K& upper_key) {
+ WIRSState* get_sample_shard_state(const K& lower_key, const K& upper_key) {
WIRSState* res = new WIRSState();
// Simulate a stack to unfold recursion.
@@ -238,7 +238,8 @@ public:
// returns the number of records sampled
// NOTE: This operation returns records strictly between the lower and upper bounds, not
// including them.
- size_t get_samples(WIRSState* run_state, std::vector<Record<K, V, W>> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
+ size_t get_samples(void* shard_state, std::vector<Record<K, V, W>> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
+ WIRSState *state = (WIRSState *) shard_state;
if (sample_sz == 0) {
return 0;
}
@@ -249,7 +250,7 @@ public:
do {
++attempts;
// first level....
- auto node = run_state->nodes[run_state->top_level_alias->get(rng)];
+ auto node = state->nodes[state->top_level_alias->get(rng)];
// second level...
auto fat_point = node->low + node->alias->get(rng);
// third level...
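
For reference, a simplified stand-alone version of the record/tombstone cancellation walk seen in the WIRS constructor above; Rec and match() are illustrative stand-ins for the framework's record type:

    #include <cstddef>
    #include <vector>

    struct Rec {
        int key;
        int val;
        bool tombstone;
        bool match(const Rec *other) const {
            return key == other->key && val == other->val;
        }
    };

    // Walk a sorted run; a record immediately followed by its matching
    // tombstone is dropped along with the tombstone, and the pair counts
    // as one cancelation.
    size_t copy_with_cancellation(const Rec *base, const Rec *stop,
                                  std::vector<Rec> &out) {
        size_t cancelations = 0;
        while (base < stop) {
            if (!base->tombstone && base + 1 < stop &&
                base->match(base + 1) && (base + 1)->tombstone) {
                base += 2;
                cancelations++;
                continue;
            }
            out.push_back(*base);
            base++;
        }
        return cancelations;
    }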