summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-05-17 16:00:20 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-05-17 16:00:20 -0400
commit6fd50506d2e50d2faf2478a2883a2ef1b4840a78 (patch)
treeeb29955b435a6e2e6196b9c89de15c6b8885c4f3
parent75a8418b580234521b5fa23340bee959c357acf9 (diff)
downloaddynamic-extension-6fd50506d2e50d2faf2478a2883a2ef1b4840a78.tar.gz
Started implementing shard interface (not finished yet)
-rw-r--r--include/framework/DynamicExtension.h41
-rw-r--r--include/framework/InternalLevel.h33
-rw-r--r--include/framework/ShardInterface.h27
-rw-r--r--include/shard/WIRS.h53
-rw-r--r--tests/dynamic_extension_tests.cpp21
-rw-r--r--tests/internal_level_tests.cpp10
-rw-r--r--tests/wirs_tests.cpp5
7 files changed, 106 insertions, 84 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index fd42c5f..53b55b1 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -16,6 +16,9 @@
#include "framework/MutableBuffer.h"
#include "framework/InternalLevel.h"
+#include "framework/ShardInterface.h"
+
+#include "shard/WIRS.h"
#include "ds/Alias.h"
#include "util/timer.h"
@@ -68,9 +71,8 @@ enum class DeletePolicy {
typedef ssize_t level_index;
-template <RecordInterface R>
+template <RecordInterface R, ShardInterface S>
class DynamicExtension {
- typedef WIRS<R> Shard;
typedef decltype(R::key) K;
typedef decltype(R::value) V;
typedef decltype(R::weight) W;
@@ -143,7 +145,7 @@ public:
// Get the shard weights for each level. Index 0 is the buffer,
// represented by nullptr.
- std::vector<std::pair<ShardID, Shard*>> shards;
+ std::vector<std::pair<ShardID, S*>> shards;
std::vector<void*> states;
shards.push_back({{-1, -1}, nullptr});
states.push_back(nullptr);
@@ -151,13 +153,15 @@ public:
std::vector<W> shard_weights;
shard_weights.push_back((double) buffer_weight);
+ typename WIRS<R>::wirs_query_parms parms = {lower_key, upper_key}; // 'typename' required: WIRS<R> is a dependent scope
+
for (auto &level : m_levels) {
- level->get_shard_weights(shard_weights, shards, states, lower_key, upper_key);
+ level->get_query_states(shard_weights, shards, states, &parms);
}
if (shard_weights.size() == 1 && shard_weights[0] == 0) {
if (buffer_alias) delete buffer_alias;
- for (auto& x: states) Shard::delete_state(x);
+ for (auto& x: states) S::delete_query_state(x);
sampling_bailouts++;
return; // no records in the sampling range
}
@@ -243,7 +247,7 @@ public:
} while (sample_idx < sample_sz);
if (buffer_alias) delete buffer_alias;
- for (auto& x: states) Shard::delete_state(x);
+ for (auto& x: states) S::delete_query_state(x);
enforce_rejection_rate_maximum(rng);
}
@@ -348,8 +352,8 @@ public:
}
- Shard *create_ssi() {
- std::vector<Shard *> shards;
+ S *create_ssi() {
+ std::vector<S *> shards;
if (m_levels.size() > 0) {
for (int i=m_levels.size() - 1; i>= 0; i--) {
@@ -359,9 +363,9 @@ public:
}
}
- shards.emplace_back(new Shard(get_buffer(), nullptr));
+ shards.emplace_back(new S(get_buffer(), nullptr));
- Shard *shards_array[shards.size()];
+ S *shards_array[shards.size()];
size_t j = 0;
for (size_t i=0; i<shards.size(); i++) {
@@ -370,7 +374,7 @@ public:
}
}
- Shard *flattened = new Shard(shards_array, j, nullptr);
+ S *flattened = new S(shards_array, j, nullptr);
for (auto shard : shards) {
delete shard;
@@ -390,7 +394,7 @@ private:
double m_max_delete_prop;
double m_max_rejection_rate;
- std::vector<InternalLevel<R> *> m_levels;
+ std::vector<InternalLevel<R, S> *> m_levels;
level_index m_last_level_idx;
@@ -446,7 +450,7 @@ private:
if (new_idx > 0) {
assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
}
- m_levels.emplace_back(new InternalLevel<R>(new_idx, new_shard_cnt));
+ m_levels.emplace_back(new InternalLevel<R, S>(new_idx, new_shard_cnt));
m_last_level_idx++;
return new_idx;
@@ -526,14 +530,14 @@ private:
// merging two memory levels
if (LSM_LEVELING) {
auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<R>::merge_levels(m_levels[base_level], m_levels[incoming_level], rng);
+ m_levels[base_level] = InternalLevel<R, S>::merge_levels(m_levels[base_level], m_levels[incoming_level], rng);
mark_as_unused(tmp);
} else {
m_levels[base_level]->append_merged_shards(m_levels[incoming_level], rng);
}
mark_as_unused(m_levels[incoming_level]);
- m_levels[incoming_level] = new InternalLevel<R>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
+ m_levels[incoming_level] = new InternalLevel<R, S>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
}
@@ -542,9 +546,9 @@ private:
if (LSM_LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0];
- auto temp_level = new InternalLevel<R>(0, 1);
+ auto temp_level = new InternalLevel<R, S>(0, 1);
temp_level->append_mem_table(buffer, rng);
- auto new_level = InternalLevel<R>::merge_levels(old_level, temp_level, rng);
+ auto new_level = InternalLevel<R, S>::merge_levels(old_level, temp_level, rng);
m_levels[0] = new_level;
delete temp_level;
@@ -560,7 +564,7 @@ private:
* level may not be able to immediately be deleted, depending upon who
* else is using it.
*/
- inline void mark_as_unused(InternalLevel<R> *level) {
+ inline void mark_as_unused(InternalLevel<R, S> *level) {
delete level;
}
@@ -609,6 +613,7 @@ private:
* no guarantees about which buffer will be accessed if level_index is -1.
*/
inline size_t get_level_record_count(level_index idx, MutableBuffer<R> *buffer=nullptr) {
+
assert(idx >= -1);
if (idx == -1) {
return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count();
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index 19bfe9f..18b7de3 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -14,25 +14,26 @@
#include "util/types.h"
#include "util/bf_config.h"
-#include "shard/WIRS.h"
+#include "framework/ShardInterface.h"
+#include "framework/MutableBuffer.h"
#include "ds/BloomFilter.h"
namespace de {
-template <RecordInterface R>
+template <RecordInterface R, ShardInterface S>
class InternalLevel {
static const size_t REJECTION_TRIGGER_THRESHOLD = 1024;
typedef decltype(R::key) K;
typedef decltype(R::value) V;
- typedef WIRS<R> Shard;
+ //typedef WIRS<R> S;
private:
struct InternalLevelStructure {
InternalLevelStructure(size_t cap)
: m_cap(cap)
- , m_shards(new Shard*[cap]{nullptr})
+ , m_shards(new S*[cap]{nullptr})
, m_bfs(new BloomFilter*[cap]{nullptr}) {}
~InternalLevelStructure() {
@@ -46,7 +47,7 @@ private:
}
size_t m_cap;
- Shard** m_shards;
+ S** m_shards;
BloomFilter** m_bfs;
};
@@ -75,49 +76,49 @@ public:
new BloomFilter(BF_FPR,
new_level->get_tombstone_count() + base_level->get_tombstone_count(),
BF_HASH_FUNCS, rng);
- Shard* shards[2];
+ S* shards[2];
shards[0] = base_level->m_structure->m_shards[0];
shards[1] = new_level->m_structure->m_shards[0];
- res->m_structure->m_shards[0] = new Shard(shards, 2, res->m_structure->m_bfs[0]);
+ res->m_structure->m_shards[0] = new S(shards, 2, res->m_structure->m_bfs[0]);
return res;
}
void append_mem_table(MutableBuffer<R>* buffer, const gsl_rng* rng) {
assert(m_shard_cnt < m_structure->m_cap);
m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng);
- m_structure->m_shards[m_shard_cnt] = new Shard(buffer, m_structure->m_bfs[m_shard_cnt]);
+ m_structure->m_shards[m_shard_cnt] = new S(buffer, m_structure->m_bfs[m_shard_cnt]);
++m_shard_cnt;
}
void append_merged_shards(InternalLevel* level, const gsl_rng* rng) {
assert(m_shard_cnt < m_structure->m_cap);
m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng);
- m_structure->m_shards[m_shard_cnt] = new Shard(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt]);
+ m_structure->m_shards[m_shard_cnt] = new S(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt]);
++m_shard_cnt;
}
- Shard *get_merged_shard() {
- Shard *shards[m_shard_cnt];
+ S *get_merged_shard() {
+ S *shards[m_shard_cnt];
for (size_t i=0; i<m_shard_cnt; i++) {
shards[i] = (m_structure->m_shards[i]) ? m_structure->m_shards[i] : nullptr;
}
- return new Shard(shards, m_shard_cnt, nullptr);
+ return new S(shards, m_shard_cnt, nullptr);
}
// Append the sample range in-order.....
- void get_shard_weights(std::vector<uint64_t>& weights, std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, const K& low, const K& high) {
+ void get_query_states(std::vector<uint64_t>& weights, std::vector<std::pair<ShardID, S *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
for (size_t i=0; i<m_shard_cnt; i++) {
if (m_structure->m_shards[i]) {
- auto shard_state = m_structure->m_shards[i]->get_sample_shard_state(low, high);
+ auto shard_state = m_structure->m_shards[i]->get_query_state(query_parms);
if (shard_state->tot_weight > 0) {
shards.push_back({{m_level_no, (ssize_t) i}, m_structure->m_shards[i]});
weights.push_back(shard_state->tot_weight);
shard_states.emplace_back(shard_state);
} else {
- Shard::delete_state(shard_state);
+ S::delete_query_state(shard_state); // state was not recorded (tot_weight not > 0), so free it here
}
}
}
@@ -156,7 +157,7 @@ public:
return m_structure->m_shards[shard_no]->get_record_at(idx);
}
- Shard* get_shard(size_t idx) {
+ S* get_shard(size_t idx) {
return m_structure->m_shards[idx];
}
diff --git a/include/framework/ShardInterface.h b/include/framework/ShardInterface.h
new file mode 100644
index 0000000..5d07a99
--- /dev/null
+++ b/include/framework/ShardInterface.h
@@ -0,0 +1,27 @@
+/*
+ * include/framework/ShardInterface.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <concepts>
+
+#include "util/types.h"
+
+template <typename S>
+concept ShardInterface = requires(S s, void *p) {
+ //s.point_lookup();
+ //s.tombstone_lookup();
+ //s.delete_record();
+
+ {s.get_query_state(p)} -> std::convertible_to<void*>;
+ {s.delete_query_state(p)};
+
+ {s.get_record_count()} -> std::convertible_to<size_t>;
+ {s.get_tombstone_count()} -> std::convertible_to<size_t>;
+ {s.get_memory_usage()} -> std::convertible_to<size_t>;
+};
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 2572caf..9f37396 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -25,6 +25,7 @@ namespace de {
thread_local size_t wirs_cancelations = 0;
+
template <WeightedRecordInterface R>
class WIRS {
private:
@@ -53,6 +54,11 @@ private:
};
public:
+ struct wirs_query_parms {
+ K lower_bound;
+ K upper_bound;
+ };
+
WIRS(MutableBuffer<R>* buffer, BloomFilter* bf)
: m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0),
m_ts_check_cnt(0), m_root(nullptr) {
@@ -178,14 +184,6 @@ public:
return false;
}
- void free_tree(struct wirs_node<R>* node) {
- if (node) {
- delete node->alias;
- free_tree(node->left);
- free_tree(node->right);
- delete node;
- }
- }
R* sorted_output() const {
return m_data;
@@ -206,8 +204,10 @@ public:
// low - high -> decompose to a set of nodes.
// Build Alias across the decomposed nodes.
- WIRSState<R>* get_sample_shard_state(const K& lower_key, const K& upper_key) {
+ WIRSState<R>* get_query_state(void *query_parameters) {
auto res = new WIRSState();
+ K lower_key = ((wirs_query_parms *) query_parameters)->lower_bound;
+ K upper_key = ((wirs_query_parms *) query_parameters)->upper_bound;
// Simulate a stack to unfold recursion.
double tot_weight = 0.0;
@@ -236,7 +236,7 @@ public:
return res;
}
- static void delete_state(void *state) {
+ static void delete_query_state(void *state) {
WIRSState<R> *s = (WIRSState<R> *) state;
delete s;
}
@@ -293,26 +293,6 @@ public:
return min;
}
- /*
- bool check_delete(K key, V val) {
- size_t idx = get_lower_bound(key);
- if (idx >= m_reccnt) {
- return false;
- }
-
- auto ptr = m_data + get_lower_bound(key);
-
- while (ptr < m_data + m_reccnt && *ptr < R {key, val}) {
- ptr ++;
- }
-
- bool result = (m_tagging) ? ptr->is_deleted()
- : *ptr == R {key, val} && ptr->is_tombstone();
- m_rejection_cnt += result;
- return result;
- }
- */
-
bool check_tombstone(const R& rec) {
m_ts_check_cnt++;
size_t idx = get_lower_bound(rec.key);
@@ -332,12 +312,10 @@ public:
return result;
}
-
- size_t get_memory_utilization() {
+ size_t get_memory_usage() {
return 0;
}
-
size_t get_rejection_count() {
return m_rejection_cnt;
}
@@ -418,6 +396,15 @@ private:
m_root = construct_wirs_node(weights, 0, n_groups-1);
}
+ void free_tree(struct wirs_node<R>* node) {
+ if (node) {
+ delete node->alias;
+ free_tree(node->left);
+ free_tree(node->right);
+ delete node;
+ }
+ }
+
R* m_data;
std::vector<Alias *> m_alias;
wirs_node<R>* m_root;
diff --git a/tests/dynamic_extension_tests.cpp b/tests/dynamic_extension_tests.cpp
index 3638b76..422e904 100644
--- a/tests/dynamic_extension_tests.cpp
+++ b/tests/dynamic_extension_tests.cpp
@@ -15,13 +15,14 @@
#include "testing.h"
#include "framework/DynamicExtension.h"
+#include "shard/WIRS.h"
#include <check.h>
using namespace de;
START_TEST(t_create)
{
- auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, 1, 1, g_rng);
ck_assert_ptr_nonnull(ext_wirs);
@@ -35,7 +36,7 @@ END_TEST
START_TEST(t_append)
{
- auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, 1, 1, g_rng);
uint64_t key = 0;
uint32_t val = 0;
@@ -56,7 +57,7 @@ END_TEST
START_TEST(t_append_with_mem_merges)
{
- auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, 1, 1, g_rng);
uint64_t key = 0;
uint32_t val = 0;
@@ -77,7 +78,7 @@ END_TEST
START_TEST(t_range_sample_memtable)
{
- auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, 1, 1, g_rng);
uint64_t key = 0;
uint32_t val = 0;
@@ -112,7 +113,7 @@ END_TEST
START_TEST(t_range_sample_memlevels)
{
- auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, 1, 1, g_rng);
uint64_t key = 0;
uint32_t val = 0;
@@ -146,7 +147,7 @@ END_TEST
START_TEST(t_range_sample_weighted)
{
- auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, 1, 1, g_rng);
size_t n = 10000;
std::vector<uint64_t> keys;
@@ -217,7 +218,7 @@ END_TEST
START_TEST(t_tombstone_merging_01)
{
size_t reccnt = 100000;
- auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, .01, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, .01, 1, g_rng);
std::set<std::pair<uint64_t, uint32_t>> records;
std::set<std::pair<uint64_t, uint32_t>> to_delete;
@@ -265,8 +266,8 @@ START_TEST(t_tombstone_merging_01)
}
END_TEST
-DynamicExtension<WRec> *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
- auto ext_wirs = new DynamicExtension<WRec>(1000, 1000, 2, 1, 1, g_rng);
+DynamicExtension<WRec, WIRS<WRec>> *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
+ auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(1000, 1000, 2, 1, 1, g_rng);
std::set<WRec> records;
std::set<WRec> to_delete;
@@ -309,7 +310,7 @@ DynamicExtension<WRec> *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
START_TEST(t_sorted_array)
{
size_t reccnt = 100000;
- auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec, WIRS<WRec>>(100, 100, 2, 1, 1, g_rng);
std::set<std::pair<uint64_t, uint32_t>> records;
std::set<std::pair<uint64_t, uint32_t>> to_delete;
diff --git a/tests/internal_level_tests.cpp b/tests/internal_level_tests.cpp
index 74d29f0..6803112 100644
--- a/tests/internal_level_tests.cpp
+++ b/tests/internal_level_tests.cpp
@@ -23,16 +23,16 @@ START_TEST(t_memlevel_merge)
auto tbl1 = create_test_mbuffer<WRec>(100);
auto tbl2 = create_test_mbuffer<WRec>(100);
- auto base_level = new InternalLevel<WRec>(1, 1);
+ auto base_level = new InternalLevel<WRec, WIRS<WRec>>(1, 1);
base_level->append_mem_table(tbl1, g_rng);
ck_assert_int_eq(base_level->get_record_cnt(), 100);
- auto merging_level = new InternalLevel<WRec>(0, 1);
+ auto merging_level = new InternalLevel<WRec, WIRS<WRec>>(0, 1);
merging_level->append_mem_table(tbl2, g_rng);
ck_assert_int_eq(merging_level->get_record_cnt(), 100);
auto old_level = base_level;
- base_level = InternalLevel<WRec>::merge_levels(old_level, merging_level, g_rng);
+ base_level = InternalLevel<WRec, WIRS<WRec>>::merge_levels(old_level, merging_level, g_rng);
delete old_level;
delete merging_level;
@@ -44,11 +44,11 @@ START_TEST(t_memlevel_merge)
}
-InternalLevel<WRec> *create_test_memlevel(size_t reccnt) {
+InternalLevel<WRec, WIRS<WRec>> *create_test_memlevel(size_t reccnt) {
auto tbl1 = create_test_mbuffer<WRec>(reccnt/2);
auto tbl2 = create_test_mbuffer<WRec>(reccnt/2);
- auto base_level = new InternalLevel<WRec>(1, 2);
+ auto base_level = new InternalLevel<WRec, WIRS<WRec>>(1, 2);
base_level->append_mem_table(tbl1, g_rng);
base_level->append_mem_table(tbl2, g_rng);
diff --git a/tests/wirs_tests.cpp b/tests/wirs_tests.cpp
index 828362d..c7edec9 100644
--- a/tests/wirs_tests.cpp
+++ b/tests/wirs_tests.cpp
@@ -185,7 +185,8 @@ START_TEST(t_weighted_sampling)
results.reserve(k);
size_t cnt[3] = {0};
for (size_t i=0; i<1000; i++) {
- auto state = shard->get_sample_shard_state(lower_key, upper_key);
+ WIRS<WRec>::wirs_query_parms parms = {lower_key, upper_key};
+ auto state = shard->get_query_state(&parms);
shard->get_samples(state, results, lower_key, upper_key, k, g_rng);
@@ -193,7 +194,7 @@ START_TEST(t_weighted_sampling)
cnt[results[j].key - 1]++;
}
- WIRS<WRec>::delete_state(state);
+ WIRS<WRec>::delete_query_state(state);
}
ck_assert(roughly_equal(cnt[0] / 1000, (double) k/4.0, k, .05));