summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt7
-rw-r--r--include/framework/DynamicExtension.h30
-rw-r--r--include/framework/InternalLevel.h146
-rw-r--r--include/shard/WIRS.h21
-rw-r--r--tests/dynamic_extension_tests.cpp398
-rw-r--r--tests/wirs_tests.cpp122
6 files changed, 565 insertions, 159 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 72ee9a4..c93f75a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
set(namespace "de")
project("Practical Dynamic Extension" VERSION 0.1.0)
-set(debug true)
+set(debug false)
set(tests True)
set(bench True)
@@ -45,6 +45,11 @@ if (tests)
target_link_libraries(mutable_buffer_tests PUBLIC gsl check subunit pthread)
target_include_directories(mutable_buffer_tests PRIVATE include)
+ add_executable(dynamic_extension_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/dynamic_extension_tests.cpp)
+ target_link_libraries(dynamic_extension_tests PUBLIC gsl check subunit pthread)
+ target_include_directories(dynamic_extension_tests PRIVATE include)
+
+
endif()
# Benchmark build instructions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 93de24f..ed01e47 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -3,7 +3,7 @@
#include <atomic>
#include <numeric>
#include <cstdio>
-#include<vector>
+#include <vector>
#include "framework/MutableBuffer.h"
#include "framework/InternalLevel.h"
@@ -42,7 +42,7 @@ static constexpr bool LSM_REJ_SAMPLE = false;
// True for leveling, false for tiering
static constexpr bool LSM_LEVELING = false;
-static constexpr bool DELETE_TAGGING = true;
+static constexpr bool DELETE_TAGGING = false;
// TODO: Replace the constexpr bools above
// with template parameters based on these
@@ -107,7 +107,7 @@ public:
int append(const K& key, const V& val, W weight, bool tombstone, gsl_rng *rng) {
// NOTE: single-threaded implementation only
MutableBuffer<K,V,W> *buffer;
- while (!(buffer = buffer()))
+ while (!(buffer = get_buffer()))
;
if (buffer->is_full()) {
@@ -138,8 +138,8 @@ public:
shards.push_back({{-1, -1}, nullptr});
states.push_back(nullptr);
- std::vector<double> shard_weights;
- shard_weights.push_back(buffer_weight);
+ std::vector<W> shard_weights;
+ shard_weights.push_back((double) buffer_weight);
for (auto &level : m_levels) {
level->get_shard_weights(shard_weights, shards, states, lower_key, upper_key);
@@ -153,10 +153,13 @@ public:
}
double tot_weight = std::accumulate(shard_weights.begin(), shard_weights.end(), 0);
- for (auto& w: shard_weights) w /= tot_weight;
+ std::vector<double> normalized_weights(shard_weights.size());
+ for (size_t i=0; i<shard_weights.size(); i++) {
+ normalized_weights[i] = ((double) shard_weights[i]) / tot_weight;
+ }
// Construct alias structure
- auto alias = Alias(shard_weights);
+ auto alias = Alias(normalized_weights);
std::vector<size_t> shard_samples(shard_weights.size(), 0);
@@ -214,13 +217,13 @@ public:
shards[i].second->get_samples(states[i], results, lower_key, upper_key, shard_samples[i], rng);
- for (size_t i=0; i<results.size(); i++) {
- if (results[i]->is_tombstone() || is_deleted(results[i], shards[i].first, buffer, buffer_cutoff)) {
+ for (size_t j=0; j<results.size(); j++) {
+ if (rejection(&results[j], shards[i].first, lower_key, upper_key, buffer, buffer_cutoff)) {
rejections++;
continue;
}
- sample_set[sample_idx++] = results[i];
+ sample_set[sample_idx++] = results[j];
}
shard_samples[i] = 0;
@@ -396,7 +399,7 @@ private:
} else if (record->key < lower_bound || record->key > upper_bound) {
bounds_rejections++;
return true;
- } else if (is_deleted(record, shid, buffer, buffer, buffer_cutoff)) {
+ } else if (is_deleted(record, shid, buffer, buffer_cutoff)) {
deletion_rejections++;
return true;
}
@@ -431,8 +434,7 @@ private:
size_t new_shard_cnt = (LSM_LEVELING) ? 1 : m_scale_factor;
new_idx = m_levels.size();
if (new_idx > 0) {
- auto res = m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count();
- assert(res == 0);
+ assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
}
m_levels.emplace_back(new InternalLevel<K,V,W>(new_idx, new_shard_cnt, DELETE_TAGGING));
@@ -453,7 +455,7 @@ private:
merge_buffer_into_l0(buffer, rng);
enforce_delete_maximum(0, rng);
- buffer->tshardcate();
+ buffer->truncate();
return;
}
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index 5134584..727a382 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -28,37 +28,37 @@ private:
struct InternalLevelStructure {
InternalLevelStructure(size_t cap)
: m_cap(cap)
- , m_runs(new WIRS<K, V, W>*[cap]{nullptr})
+ , m_shards(new WIRS<K, V, W>*[cap]{nullptr})
, m_bfs(new BloomFilter*[cap]{nullptr}) {}
~InternalLevelStructure() {
for (size_t i = 0; i < m_cap; ++i) {
- if (m_runs[i]) delete m_runs[i];
+ if (m_shards[i]) delete m_shards[i];
if (m_bfs[i]) delete m_bfs[i];
}
- delete[] m_runs;
+ delete[] m_shards;
delete[] m_bfs;
}
size_t m_cap;
- WIRS<K, V, W>** m_runs;
+ WIRS<K, V, W>** m_shards;
BloomFilter** m_bfs;
};
public:
- InternalLevel(ssize_t level_no, size_t run_cap, bool tagging)
- : m_level_no(level_no), m_run_cnt(0)
- , m_structure(new InternalLevelStructure(run_cap))
+ InternalLevel(ssize_t level_no, size_t shard_cap, bool tagging)
+ : m_level_no(level_no), m_shard_cnt(0)
+ , m_structure(new InternalLevelStructure(shard_cap))
, m_tagging(tagging) {}
- // Create a new memory level sharing the runs and repurposing it as previous level_no + 1
+ // Create a new memory level sharing the shards and repurposing it as previous level_no + 1
// WARNING: for leveling only.
InternalLevel(InternalLevel* level, bool tagging)
- : m_level_no(level->m_level_no + 1), m_run_cnt(level->m_run_cnt)
+ : m_level_no(level->m_level_no + 1), m_shard_cnt(level->m_shard_cnt)
, m_structure(level->m_structure)
, m_tagging(tagging) {
- assert(m_structure->m_cap == 1 && m_run_cnt == 1);
+ assert(m_structure->m_cap == 1 && m_shard_cnt == 1);
}
@@ -69,73 +69,73 @@ public:
static InternalLevel* merge_levels(InternalLevel* base_level, InternalLevel* new_level, bool tagging, const gsl_rng* rng) {
assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
auto res = new InternalLevel(base_level->m_level_no, 1, tagging);
- res->m_run_cnt = 1;
+ res->m_shard_cnt = 1;
res->m_structure->m_bfs[0] =
new BloomFilter(BF_FPR,
new_level->get_tombstone_count() + base_level->get_tombstone_count(),
BF_HASH_FUNCS, rng);
- WIRS<K, V, W>* runs[2];
- runs[0] = base_level->m_structure->m_runs[0];
- runs[1] = new_level->m_structure->m_runs[0];
+ WIRS<K, V, W>* shards[2];
+ shards[0] = base_level->m_structure->m_shards[0];
+ shards[1] = new_level->m_structure->m_shards[0];
- res->m_structure->m_runs[0] = new WIRS<K, V, W>(runs, 2, res->m_structure->m_bfs[0], tagging);
+ res->m_structure->m_shards[0] = new WIRS<K, V, W>(shards, 2, res->m_structure->m_bfs[0], tagging);
return res;
}
void append_mem_table(MutableBuffer<K,V,W>* buffer, const gsl_rng* rng) {
- assert(m_run_cnt < m_structure->m_cap);
- m_structure->m_bfs[m_run_cnt] = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng);
- m_structure->m_runs[m_run_cnt] = new WIRS<K, V, W>(buffer, m_structure->m_bfs[m_run_cnt], m_tagging);
- ++m_run_cnt;
+ assert(m_shard_cnt < m_structure->m_cap);
+ m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng);
+ m_structure->m_shards[m_shard_cnt] = new WIRS<K, V, W>(buffer, m_structure->m_bfs[m_shard_cnt], m_tagging);
+ ++m_shard_cnt;
}
- void append_merged_runs(InternalLevel* level, const gsl_rng* rng) {
- assert(m_run_cnt < m_structure->m_cap);
- m_structure->m_bfs[m_run_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng);
- m_structure->m_runs[m_run_cnt] = new WIRS<K, V, W>(level->m_structure->m_runs, level->m_run_cnt, m_structure->m_bfs[m_run_cnt], m_tagging);
- ++m_run_cnt;
+ void append_merged_shards(InternalLevel* level, const gsl_rng* rng) {
+ assert(m_shard_cnt < m_structure->m_cap);
+ m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng);
+ m_structure->m_shards[m_shard_cnt] = new WIRS<K, V, W>(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt], m_tagging);
+ ++m_shard_cnt;
}
- WIRS<K, V, W> *get_merged_run() {
- WIRS<K, V, W> *runs[m_run_cnt];
+ WIRS<K, V, W> *get_merged_shard() {
+ WIRS<K, V, W> *shards[m_shard_cnt];
- for (size_t i=0; i<m_run_cnt; i++) {
- runs[i] = (m_structure->m_runs[i]) ? m_structure->m_runs[i] : nullptr;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ shards[i] = (m_structure->m_shards[i]) ? m_structure->m_shards[i] : nullptr;
}
- return new WIRS<K, V, W>(runs, m_run_cnt, nullptr, m_tagging);
+ return new WIRS<K, V, W>(shards, m_shard_cnt, nullptr, m_tagging);
}
// Append the sample range in-order.....
- void get_run_weights(std::vector<W>& weights, std::vector<std::pair<ShardID, WIRS<K, V, W> *>> &runs, std::vector<void*>& run_states, const K& low, const K& high) {
- for (size_t i=0; i<m_run_cnt; i++) {
- if (m_structure->m_runs[i]) {
- auto run_state = m_structure->m_runs[i]->get_sample_run_state(low, high);
- if (run_state->tot_weight > 0) {
- runs.push_back({{m_level_no, (ssize_t) i}, m_structure->m_runs[i]});
- weights.push_back(run_state->tot_weight);
- run_states.emplace_back(run_state);
+ void get_shard_weights(std::vector<W>& weights, std::vector<std::pair<ShardID, WIRS<K, V, W> *>> &shards, std::vector<void*>& shard_states, const K& low, const K& high) {
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_structure->m_shards[i]) {
+ auto shard_state = m_structure->m_shards[i]->get_sample_shard_state(low, high);
+ if (shard_state->tot_weight > 0) {
+ shards.push_back({{m_level_no, (ssize_t) i}, m_structure->m_shards[i]});
+ weights.push_back(shard_state->tot_weight);
+ shard_states.emplace_back(shard_state);
} else {
- delete run_state;
+ WIRS<K, V, W>::delete_state(shard_state);
}
}
}
}
- bool bf_rejection_check(size_t run_stop, const K& key) {
- for (size_t i = 0; i < run_stop; ++i) {
+ bool bf_rejection_check(size_t shard_stop, const K& key) {
+ for (size_t i = 0; i < shard_stop; ++i) {
if (m_structure->m_bfs[i] && m_structure->m_bfs[i]->lookup(key))
return true;
}
return false;
}
- bool check_tombstone(size_t run_stop, const K& key, const V& val) {
- if (m_run_cnt == 0) return false;
+ bool check_tombstone(size_t shard_stop, const K& key, const V& val) {
+ if (m_shard_cnt == 0) return false;
- for (int i = m_run_cnt - 1; i >= (ssize_t) run_stop; i--) {
- if (m_structure->m_runs[i] && (m_structure->m_bfs[i]->lookup(key))
- && m_structure->m_runs[i]->check_tombstone(key, val))
+ for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
+ if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(key))
+ && m_structure->m_shards[i]->check_tombstone(key, val))
return true;
}
return false;
@@ -143,7 +143,7 @@ public:
bool delete_record(const K& key, const V& val) {
for (size_t i = 0; i < m_structure->m_cap; ++i) {
- if (m_structure->m_runs[i] && m_structure->m_runs[i]->delete_record(key, val)) {
+ if (m_structure->m_shards[i] && m_structure->m_shards[i]->delete_record(key, val)) {
return true;
}
}
@@ -151,22 +151,22 @@ public:
return false;
}
- const Record<K, V, W>* get_record_at(size_t run_no, size_t idx) {
- return m_structure->m_runs[run_no]->get_record_at(idx);
+ const Record<K, V, W>* get_record_at(size_t shard_no, size_t idx) {
+ return m_structure->m_shards[shard_no]->get_record_at(idx);
}
- WIRS<K, V, W>* get_run(size_t idx) {
- return m_structure->m_runs[idx];
+ WIRS<K, V, W>* get_shard(size_t idx) {
+ return m_structure->m_shards[idx];
}
- size_t get_run_count() {
- return m_run_cnt;
+ size_t get_shard_count() {
+ return m_shard_cnt;
}
size_t get_record_cnt() {
size_t cnt = 0;
- for (size_t i=0; i<m_run_cnt; i++) {
- cnt += m_structure->m_runs[i]->get_record_count();
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ cnt += m_structure->m_shards[i]->get_record_count();
}
return cnt;
@@ -174,15 +174,15 @@ public:
size_t get_tombstone_count() {
size_t res = 0;
- for (size_t i = 0; i < m_run_cnt; ++i) {
- res += m_structure->m_runs[i]->get_tombstone_count();
+ for (size_t i = 0; i < m_shard_cnt; ++i) {
+ res += m_structure->m_shards[i]->get_tombstone_count();
}
return res;
}
size_t get_aux_memory_utilization() {
size_t cnt = 0;
- for (size_t i=0; i<m_run_cnt; i++) {
+ for (size_t i=0; i<m_shard_cnt; i++) {
if (m_structure->m_bfs[i]) {
cnt += m_structure->m_bfs[i]->get_memory_utilization();
}
@@ -193,9 +193,9 @@ public:
size_t get_memory_utilization() {
size_t cnt = 0;
- for (size_t i=0; i<m_run_cnt; i++) {
- if (m_structure->m_runs[i]) {
- cnt += m_structure->m_runs[i]->get_memory_utilization();
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_structure->m_shards[i]) {
+ cnt += m_structure->m_shards[i]->get_memory_utilization();
}
}
@@ -205,10 +205,10 @@ public:
double get_tombstone_prop() {
size_t tscnt = 0;
size_t reccnt = 0;
- for (size_t i=0; i<m_run_cnt; i++) {
- if (m_structure->m_runs[i]) {
- tscnt += m_structure->m_runs[i]->get_tombstone_count();
- reccnt += m_structure->m_runs[i]->get_record_count();
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_structure->m_shards[i]) {
+ tscnt += m_structure->m_shards[i]->get_tombstone_count();
+ reccnt += m_structure->m_shards[i]->get_record_count();
}
}
@@ -217,9 +217,9 @@ public:
size_t get_rejection_count() {
size_t rej_cnt = 0;
- for (size_t i=0; i<m_run_cnt; i++) {
- if (m_structure->m_runs[i]) {
- rej_cnt += m_structure->m_runs[i]->get_rejection_count();
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_structure->m_shards[i]) {
+ rej_cnt += m_structure->m_shards[i]->get_rejection_count();
}
}
@@ -229,10 +229,10 @@ public:
double get_rejection_rate() {
size_t rej_cnt = 0;
size_t attempt_cnt = 0;
- for (size_t i=0; i<m_run_cnt; i++) {
- if (m_structure->m_runs[i]) {
- attempt_cnt += m_structure->m_runs[i]->get_ts_check_count();
- rej_cnt += m_structure->m_runs[i]->get_rejection_count();
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_structure->m_shards[i]) {
+ attempt_cnt += m_structure->m_shards[i]->get_ts_check_count();
+ rej_cnt += m_structure->m_shards[i]->get_rejection_count();
}
}
@@ -248,8 +248,8 @@ public:
private:
ssize_t m_level_no;
- size_t m_run_cnt;
- size_t m_run_size_cap;
+ size_t m_shard_cnt;
+ size_t m_shard_size_cap;
bool m_tagging;
std::shared_ptr<InternalLevelStructure> m_structure;
};
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index c89a407..39337bf 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -22,7 +22,7 @@
namespace de {
-thread_local size_t m_wirsrun_cancelations = 0;
+thread_local size_t wirs_cancelations = 0;
template <typename K, typename V, typename W>
class WIRS {
@@ -63,7 +63,7 @@ public:
if (!(base->is_tombstone()) && (base + 1) < stop) {
if (base->match(base + 1) && (base + 1)->is_tombstone()) {
base += 2;
- m_wirsrun_cancelations++;
+ wirs_cancelations++;
continue;
}
}
@@ -89,7 +89,7 @@ public:
}
}
- WIRS(WIRS** runs, size_t len, BloomFilter* bf, bool tagging)
+ WIRS(WIRS** shards, size_t len, BloomFilter* bf, bool tagging)
: m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0),
m_tagging(tagging), m_root(nullptr) {
std::vector<Cursor<K,V,W>> cursors;
@@ -100,10 +100,10 @@ public:
size_t attemp_reccnt = 0;
for (size_t i = 0; i < len; ++i) {
- if (runs[i]) {
- auto base = runs[i]->sorted_output();
- cursors.emplace_back(Cursor{base, base + runs[i]->get_record_count(), 0, runs[i]->get_record_count()});
- attemp_reccnt += runs[i]->get_record_count();
+ if (shards[i]) {
+ auto base = shards[i]->sorted_output();
+ cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
+ attemp_reccnt += shards[i]->get_record_count();
pq.push(cursors[i].ptr, i);
} else {
cursors.emplace_back(Cursor<K,V,W>{nullptr, nullptr, 0, 0});
@@ -200,7 +200,7 @@ public:
// low - high -> decompose to a set of nodes.
// Build Alias across the decomposed nodes.
- WIRSState* get_sample_run_state(const K& lower_key, const K& upper_key) {
+ WIRSState* get_sample_shard_state(const K& lower_key, const K& upper_key) {
WIRSState* res = new WIRSState();
// Simulate a stack to unfold recursion.
@@ -238,7 +238,8 @@ public:
// returns the number of records sampled
// NOTE: This operation returns records strictly between the lower and upper bounds, not
// including them.
- size_t get_samples(WIRSState* run_state, std::vector<Record<K, V, W>> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
+ size_t get_samples(void* shard_state, std::vector<Record<K, V, W>> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
+ WIRSState *state = (WIRSState *) shard_state;
if (sample_sz == 0) {
return 0;
}
@@ -249,7 +250,7 @@ public:
do {
++attempts;
// first level....
- auto node = run_state->nodes[run_state->top_level_alias->get(rng)];
+ auto node = state->nodes[state->top_level_alias->get(rng)];
// second level...
auto fat_point = node->low + node->alias->get(rng);
// third level...
diff --git a/tests/dynamic_extension_tests.cpp b/tests/dynamic_extension_tests.cpp
new file mode 100644
index 0000000..69d36b4
--- /dev/null
+++ b/tests/dynamic_extension_tests.cpp
@@ -0,0 +1,398 @@
+#include <set>
+#include <random>
+#include <algorithm>
+
+#include "testing.h"
+#include "framework/DynamicExtension.h"
+
+#include <check.h>
+using namespace de;
+
+typedef DynamicExtension<uint64_t, uint32_t, uint64_t> DE_WIRS;
+
+START_TEST(t_create)
+{
+ auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng);
+
+
+ ck_assert_ptr_nonnull(ext_wirs);
+ ck_assert_int_eq(ext_wirs->get_record_cnt(), 0);
+ ck_assert_int_eq(ext_wirs->get_height(), 0);
+
+ delete ext_wirs;
+}
+END_TEST
+
+
+START_TEST(t_append)
+{
+ auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng);
+
+ uint64_t key = 0;
+ uint32_t val = 0;
+ for (size_t i=0; i<100; i++) {
+ ck_assert_int_eq(ext_wirs->append(key, val, 1, false, g_rng), 1);
+ key++;
+ val++;
+ }
+
+ ck_assert_int_eq(ext_wirs->get_height(), 0);
+ ck_assert_int_eq(ext_wirs->get_record_cnt(), 100);
+
+ delete ext_wirs;
+}
+END_TEST
+
+
+START_TEST(t_append_with_mem_merges)
+{
+ auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng);
+
+ uint64_t key = 0;
+ uint32_t val = 0;
+ for (size_t i=0; i<300; i++) {
+ ck_assert_int_eq(ext_wirs->append(key, val, 1, false, g_rng), 1);
+ key++;
+ val++;
+ }
+
+ ck_assert_int_eq(ext_wirs->get_record_cnt(), 300);
+ ck_assert_int_eq(ext_wirs->get_height(), 1);
+
+ delete ext_wirs;
+}
+END_TEST
+
+
+START_TEST(t_range_sample_memtable)
+{
+ auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng);
+
+ uint64_t key = 0;
+ uint32_t val = 0;
+ for (size_t i=0; i<100; i++) {
+ ck_assert_int_eq(ext_wirs->append(key, val, 1, false, g_rng), 1);
+ key++;
+ val++;
+ }
+
+ uint64_t lower_bound = 20;
+ uint64_t upper_bound = 50;
+
+ char *buf = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE);
+ char *util_buf = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE);
+ WeightedRec sample_set[100];
+
+ ext_wirs->range_sample(sample_set, lower_bound, upper_bound, 100, g_rng);
+
+ for(size_t i=0; i<100; i++) {
+ ck_assert_int_le(sample_set[i].key, upper_bound);
+ ck_assert_int_ge(sample_set[i].key, lower_bound);
+ }
+
+ free(buf);
+ free(util_buf);
+
+ delete ext_wirs;
+}
+END_TEST
+
+
+START_TEST(t_range_sample_memlevels)
+{
+ auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng);
+
+ uint64_t key = 0;
+ uint32_t val = 0;
+ for (size_t i=0; i<300; i++) {
+ ck_assert_int_eq(ext_wirs->append(key, val, 1, false, g_rng), 1);
+ key++;
+ val++;
+ }
+
+ uint64_t lower_bound = 100;
+ uint64_t upper_bound = 250;
+
+ char *buf = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE);
+ char *util_buf = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE);
+
+ WeightedRec sample_set[100];
+ ext_wirs->range_sample(sample_set, lower_bound, upper_bound, 100, g_rng);
+
+ for(size_t i=0; i<100; i++) {
+ ck_assert_int_le(sample_set[i].key, upper_bound);
+ ck_assert_int_ge(sample_set[i].key, lower_bound);
+ }
+
+ free(buf);
+ free(util_buf);
+
+ delete ext_wirs;
+}
+END_TEST
+
+START_TEST(t_range_sample_weighted)
+{
+ auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng);
+ size_t n = 10000;
+
+ std::vector<uint64_t> keys;
+
+ uint64_t key = 1;
+ for (size_t i=0; i< n / 2; i++) {
+ keys.push_back(key);
+ }
+
+ // put in a quarter of the count with weight four.
+ key = 2;
+ for (size_t i=0; i< n / 4; i++) {
+ keys.push_back(key);
+ }
+
+ // the remaining quarter with weight eight.
+ key = 3;
+ for (size_t i=0; i< n / 4; i++) {
+ keys.push_back(key);
+ }
+
+ std::random_device rd;
+ std::mt19937 gen{rd()};
+ std::shuffle(keys.begin(), keys.end(), gen);
+
+ for (size_t i=0; i<keys.size(); i++) {
+ double weight;
+ if (keys[i] == 1) {
+ weight = 2.0;
+ } else if (keys[i] == 2) {
+ weight = 4.0;
+ } else {
+ weight = 8.0;
+ }
+
+ ext_wirs->append(keys[i], i, weight, false, g_rng);
+ }
+ size_t k = 1000;
+ uint64_t lower_key = 0;
+ uint64_t upper_key = 5;
+
+ WeightedRec* buffer = new WeightedRec[k]();
+ char *buffer1 = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE);
+ char *buffer2 = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE);
+
+ size_t cnt[3] = {0};
+ for (size_t i=0; i<1000; i++) {
+ ext_wirs->range_sample(buffer, lower_key, upper_key, k, g_rng);
+
+ for (size_t j=0; j<k; j++) {
+ cnt[buffer[j].key - 1]++;
+ }
+ }
+
+ ck_assert(roughly_equal(cnt[0] / 1000, (double) k/4.0, k, .05));
+ ck_assert(roughly_equal(cnt[1] / 1000, (double) k/4.0, k, .05));
+ ck_assert(roughly_equal(cnt[2] / 1000, (double) k/2.0, k, .05));
+
+ delete ext_wirs;
+ delete[] buffer;
+ free(buffer1);
+ free(buffer2);
+}
+END_TEST
+
+
+START_TEST(t_tombstone_merging_01)
+{
+ size_t reccnt = 100000;
+ auto ext_wirs = new DE_WIRS(100, 100, 2, .01, 1, g_rng);
+
+ std::set<std::pair<uint64_t, uint32_t>> records;
+ std::set<std::pair<uint64_t, uint32_t>> to_delete;
+ std::set<std::pair<uint64_t, uint32_t>> deleted;
+
+ while (records.size() < reccnt) {
+ uint64_t key = rand();
+ uint32_t val = rand();
+
+ if (records.find({key, val}) != records.end()) continue;
+
+ records.insert({key, val});
+ }
+
+ size_t deletes = 0;
+ size_t cnt=0;
+ for (auto rec : records) {
+ ck_assert_int_eq(ext_wirs->append(rec.first, rec.second, 1, false, g_rng), 1);
+
+ if (gsl_rng_uniform(g_rng) < 0.05 && !to_delete.empty()) {
+ std::vector<std::pair<uint64_t, uint32_t>> del_vec;
+ std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
+
+ for (size_t i=0; i<del_vec.size(); i++) {
+ ext_wirs->append(del_vec[i].first, del_vec[i].second, 1, true, g_rng);
+ deletes++;
+ to_delete.erase(del_vec[i]);
+ deleted.insert(del_vec[i]);
+ }
+ }
+
+ if (gsl_rng_uniform(g_rng) < 0.25 && deleted.find(rec) == deleted.end()) {
+ to_delete.insert(rec);
+ }
+
+ ck_assert(ext_wirs->validate_tombstone_proportion());
+ }
+
+ ck_assert(ext_wirs->validate_tombstone_proportion());
+
+ delete ext_wirs;
+}
+END_TEST
+
+DE_WIRS *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
+ auto ext_wirs = new DE_WIRS(1000, 1000, 2, 1, 1, g_rng);
+
+ std::set<std::pair<uint64_t, uint32_t>> records;
+ std::set<std::pair<uint64_t, uint32_t>> to_delete;
+ std::set<std::pair<uint64_t, uint32_t>> deleted;
+
+ while (records.size() < reccnt) {
+ uint64_t key = rand();
+ uint32_t val = rand();
+
+ if (records.find({key, val}) != records.end()) continue;
+
+ records.insert({key, val});
+ }
+
+ size_t deletes = 0;
+ for (auto rec : records) {
+ ck_assert_int_eq(ext_wirs->append(rec.first, rec.second, 1, 0, g_rng), 1);
+
+ if (gsl_rng_uniform(g_rng) < 0.05 && !to_delete.empty()) {
+ std::vector<std::pair<uint64_t, uint32_t>> del_vec;
+ std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
+
+ for (size_t i=0; i<del_vec.size(); i++) {
+ ext_wirs->append(del_vec[i].first, del_vec[i].second, 1, true, g_rng);
+ deletes++;
+ to_delete.erase(del_vec[i]);
+ deleted.insert(del_vec[i]);
+ }
+ }
+
+ if (gsl_rng_uniform(g_rng) < 0.25 && deleted.find(rec) == deleted.end()) {
+ to_delete.insert(rec);
+ }
+ }
+
+ return ext_wirs;
+}
+
+START_TEST(t_sorted_array)
+{
+ size_t reccnt = 100000;
+ auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng);
+
+ std::set<std::pair<uint64_t, uint32_t>> records;
+ std::set<std::pair<uint64_t, uint32_t>> to_delete;
+ std::set<std::pair<uint64_t, uint32_t>> deleted;
+
+ while (records.size() < reccnt) {
+ uint64_t key = rand();
+ uint32_t val = rand();
+
+ if (records.find({key, val}) != records.end()) continue;
+
+ records.insert({key, val});
+ }
+
+ size_t deletes = 0;
+ for (auto rec : records) {
+ ck_assert_int_eq(ext_wirs->append(rec.first, rec.second, 1, 0, g_rng), 1);
+
+ if (gsl_rng_uniform(g_rng) < 0.05 && !to_delete.empty()) {
+ std::vector<std::pair<uint64_t, uint32_t>> del_vec;
+ std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
+
+ for (size_t i=0; i<del_vec.size(); i++) {
+ ext_wirs->append(del_vec[i].first, del_vec[i].second, 1, true, g_rng);
+ deletes++;
+ to_delete.erase(del_vec[i]);
+ deleted.insert(del_vec[i]);
+ }
+ }
+
+ if (gsl_rng_uniform(g_rng) < 0.25 && deleted.find(rec) == deleted.end()) {
+ to_delete.insert(rec);
+ }
+ }
+
+ auto flat = ext_wirs->create_ssi();
+ ck_assert_int_eq(flat->get_record_count(), reccnt - deletes);
+
+ uint64_t prev_key = 0;
+ for (size_t i=0; i<flat->get_record_count(); i++) {
+ auto k = flat->get_record_at(i)->key;
+ ck_assert_int_ge(k, prev_key);
+ prev_key = k;
+ }
+
+ delete flat;
+ delete ext_wirs;
+}
+END_TEST
+
+
+Suite *unit_testing()
+{
+ Suite *unit = suite_create("de::DynamicExtension Unit Testing");
+
+ TCase *create = tcase_create("de::DynamicExtension::constructor Testing");
+ tcase_add_test(create, t_create);
+ suite_add_tcase(unit, create);
+
+ TCase *append = tcase_create("de::DynamicExtension::append Testing");
+ tcase_add_test(append, t_append);
+ tcase_add_test(append, t_append_with_mem_merges);
+ suite_add_tcase(unit, append);
+
+ TCase *sampling = tcase_create("de::DynamicExtension::range_sample Testing");
+
+ tcase_add_test(sampling, t_range_sample_memtable);
+ tcase_add_test(sampling, t_range_sample_memlevels);
+ tcase_add_test(sampling, t_range_sample_weighted);
+ suite_add_tcase(unit, sampling);
+
+ TCase *ts = tcase_create("de::DynamicExtension::tombstone_compaction Testing");
+ tcase_add_test(ts, t_tombstone_merging_01);
+ tcase_set_timeout(ts, 500);
+ suite_add_tcase(unit, ts);
+
+ TCase *flat = tcase_create("de::DynamicExtension::get_flattened_wirs_run Testing");
+ tcase_add_test(flat, t_sorted_array);
+ tcase_set_timeout(flat, 500);
+ suite_add_tcase(unit, flat);
+
+ return unit;
+}
+
+int run_unit_tests()
+{
+ int failed = 0;
+ Suite *unit = unit_testing();
+ SRunner *unit_runner = srunner_create(unit);
+
+ srunner_run_all(unit_runner, CK_NORMAL);
+ failed = srunner_ntests_failed(unit_runner);
+ srunner_free(unit_runner);
+
+ return failed;
+}
+
+
+int main()
+{
+ int unit_failed = run_unit_tests();
+
+ return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}
diff --git a/tests/wirs_tests.cpp b/tests/wirs_tests.cpp
index 7f17b9d..6fd069d 100644
--- a/tests/wirs_tests.cpp
+++ b/tests/wirs_tests.cpp
@@ -29,12 +29,12 @@ START_TEST(t_mbuffer_init)
}
BloomFilter* bf = new BloomFilter(BF_FPR, mem_table->get_tombstone_count(), BF_HASH_FUNCS, g_rng);
- Shard* run = new Shard(mem_table, bf, false);
- ck_assert_uint_eq(run->get_record_count(), 512);
+ Shard* shard = new Shard(mem_table, bf, false);
+ ck_assert_uint_eq(shard->get_record_count(), 512);
delete bf;
delete mem_table;
- delete run;
+ delete shard;
}
START_TEST(t_wirs_init)
@@ -47,35 +47,35 @@ START_TEST(t_wirs_init)
BloomFilter* bf1 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
BloomFilter* bf2 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
BloomFilter* bf3 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
- auto run1 = new Shard(mbuffer1, bf1, false);
- auto run2 = new Shard(mbuffer2, bf2, false);
- auto run3 = new Shard(mbuffer3, bf3, false);
+ auto shard1 = new Shard(mbuffer1, bf1, false);
+ auto shard2 = new Shard(mbuffer2, bf2, false);
+ auto shard3 = new Shard(mbuffer3, bf3, false);
BloomFilter* bf4 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
- Shard* runs[3] = {run1, run2, run3};
- auto run4 = new Shard(runs, 3, bf4, false);
+ Shard* shards[3] = {shard1, shard2, shard3};
+ auto shard4 = new Shard(shards, 3, bf4, false);
- ck_assert_int_eq(run4->get_record_count(), n * 3);
- ck_assert_int_eq(run4->get_tombstone_count(), 0);
+ ck_assert_int_eq(shard4->get_record_count(), n * 3);
+ ck_assert_int_eq(shard4->get_tombstone_count(), 0);
size_t total_cnt = 0;
- size_t run1_idx = 0;
- size_t run2_idx = 0;
- size_t run3_idx = 0;
-
- for (size_t i = 0; i < run4->get_record_count(); ++i) {
- auto rec1 = run1->get_record_at(run1_idx);
- auto rec2 = run2->get_record_at(run2_idx);
- auto rec3 = run3->get_record_at(run3_idx);
-
- auto cur_rec = run4->get_record_at(i);
-
- if (run1_idx < n && cur_rec->match(rec1)) {
- ++run1_idx;
- } else if (run2_idx < n && cur_rec->match(rec2)) {
- ++run2_idx;
- } else if (run3_idx < n && cur_rec->match(rec3)) {
- ++run3_idx;
+ size_t shard1_idx = 0;
+ size_t shard2_idx = 0;
+ size_t shard3_idx = 0;
+
+ for (size_t i = 0; i < shard4->get_record_count(); ++i) {
+ auto rec1 = shard1->get_record_at(shard1_idx);
+ auto rec2 = shard2->get_record_at(shard2_idx);
+ auto rec3 = shard3->get_record_at(shard3_idx);
+
+ auto cur_rec = shard4->get_record_at(i);
+
+ if (shard1_idx < n && cur_rec->match(rec1)) {
+ ++shard1_idx;
+ } else if (shard2_idx < n && cur_rec->match(rec2)) {
+ ++shard2_idx;
+ } else if (shard3_idx < n && cur_rec->match(rec3)) {
+ ++shard3_idx;
} else {
assert(false);
}
@@ -86,13 +86,13 @@ START_TEST(t_wirs_init)
delete mbuffer3;
delete bf1;
- delete run1;
+ delete shard1;
delete bf2;
- delete run2;
+ delete shard2;
delete bf3;
- delete run3;
+ delete shard3;
delete bf4;
- delete run4;
+ delete shard4;
}
START_TEST(t_get_lower_bound_index)
@@ -102,22 +102,22 @@ START_TEST(t_get_lower_bound_index)
ck_assert_ptr_nonnull(mbuffer);
BloomFilter* bf = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
- Shard* run = new Shard(mbuffer, bf, false);
+ Shard* shard = new Shard(mbuffer, bf, false);
- ck_assert_int_eq(run->get_record_count(), n);
- ck_assert_int_eq(run->get_tombstone_count(), 0);
+ ck_assert_int_eq(shard->get_record_count(), n);
+ ck_assert_int_eq(shard->get_tombstone_count(), 0);
auto tbl_records = mbuffer->sorted_output();
for (size_t i=0; i<n; i++) {
const WeightedRec *tbl_rec = mbuffer->get_record_at(i);
- auto pos = run->get_lower_bound(tbl_rec->key);
- ck_assert_int_eq(run->get_record_at(pos)->key, tbl_rec->key);
+ auto pos = shard->get_lower_bound(tbl_rec->key);
+ ck_assert_int_eq(shard->get_record_at(pos)->key, tbl_rec->key);
ck_assert_int_le(pos, i);
}
delete mbuffer;
delete bf;
- delete run;
+ delete shard;
}
@@ -130,17 +130,17 @@ START_TEST(t_full_cancelation)
BloomFilter* bf2 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
BloomFilter* bf3 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
- Shard* run = new Shard(buffer, bf1, false);
- Shard* run_ts = new Shard(buffer_ts, bf2, false);
+ Shard* shard = new Shard(buffer, bf1, false);
+ Shard* shard_ts = new Shard(buffer_ts, bf2, false);
- ck_assert_int_eq(run->get_record_count(), n);
- ck_assert_int_eq(run->get_tombstone_count(), 0);
- ck_assert_int_eq(run_ts->get_record_count(), n);
- ck_assert_int_eq(run_ts->get_tombstone_count(), n);
+ ck_assert_int_eq(shard->get_record_count(), n);
+ ck_assert_int_eq(shard->get_tombstone_count(), 0);
+ ck_assert_int_eq(shard_ts->get_record_count(), n);
+ ck_assert_int_eq(shard_ts->get_tombstone_count(), n);
- Shard* runs[] = {run, run_ts};
+ Shard* shards[] = {shard, shard_ts};
- Shard* merged = new Shard(runs, 2, bf3, false);
+ Shard* merged = new Shard(shards, 2, bf3, false);
ck_assert_int_eq(merged->get_tombstone_count(), 0);
ck_assert_int_eq(merged->get_record_count(), 0);
@@ -150,8 +150,8 @@ START_TEST(t_full_cancelation)
delete bf1;
delete bf2;
delete bf3;
- delete run;
- delete run_ts;
+ delete shard;
+ delete shard_ts;
delete merged;
}
END_TEST
@@ -163,7 +163,7 @@ START_TEST(t_weighted_sampling)
auto buffer = create_weighted_mbuffer(n);
BloomFilter* bf = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
- Shard* run = new Shard(buffer, bf, false);
+ Shard* shard = new Shard(buffer, bf, false);
uint64_t lower_key = 0;
uint64_t upper_key = 5;
@@ -174,9 +174,9 @@ START_TEST(t_weighted_sampling)
results.reserve(k);
size_t cnt[3] = {0};
for (size_t i=0; i<1000; i++) {
- auto state = run->get_sample_run_state(lower_key, upper_key);
+ auto state = shard->get_sample_shard_state(lower_key, upper_key);
- run->get_samples(state, results, lower_key, upper_key, k, g_rng);
+ shard->get_samples(state, results, lower_key, upper_key, k, g_rng);
for (size_t j=0; j<k; j++) {
cnt[results[j].key - 1]++;
@@ -189,7 +189,7 @@ START_TEST(t_weighted_sampling)
ck_assert(roughly_equal(cnt[1] / 1000, (double) k/4.0, k, .05));
ck_assert(roughly_equal(cnt[2] / 1000, (double) k/2.0, k, .05));
- delete run;
+ delete shard;
delete bf;
delete buffer;
}
@@ -223,14 +223,14 @@ START_TEST(t_tombstone_check)
}
BloomFilter* bf1 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
- auto run = new Shard(buffer, bf1, false);
+ auto shard = new Shard(buffer, bf1, false);
for (size_t i=0; i<tombstones.size(); i++) {
- ck_assert(run->check_tombstone(tombstones[i].first, tombstones[i].second));
- ck_assert_int_eq(run->get_rejection_count(), i+1);
+ ck_assert(shard->check_tombstone(tombstones[i].first, tombstones[i].second));
+ ck_assert_int_eq(shard->get_rejection_count(), i+1);
}
- delete run;
+ delete shard;
delete buffer;
delete bf1;
}
@@ -271,15 +271,15 @@ Suite *unit_testing()
}
-int run_unit_tests()
+int shard_unit_tests()
{
int failed = 0;
Suite *unit = unit_testing();
- SRunner *unit_runner = srunner_create(unit);
+ SRunner *unit_shardner = srunner_create(unit);
- srunner_run_all(unit_runner, CK_NORMAL);
- failed = srunner_ntests_failed(unit_runner);
- srunner_free(unit_runner);
+ srunner_run_all(unit_shardner, CK_NORMAL);
+ failed = srunner_ntests_failed(unit_shardner);
+ srunner_free(unit_shardner);
return failed;
}
@@ -287,7 +287,7 @@ int run_unit_tests()
int main()
{
- int unit_failed = run_unit_tests();
+ int unit_failed = shard_unit_tests();
return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}