summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--benchmarks/sampling_tput.cpp7
-rw-r--r--include/shard/MemISAM.h23
-rw-r--r--include/shard/WIRS.h22
-rw-r--r--include/shard/WSS.h17
4 files changed, 39 insertions, 30 deletions
diff --git a/benchmarks/sampling_tput.cpp b/benchmarks/sampling_tput.cpp
index 3d38d6a..b25b15f 100644
--- a/benchmarks/sampling_tput.cpp
+++ b/benchmarks/sampling_tput.cpp
@@ -91,6 +91,8 @@ static void sample_benchmark(ExtendedWSS *tree, size_t k, size_t trial_cnt)
WRec sample_set[k];
+ size_t total_samples = 0;
+
de::wss_query_parms<WRec> parms;
parms.rng = g_rng;
parms.sample_size = k;
@@ -100,6 +102,7 @@ static void sample_benchmark(ExtendedWSS *tree, size_t k, size_t trial_cnt)
auto start = std::chrono::high_resolution_clock::now();
for (int j=0; j < batch_size; j++) {
auto res = tree->query(&parms);
+ total_samples += res.size();
}
auto stop = std::chrono::high_resolution_clock::now();
@@ -108,7 +111,7 @@ static void sample_benchmark(ExtendedWSS *tree, size_t k, size_t trial_cnt)
progress_update(1.0, progbuf);
- size_t throughput = (((double)(trial_cnt * k) / (double) total_time) * 1e9);
+ size_t throughput = (((double)(total_samples) / (double) total_time) * 1e9);
fprintf(stdout, "%ld\n", throughput);
fflush(stdout);
@@ -147,7 +150,7 @@ int main(int argc, char **argv)
size_t insert_cnt = record_count - warmup_cnt;
insert_benchmark(&sampling_lsm, &datafile, insert_cnt, delete_prop);
-// sample_benchmark(&sampling_lsm, 1000, 10000);
+ sample_benchmark(&sampling_lsm, 1000, 10000);
delete_bench_env();
fflush(stdout);
diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h
index ae1c682..96c404e 100644
--- a/include/shard/MemISAM.h
+++ b/include/shard/MemISAM.h
@@ -361,6 +361,7 @@ public:
res->lower_bound = isam->get_lower_bound(lower_key);
res->upper_bound = isam->get_upper_bound(upper_key);
+ res->sample_size = 0;
return res;
}
@@ -369,6 +370,7 @@ public:
auto res = new IRSBufferState<R>();
res->cutoff = buffer->get_record_count();
+ res->sample_size = 0;
if constexpr (Rejection) {
return res;
@@ -390,7 +392,7 @@ public:
auto p = (irs_query_parms<R> *) query_parms;
auto bs = (IRSBufferState<R> *) buff_state;
- std::vector<size_t> shard_sample_sizes = {0};
+ std::vector<size_t> shard_sample_sizes(shard_states.size()+1, 0);
size_t buffer_sz = 0;
std::vector<size_t> weights;
@@ -400,7 +402,7 @@ public:
weights.push_back(bs->records.size());
}
- decltype(R::weight) total_weight;
+ decltype(R::weight) total_weight = 0;
for (auto &s : shard_states) {
auto state = (IRSState<R> *) s;
total_weight += state->upper_bound - state->lower_bound;
@@ -422,21 +424,20 @@ public:
}
}
-
bs->sample_size = buffer_sz;
- size_t i=1;
- for (auto &s : shard_states) {
- auto state = (IRSState<R> *) s;
- state->sample_size = shard_sample_sizes[i++];
+ for (size_t i=0; i<shard_states.size(); i++) {
+ auto state = (IRSState<R> *) shard_states[i];
+ state->sample_size = shard_sample_sizes[i+1];
}
}
+
static std::vector<Wrapped<R>> query(MemISAM<R> *isam, void *q_state, void *parms) {
- auto sample_sz = ((irs_query_parms<R> *) parms)->sample_size;
auto lower_key = ((irs_query_parms<R> *) parms)->lower_bound;
auto upper_key = ((irs_query_parms<R> *) parms)->upper_bound;
auto rng = ((irs_query_parms<R> *) parms)->rng;
auto state = (IRSState<R> *) q_state;
+ auto sample_sz = state->sample_size;
std::vector<Wrapped<R>> result_set;
@@ -460,10 +461,10 @@ public:
auto p = (irs_query_parms<R> *) parms;
std::vector<Wrapped<R>> result;
- result.reserve(p->sample_size);
+ result.reserve(st->sample_size);
if constexpr (Rejection) {
- for (size_t i=0; i<p->sample_size; i++) {
+ for (size_t i=0; i<st->sample_size; i++) {
auto idx = gsl_rng_uniform_int(p->rng, st->cutoff);
auto rec = buffer->get_data() + idx;
@@ -475,7 +476,7 @@ public:
return result;
}
- for (size_t i=0; i<p->sample_size; i++) {
+ for (size_t i=0; i<st->sample_size; i++) {
auto idx = gsl_rng_uniform_int(p->rng, st->records.size());
result.emplace_back(st->records[idx]);
}
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 619c2fe..ab72129 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -392,6 +392,7 @@ public:
}
res->total_weight = total_weight;
res->top_level_alias = new Alias(weights);
+ res->sample_size = 0;
return res;
}
@@ -403,6 +404,7 @@ public:
state->cutoff = buffer->get_record_count() - 1;
state->max_weight = buffer->get_max_weight();
state->total_weight = buffer->get_total_weight();
+ state->sample_size = 0;
return state;
}
@@ -427,6 +429,7 @@ public:
state->total_weight = total_weight;
state->alias = new Alias(weights);
+ state->sample_size = 0;
return state;
}
@@ -435,13 +438,13 @@ public:
auto p = (wirs_query_parms<R> *) query_parms;
auto bs = (WIRSBufferState<R> *) buff_state;
- std::vector<size_t> shard_sample_sizes = {0};
+ std::vector<size_t> shard_sample_sizes(shard_states.size()+1, 0);
size_t buffer_sz = 0;
std::vector<decltype(R::weight)> weights;
weights.push_back(bs->total_weight);
- decltype(R::weight) total_weight;
+ decltype(R::weight) total_weight = 0;
for (auto &s : shard_states) {
auto state = (WIRSState<R> *) s;
total_weight += state->total_weight;
@@ -465,22 +468,21 @@ public:
bs->sample_size = buffer_sz;
- size_t i=1;
- for (auto &s : shard_states) {
- auto state = (WIRSState<R> *) s;
- state->sample_size = shard_sample_sizes[i++];
+ for (size_t i=0; i<shard_states.size(); i++) {
+ auto state = (WIRSState<R> *) shard_states[i];
+ state->sample_size = shard_sample_sizes[i+1];
}
}
static std::vector<Wrapped<R>> query(WIRS<R> *wirs, void *q_state, void *parms) {
- auto sample_size = ((wirs_query_parms<R> *) parms)->sample_size;
auto lower_key = ((wirs_query_parms<R> *) parms)->lower_bound;
auto upper_key = ((wirs_query_parms<R> *) parms)->upper_bound;
auto rng = ((wirs_query_parms<R> *) parms)->rng;
auto state = (WIRSState<R> *) q_state;
+ auto sample_size = state->sample_size;
std::vector<Wrapped<R>> result_set;
@@ -517,10 +519,10 @@ public:
auto p = (wirs_query_parms<R> *) parms;
std::vector<Wrapped<R>> result;
- result.reserve(p->sample_size);
+ result.reserve(st->sample_size);
if constexpr (Rejection) {
- for (size_t i=0; i<p->sample_size; i++) {
+ for (size_t i=0; i<st->sample_size; i++) {
auto idx = gsl_rng_uniform_int(p->rng, st->cutoff);
auto rec = buffer->get_data() + idx;
@@ -533,7 +535,7 @@ public:
return result;
}
- for (size_t i=0; i<p->sample_size; i++) {
+ for (size_t i=0; i<st->sample_size; i++) {
auto idx = st->alias->get(p->rng);
result.emplace_back(st->records[idx]);
}
diff --git a/include/shard/WSS.h b/include/shard/WSS.h
index 1069897..9300932 100644
--- a/include/shard/WSS.h
+++ b/include/shard/WSS.h
@@ -283,6 +283,8 @@ class WSSQuery {
public:
static void *get_query_state(WSS<R> *wss, void *parms) {
auto res = new WSSState<R>();
+ res->total_weight = wss->m_total_weight;
+ res->sample_size = 0;
return res;
}
@@ -293,6 +295,7 @@ public:
if constexpr (Rejection) {
state->cutoff = buffer->get_record_count() - 1;
state->max_weight = buffer->get_max_weight();
+ state->total_weight = buffer->get_total_weight();
return state;
}
@@ -312,6 +315,7 @@ public:
}
state->alias = new Alias(weights);
+ state->total_weight = total_weight;
return state;
}
@@ -320,13 +324,13 @@ public:
auto p = (wss_query_parms<R> *) query_parms;
auto bs = (WSSBufferState<R> *) buff_state;
- std::vector<size_t> shard_sample_sizes = {0};
+ std::vector<size_t> shard_sample_sizes(shard_states.size()+1, 0);
size_t buffer_sz = 0;
std::vector<decltype(R::weight)> weights;
weights.push_back(bs->total_weight);
- decltype(R::weight) total_weight;
+ decltype(R::weight) total_weight = 0;
for (auto &s : shard_states) {
auto state = (WSSState<R> *) s;
total_weight += state->total_weight;
@@ -350,18 +354,17 @@ public:
bs->sample_size = buffer_sz;
- size_t i=1;
- for (auto &s : shard_states) {
- auto state = (WSSState<R> *) s;
- state->sample_size = shard_sample_sizes[i++];
+ for (size_t i=0; i<shard_states.size(); i++) {
+ auto state = (WSSState<R> *) shard_states[i];
+ state->sample_size = shard_sample_sizes[i+1];
}
}
static std::vector<Wrapped<R>> query(WSS<R> *wss, void *q_state, void *parms) {
- auto sample_size = ((WSSState<R> *) q_state)->sample_size;
auto rng = ((wss_query_parms<R> *) parms)->rng;
auto state = (WSSState<R> *) q_state;
+ auto sample_size = state->sample_size;
std::vector<Wrapped<R>> result_set;