author      Douglas Rumbaugh <dbr4@psu.edu>    2023-05-22 13:25:20 -0400
committer   Douglas Rumbaugh <dbr4@psu.edu>    2023-05-22 13:25:20 -0400
commit      08d6c84b9d69b500c964a8ff66e726e1f01f2095 (patch)
tree        8bc156b54383de8a4a347e463901dcb7bdd1da10 /include
parent      6fd50506d2e50d2faf2478a2883a2ef1b4840a78 (diff)
download    dynamic-extension-08d6c84b9d69b500c964a8ff66e726e1f01f2095.tar.gz
Progress towards generalization of shard interface
Diffstat (limited to 'include')
-rw-r--r--   include/ds/BloomFilter.h               17
-rw-r--r--   include/framework/DynamicExtension.h   393
-rw-r--r--   include/framework/InternalLevel.h      162
-rw-r--r--   include/framework/MutableBuffer.h      115
-rw-r--r--   include/framework/QueryInterface.h      22
-rw-r--r--   include/framework/ShardInterface.h       9
-rw-r--r--   include/shard/MemISAM.h                 64
-rw-r--r--   include/shard/WIRS.h                    75
-rw-r--r--   include/util/Record.h                   34
9 files changed, 292 insertions, 599 deletions
diff --git a/include/ds/BloomFilter.h b/include/ds/BloomFilter.h
index fddd1fb..d55a7af 100644
--- a/include/ds/BloomFilter.h
+++ b/include/ds/BloomFilter.h
@@ -18,25 +18,28 @@
namespace de {
+template <typename K>
class BloomFilter {
public:
- BloomFilter(size_t n_bits, size_t k, const gsl_rng* rng)
+ BloomFilter(size_t n_bits, size_t k)
: m_n_bits(n_bits), m_n_salts(k), m_bitarray(n_bits) {
+ gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937);
salt = (uint16_t*) aligned_alloc(CACHELINE_SIZE, CACHELINEALIGN(k * sizeof(uint16_t)));
for (size_t i = 0; i < k; ++i) {
salt[i] = (uint16_t) gsl_rng_uniform_int(rng, 1 << 16);
}
-
+
+ gsl_rng_free(rng);
}
- BloomFilter(double max_fpr, size_t n, size_t k, const gsl_rng* rng)
- : BloomFilter((size_t)(-(double) (k * n) / std::log(1.0 - std::pow(max_fpr, 1.0 / k))), k, rng) {}
+ BloomFilter(double max_fpr, size_t n, size_t k)
+ : BloomFilter((size_t)(-(double) (k * n) / std::log(1.0 - std::pow(max_fpr, 1.0 / k))), k) {}
~BloomFilter() {
if (salt) free(salt);
}
- int insert(const key_t& key, size_t sz = sizeof(key_t)) {
+ int insert(const K& key, size_t sz = sizeof(K)) {
if (m_bitarray.size() == 0) return 0;
for (size_t i = 0; i < m_n_salts; ++i) {
@@ -46,7 +49,7 @@ public:
return 1;
}
- bool lookup(const key_t& key, size_t sz = sizeof(key_t)) {
+ bool lookup(const K& key, size_t sz = sizeof(K)) {
if (m_bitarray.size() == 0) return false;
for (size_t i = 0; i < m_n_salts; ++i) {
if (!m_bitarray.is_set(hash_bytes_with_salt((const char*)&key, sz, salt[i]) % m_n_bits))
@@ -60,7 +63,7 @@ public:
m_bitarray.clear();
}
- size_t get_memory_utilization() {
+ size_t get_memory_usage() {
return this->m_bitarray.mem_size();
}
private:
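The filter is now templated on the key type and seeds its own gsl RNG internally instead of
taking one from the caller. A minimal usage sketch against the interface shown in this hunk
(the constant values are arbitrary):

    #include <cstdint>
    #include "ds/BloomFilter.h"

    void bloom_filter_sketch() {
        // target false-positive rate 1%, ~1000 expected keys, 7 salted hash functions
        de::BloomFilter<uint64_t> bf(0.01, 1000, 7);

        bf.insert(42);                         // hashes the raw bytes of the key
        bool hit  = bf.lookup(42);             // true
        bool miss = bf.lookup(99);             // false, barring a false positive

        size_t bytes = bf.get_memory_usage();  // renamed from get_memory_utilization()
        (void) hit; (void) miss; (void) bytes;
    }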
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 53b55b1..2009344 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -17,6 +17,7 @@
#include "framework/MutableBuffer.h"
#include "framework/InternalLevel.h"
#include "framework/ShardInterface.h"
+#include "framework/QueryInterface.h"
#include "shard/WIRS.h"
#include "ds/Alias.h"
@@ -71,221 +72,88 @@ enum class DeletePolicy {
typedef ssize_t level_index;
-template <RecordInterface R, ShardInterface S>
+template <RecordInterface R, ShardInterface S, QueryInterface Q, typename FQ=void>
class DynamicExtension {
- typedef decltype(R::key) K;
- typedef decltype(R::value) V;
- typedef decltype(R::weight) W;
public:
- DynamicExtension(size_t buffer_cap, size_t buffer_delete_cap, size_t scale_factor,
- double max_delete_prop, double max_rejection_prop, gsl_rng *rng)
- : m_active_buffer(0),
- m_scale_factor(scale_factor),
- m_max_delete_prop(max_delete_prop),
- m_max_rejection_rate(max_rejection_prop),
- m_last_level_idx(-1),
- m_buffer_1(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
- m_buffer_2(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
- m_buffer_1_merging(false), m_buffer_2_merging(false) {}
+ DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop)
+ : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop),
+ m_buffer(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_cap * max_delete_prop))
+ {}
~DynamicExtension() {
- delete m_buffer_1;
- delete m_buffer_2;
+ delete m_buffer;
for (size_t i=0; i<m_levels.size(); i++) {
delete m_levels[i];
}
}
- int delete_record(const K& key, const V& val, gsl_rng *rng) {
- assert(DELETE_TAGGING);
-
- auto buffer = get_buffer();
- // Check the levels first. This assumes there aren't
- // any undeleted duplicate records.
- for (auto level : m_levels) {
- if (level && level->delete_record(key, val)) {
- return 1;
- }
- }
-
- // the buffer will take the longest amount of time, and
- // probably has the lowest probability of having the record,
- // so we'll check it last.
- return buffer->delete_record(key, val);
+ int insert(const R &rec) {
+ return internal_append(rec, false);
}
- int append(R &rec, gsl_rng *rng) {
- // NOTE: single-threaded implementation only
+ int erase(const R &rec) {
MutableBuffer<R> *buffer;
- while (!(buffer = get_buffer()))
- ;
-
- if (buffer->is_full()) {
- merge_buffer(rng);
- }
-
- return buffer->append(rec);
- }
-
- void range_sample(R *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
- auto buffer = get_buffer();
- Alias *buffer_alias = nullptr;
- std::vector<R *> buffer_records;
- size_t buffer_cutoff = 0;
-
- W buffer_weight;
- if (LSM_REJ_SAMPLE) {
- buffer_weight = buffer->get_total_weight();
- buffer_cutoff = buffer->get_record_count() - 1;
- } else {
- buffer_weight = buffer->get_sample_range(lower_key, upper_key, buffer_records, &buffer_alias, &buffer_cutoff);
- }
-
- // Get the shard weights for each level. Index 0 is the buffer,
- // represented by nullptr.
- std::vector<std::pair<ShardID, S*>> shards;
- std::vector<void*> states;
- shards.push_back({{-1, -1}, nullptr});
- states.push_back(nullptr);
-
- std::vector<W> shard_weights;
- shard_weights.push_back((double) buffer_weight);
-
- WIRS<R>::wirs_query_parms parms = {lower_key, upper_key};
-
- for (auto &level : m_levels) {
- level->get_query_states(shard_weights, shards, states, &parms);
- }
- if (shard_weights.size() == 1 && shard_weights[0] == 0) {
- if (buffer_alias) delete buffer_alias;
- for (auto& x: states) S::delete_query_state(x);
- sampling_bailouts++;
- return; // no records in the sampling range
- }
-
- double tot_weight = std::accumulate(shard_weights.begin(), shard_weights.end(), 0);
- std::vector<double> normalized_weights(shard_weights.size());
- for (size_t i=0; i<shard_weights.size(); i++) {
- normalized_weights[i] = ((double) shard_weights[i]) / tot_weight;
- }
-
- // Construct alias structure
- auto alias = Alias(normalized_weights);
-
- std::vector<size_t> shard_samples(shard_weights.size(), 0);
-
- size_t rejections = sample_sz;
- size_t sample_idx = 0;
-
- size_t buffer_rejections = 0;
-
- do {
- for (size_t i=0; i<rejections; i++) {
- shard_samples[alias.get(rng)] += 1;
- }
-
- rejections = 0;
-
- while (shard_samples[0] > 0) {
- const R *rec;
- if (LSM_REJ_SAMPLE) {
- rec = buffer->get_sample(lower_key, upper_key, rng);
- } else {
- rec = buffer_records[buffer_alias->get(rng)];
- }
+ if constexpr (DELETE_TAGGING) {
+ auto buffer = get_buffer();
- if (DELETE_TAGGING) {
- if (rec && !rec->is_deleted()) {
- sample_set[sample_idx++] = *rec;
- } else {
- rejections++;
- }
- } else {
- if (rec && !buffer->check_tombstone(*rec)) {
- sample_set[sample_idx++] = *rec;
- } else {
- rejections++;
- }
- }
-
- shard_samples[0]--;
-
- // Assume nothing in buffer and bail out.
- // FIXME: rather than a bailout, we could switch to non-rejection
- // sampling, but that would require rebuilding the full alias structure.
- // Wouldn't be too hard to do, but for the moment I'll just do this.
- if (LSM_REJ_SAMPLE && buffer_rejections >= sample_sz && sample_idx == 0 && shard_weights.size() == 1) {
- if (buffer_alias) delete buffer_alias;
- //for (auto& x: states) delete x;
- sampling_bailouts++;
- return; // no records in the sampling range
+ // Check the levels first. This assumes there aren't
+ // any undeleted duplicate records.
+ for (auto level : m_levels) {
+ if (level && level->delete_record(rec)) {
+ return 1;
}
}
- std::vector<R> results;
- for (size_t i=1; i<shard_samples.size(); i++) {
- results.reserve(shard_samples[i]);
-
- shards[i].second->get_samples(states[i], results, lower_key, upper_key, shard_samples[i], rng);
+ // the buffer will take the longest amount of time, and
+ // probably has the lowest probability of having the record,
+ // so we'll check it last.
+ return buffer->delete_record(rec);
+ }
- for (size_t j=0; j<results.size(); j++) {
- if (rejection(results[j], shards[i].first, lower_key, upper_key, buffer, buffer_cutoff)) {
- rejections++;
- continue;
- }
+ return internal_append(rec, true);
+ }
- sample_set[sample_idx++] = results[j];
- }
+ std::vector<R> query(void *parms) {
- shard_samples[i] = 0;
- results.clear();
- }
+ // Use the provided top-level query function if one
+ // is specified. Otherwise, use the default framework
+ // behavior.
+ if constexpr (!std::is_same<FQ, void>::value) {
+ return FQ(parms);
+ }
- } while (sample_idx < sample_sz);
+ auto buffer = get_buffer();
- if (buffer_alias) delete buffer_alias;
- for (auto& x: states) S::delete_query_state(x);
+ // Get the buffer query state
+ auto buffer_state = Q::get_buffer_query_state(buffer, parms);
- enforce_rejection_rate_maximum(rng);
- }
+ // Get the shard query states
+ std::vector<std::pair<ShardID, S*>> shards;
+ std::vector<void*> states;
- // Checks the tree and buffer for a tombstone corresponding to
- // the provided record in any shard *above* the shid, which
- // should correspond to the shard containing the record in question
- //
- // Passing INVALID_SHID indicates that the record exists within the buffer
- bool is_deleted(const R &record, const ShardID &shid, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
- // If tagging is enabled, we just need to check if the record has the delete tag set
- if (DELETE_TAGGING) {
- return record.is_deleted();
+ for (auto &level : m_levels) {
+ level->get_query_states(shards, states, parms);
}
- // Otherwise, we need to look for a tombstone.
-
- // check for tombstone in the buffer. This will require accounting for the cutoff eventually.
- if (buffer->check_tombstone(record)) {
- return true;
- }
+ std::vector<std::vector<R>> query_results(shards.size() + 1);
- // if the record is in the buffer, then we're done.
- if (shid == INVALID_SHID) {
- return false;
- }
+ // Execute the query for the buffer
+ query_results[0] = Q::buffer_query(buffer, buffer_state, parms);
- for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
- if (m_levels[lvl]->check_tombstone(0, record)) {
- return true;
- }
+ // Execute the query for each shard
+ for (size_t i=0; i<shards.size(); i++) {
+ query_results[i] = post_process(Q::query(shards[i].second, states[i], parms));
}
+
+ // Merge the results together
+ auto result = Q::merge(query_results);
- // check the level containing the shard
- return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record);
+ return result;
}
-
size_t get_record_cnt() {
size_t cnt = get_buffer()->get_record_count();
@@ -296,7 +164,6 @@ public:
return cnt;
}
-
size_t get_tombstone_cnt() {
size_t cnt = get_buffer()->get_tombstone_count();
@@ -311,22 +178,22 @@ public:
return m_levels.size();
}
- size_t get_memory_utilization() {
- size_t cnt = m_buffer_1->get_memory_utilization() + m_buffer_2->get_memory_utilization();
+ size_t get_memory_usage() {
+ size_t cnt = m_buffer->get_memory_usage();
for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_memory_utilization();
+ if (m_levels[i]) cnt += m_levels[i]->get_memory_usage();
}
return cnt;
}
- size_t get_aux_memory_utilization() {
- size_t cnt = m_buffer_1->get_aux_memory_utilization() + m_buffer_2->get_aux_memory_utilization();
+ size_t get_aux_memory_usage() {
+ size_t cnt = m_buffer->get_aux_memory_usage();
for (size_t i=0; i<m_levels.size(); i++) {
if (m_levels[i]) {
- cnt += m_levels[i]->get_aux_memory_utilization();
+ cnt += m_levels[i]->get_aux_memory_usage();
}
}
@@ -348,10 +215,9 @@ public:
}
size_t get_buffer_capacity() {
- return m_buffer_1->get_capacity();
+ return m_buffer->get_capacity();
}
-
S *create_ssi() {
std::vector<S *> shards;
@@ -384,57 +250,72 @@ public:
}
private:
- MutableBuffer<R> *m_buffer_1;
- MutableBuffer<R> *m_buffer_2;
- std::atomic<bool> m_active_buffer;
- std::atomic<bool> m_buffer_1_merging;
- std::atomic<bool> m_buffer_2_merging;
+ MutableBuffer<R> *m_buffer;
size_t m_scale_factor;
double m_max_delete_prop;
- double m_max_rejection_rate;
- std::vector<InternalLevel<R, S> *> m_levels;
-
- level_index m_last_level_idx;
+ std::vector<InternalLevel<R, S, Q> *> m_levels;
MutableBuffer<R> *get_buffer() {
- if (m_buffer_1_merging && m_buffer_2_merging) {
- return nullptr;
- }
-
- return (m_active_buffer) ? m_buffer_2 : m_buffer_1;
+ return m_buffer;
}
- inline bool rejection(const R &record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
- if (record.is_tombstone()) {
- tombstone_rejections++;
- return true;
- } else if (record.key < lower_bound || record.key > upper_bound) {
- bounds_rejections++;
- return true;
- } else if (is_deleted(record, shid, buffer, buffer_cutoff)) {
- deletion_rejections++;
- return true;
+ int internal_append(R &rec, bool ts) {
+ MutableBuffer<R> *buffer;
+ while (!(buffer = get_buffer()))
+ ;
+
+ if (buffer->is_full()) {
+ merge_buffer();
}
- return false;
+ return buffer->append(rec, ts);
}
- inline bool add_to_sample(const R &record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer,
- R *sample_buffer, size_t &sample_idx, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
- TIMER_INIT();
- TIMER_START();
- sampling_attempts++;
- if (!record || rejection(record, shid, lower_key, upper_key, io_buffer, buffer, buffer_cutoff)) {
- sampling_rejections++;
- return false;
+ std::vector<R> post_process(std::vector<R> records, ShardID shid, MutableBuffer<R> *buffer) {
+ std::vector<R> processed_records;
+ processed_records.reserve(records.size());
+
+ // For delete tagging, we just need to check the delete bit on each
+ // record.
+ if constexpr (DELETE_TAGGING) {
+ for (auto &rec : records) {
+ if (rec.is_deleted()) {
+ continue;
+ }
+
+ processed_records.emplace_back(rec.rec);
+ }
+
+ return processed_records;
}
- TIMER_STOP();
- rejection_check_time += TIMER_RESULT();
- sample_buffer[sample_idx++] = *record;
- return true;
+ // For tombstone deletes, we need to search for the corresponding
+ // tombstone for each record.
+ for (auto &rec : records) {
+ if (rec.is_tombstone()) {
+ continue;
+ }
+
+ if (buffer->check_tombstone(rec.rec)) {
+ continue;
+ }
+
+ if (shid != INVALID_SHID) {
+ for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
+ if (m_levels[lvl]->check_tombstone(0, rec.rec)) {
+ continue;
+ }
+ }
+
+ if (m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) {
+ continue;
+ }
+ }
+
+ processed_records.emplace_back(rec.rec);
+ }
}
/*
@@ -450,24 +331,23 @@ private:
if (new_idx > 0) {
assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
}
- m_levels.emplace_back(new InternalLevel<R, S>(new_idx, new_shard_cnt));
+ m_levels.emplace_back(new InternalLevel<R, S, Q>(new_idx, new_shard_cnt));
- m_last_level_idx++;
return new_idx;
}
// Merge the memory table down into the tree, completing any required other
// merges to make room for it.
- inline void merge_buffer(gsl_rng *rng) {
+ inline void merge_buffer() {
auto buffer = get_buffer();
if (!can_merge_with(0, buffer->get_record_count())) {
- merge_down(0, rng);
+ merge_down(0);
}
- merge_buffer_into_l0(buffer, rng);
- enforce_delete_maximum(0, rng);
+ merge_buffer_into_l0(buffer);
+ enforce_delete_maximum(0);
buffer->truncate();
return;
@@ -479,15 +359,15 @@ private:
* routine will recursively perform any necessary merges to make room for the
* specified level.
*/
- inline void merge_down(level_index idx, gsl_rng *rng) {
+ inline void merge_down(level_index idx) {
level_index merge_base_level = find_mergable_level(idx);
if (merge_base_level == -1) {
merge_base_level = grow();
}
for (level_index i=merge_base_level; i>idx; i--) {
- merge_levels(i, i-1, rng);
- enforce_delete_maximum(i, rng);
+ merge_levels(i, i-1);
+ enforce_delete_maximum(i);
}
return;
@@ -509,7 +389,7 @@ private:
level_index merge_level_idx;
size_t incoming_rec_cnt = get_level_record_count(idx, buffer);
- for (level_index i=idx+1; i<=m_last_level_idx; i++) {
+ for (level_index i=idx+1; i<=m_levels.size(); i++) {
if (can_merge_with(i, incoming_rec_cnt)) {
return i;
}
@@ -526,35 +406,35 @@ private:
* are skipped in the merge process--otherwise the tombstone ordering
* invariant may be violated by the merge operation.
*/
- inline void merge_levels(level_index base_level, level_index incoming_level, gsl_rng *rng) {
+ inline void merge_levels(level_index base_level, level_index incoming_level) {
// merging two memory levels
if (LSM_LEVELING) {
auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<R, S>::merge_levels(m_levels[base_level], m_levels[incoming_level], rng);
+ m_levels[base_level] = InternalLevel<R, S, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
mark_as_unused(tmp);
} else {
- m_levels[base_level]->append_merged_shards(m_levels[incoming_level], rng);
+ m_levels[base_level]->append_merged_shards(m_levels[incoming_level]);
}
mark_as_unused(m_levels[incoming_level]);
- m_levels[incoming_level] = new InternalLevel<R, S>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
+ m_levels[incoming_level] = new InternalLevel<R, S, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
}
- inline void merge_buffer_into_l0(MutableBuffer<R> *buffer, gsl_rng *rng) {
+ inline void merge_buffer_into_l0(MutableBuffer<R> *buffer) {
assert(m_levels[0]);
if (LSM_LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0];
- auto temp_level = new InternalLevel<R, S>(0, 1);
- temp_level->append_mem_table(buffer, rng);
- auto new_level = InternalLevel<R, S>::merge_levels(old_level, temp_level, rng);
+ auto temp_level = new InternalLevel<R, S, Q>(0, 1);
+ temp_level->append_buffer(buffer);
+ auto new_level = InternalLevel<R, S, Q>::merge_levels(old_level, temp_level);
m_levels[0] = new_level;
delete temp_level;
mark_as_unused(old_level);
} else {
- m_levels[0]->append_mem_table(buffer, rng);
+ m_levels[0]->append_buffer(buffer);
}
}
@@ -564,7 +444,7 @@ private:
* level may not be able to immediately be deleted, depending upon who
* else is using it.
*/
- inline void mark_as_unused(InternalLevel<R, S> *level) {
+ inline void mark_as_unused(InternalLevel<R, S, Q> *level) {
delete level;
}
@@ -573,31 +453,16 @@ private:
* if the limit is exceeded, forcibly merge levels until all
* levels below idx are below the limit.
*/
- inline void enforce_delete_maximum(level_index idx, gsl_rng *rng) {
+ inline void enforce_delete_maximum(level_index idx) {
long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
if (ts_prop > (long double) m_max_delete_prop) {
- merge_down(idx, rng);
+ merge_down(idx);
}
return;
}
- inline void enforce_rejection_rate_maximum(gsl_rng *rng) {
- if (m_levels.size() == 0) {
- return;
- }
-
- for (size_t i=0; i<m_last_level_idx; i++) {
- if (m_levels[i]) {
- double ratio = m_levels[i]->get_rejection_rate();
- if (ratio > m_max_rejection_rate) {
- merge_down(i, rng);
- }
- }
- }
- }
-
/*
* Assume that level "0" should be larger than the buffer. The buffer
* itself is index -1, which should return simply the buffer capacity.
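Taken as a whole, this file swaps the old append()/delete_record()/range_sample() front end
for insert()/erase()/query() and removes all gsl_rng plumbing from the public interface. The
sketch below shows only the intended shape of the new API; the commit is explicitly
work-in-progress (no QueryInterface implementation ships yet), so ExampleQuery, the weighted
record type, and the constructor arguments here are all stand-ins, and the sketch is not
expected to build against this exact revision.

    #include <cstdint>
    #include <vector>
    #include "framework/DynamicExtension.h"
    #include "shard/WIRS.h"

    using namespace de;

    // Stand-in record satisfying RecordInterface and WeightedRecordInterface.
    struct WeightedRec {
        uint64_t key;
        uint64_t value;
        double   weight;
        bool operator<(const WeightedRec &o) const {
            return key < o.key || (key == o.key && value < o.value);
        }
        bool operator==(const WeightedRec &o) const {
            return key == o.key && value == o.value;
        }
    };

    // ExampleQuery stands in for a type satisfying QueryInterface; see the
    // skeleton after the QueryInterface.h diff below.
    void extension_api_sketch() {
        DynamicExtension<WeightedRec, WIRS<WeightedRec>, ExampleQuery> ext(
            1000 /* buffer_cap */, 8 /* scale_factor */, 0.05 /* max_delete_prop */);

        ext.insert(WeightedRec{1, 100, 1.0});
        ext.erase(WeightedRec{1, 100, 1.0});          // tombstone or tag, per DELETE_TAGGING

        wirs_query_parms<WeightedRec> parms{10, 20};  // lower_bound, upper_bound
        std::vector<WeightedRec> results = ext.query(&parms);
    }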
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index 18b7de3..70da821 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -16,85 +16,52 @@
#include "util/bf_config.h"
#include "framework/ShardInterface.h"
#include "framework/MutableBuffer.h"
+#include "framework/QueryInterface.h"
#include "ds/BloomFilter.h"
namespace de {
-template <RecordInterface R, ShardInterface S>
+template <RecordInterface R, ShardInterface S, QueryInterface Q>
class InternalLevel {
- static const size_t REJECTION_TRIGGER_THRESHOLD = 1024;
-
- typedef decltype(R::key) K;
- typedef decltype(R::value) V;
- //typedef WIRS<R> S;
-
-private:
- struct InternalLevelStructure {
- InternalLevelStructure(size_t cap)
- : m_cap(cap)
- , m_shards(new S*[cap]{nullptr})
- , m_bfs(new BloomFilter*[cap]{nullptr}) {}
-
- ~InternalLevelStructure() {
- for (size_t i = 0; i < m_cap; ++i) {
- if (m_shards[i]) delete m_shards[i];
- if (m_bfs[i]) delete m_bfs[i];
- }
-
- delete[] m_shards;
- delete[] m_bfs;
- }
-
- size_t m_cap;
- S** m_shards;
- BloomFilter** m_bfs;
- };
-
public:
InternalLevel(ssize_t level_no, size_t shard_cap)
- : m_level_no(level_no), m_shard_cnt(0)
- , m_structure(new InternalLevelStructure(shard_cap)) {}
+ : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<S>(shard_cap, nullptr))
+ {}
// Create a new memory level sharing the shards and repurposing it as previous level_no + 1
// WARNING: for leveling only.
InternalLevel(InternalLevel* level)
: m_level_no(level->m_level_no + 1), m_shard_cnt(level->m_shard_cnt)
- , m_structure(level->m_structure) {
- assert(m_structure->m_cap == 1 && m_shard_cnt == 1);
+ , m_shards(level->m_shards) {
+ assert(m_shard_cnt == 1 && m_shards.size() == 1);
}
~InternalLevel() {}
// WARNING: for leveling only.
// assuming the base level is the level new level is merging into. (base_level is larger.)
- static InternalLevel* merge_levels(InternalLevel* base_level, InternalLevel* new_level, const gsl_rng* rng) {
+ static InternalLevel* merge_levels(InternalLevel* base_level, InternalLevel* new_level) {
assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
auto res = new InternalLevel(base_level->m_level_no, 1);
res->m_shard_cnt = 1;
- res->m_structure->m_bfs[0] =
- new BloomFilter(BF_FPR,
- new_level->get_tombstone_count() + base_level->get_tombstone_count(),
- BF_HASH_FUNCS, rng);
S* shards[2];
- shards[0] = base_level->m_structure->m_shards[0];
- shards[1] = new_level->m_structure->m_shards[0];
+ shards[0] = base_level->m_shards[0];
+ shards[1] = new_level->m_shards[0];
- res->m_structure->m_shards[0] = new S(shards, 2, res->m_structure->m_bfs[0]);
+ res->m_shards[0] = new S(shards, 2);
return res;
}
- void append_mem_table(MutableBuffer<R>* buffer, const gsl_rng* rng) {
- assert(m_shard_cnt < m_structure->m_cap);
- m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng);
- m_structure->m_shards[m_shard_cnt] = new S(buffer, m_structure->m_bfs[m_shard_cnt]);
+ void append_buffer(MutableBuffer<R>* buffer) {
+ assert(m_shard_cnt < m_shards.size());
+ m_shards[m_shard_cnt] = new S(buffer);
++m_shard_cnt;
}
- void append_merged_shards(InternalLevel* level, const gsl_rng* rng) {
- assert(m_shard_cnt < m_structure->m_cap);
- m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng);
- m_structure->m_shards[m_shard_cnt] = new S(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt]);
+ void append_merged_shards(InternalLevel* level) {
+ assert(m_shard_cnt < m_shards.size());
+ m_shards[m_shard_cnt] = new S(level->m_shards, level->m_shard_cnt);
++m_shard_cnt;
}
@@ -102,50 +69,36 @@ public:
S *shards[m_shard_cnt];
for (size_t i=0; i<m_shard_cnt; i++) {
- shards[i] = (m_structure->m_shards[i]) ? m_structure->m_shards[i] : nullptr;
+ shards[i] = m_shards[i];
}
- return new S(shards, m_shard_cnt, nullptr);
+ return new S(shards, m_shard_cnt);
}
// Append the sample range in-order.....
- void get_query_states(std::vector<uint64_t>& weights, std::vector<std::pair<ShardID, S *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
+ void get_query_states(std::vector<std::pair<ShardID, S *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_structure->m_shards[i]) {
- auto shard_state = m_structure->m_shards[i]->get_query_state(query_parms);
- if (shard_state->tot_weight > 0) {
- shards.push_back({{m_level_no, (ssize_t) i}, m_structure->m_shards[i]});
- weights.push_back(shard_state->tot_weight);
- shard_states.emplace_back(shard_state);
- } else {
- S::delete_state(shard_state);
- }
+ if (m_shards[i]) {
+ auto shard_state = Q::get_query_state(m_shards[i], query_parms);
+ shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i]});
+ shard_states.emplace_back(shard_state);
}
}
}
- bool bf_rejection_check(size_t shard_stop, const K& key) {
- for (size_t i = 0; i < shard_stop; ++i) {
- if (m_structure->m_bfs[i] && m_structure->m_bfs[i]->lookup(key))
- return true;
- }
- return false;
- }
-
bool check_tombstone(size_t shard_stop, const R& rec) {
if (m_shard_cnt == 0) return false;
for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
- if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(rec.key))
- && m_structure->m_shards[i]->check_tombstone(rec))
+ if (m_shards[i] && m_shards[i]->check_tombstone(rec))
return true;
}
return false;
}
- bool delete_record(const K& key, const V& val) {
- for (size_t i = 0; i < m_structure->m_cap; ++i) {
- if (m_structure->m_shards[i] && m_structure->m_shards[i]->delete_record(key, val)) {
+ bool delete_record(const R &rec) {
+ for (size_t i = 0; i < m_shards.size(); ++i) {
+ if (m_shards[i] && m_shards[i]->delete_record(rec)) {
return true;
}
}
@@ -153,12 +106,8 @@ public:
return false;
}
- const R* get_record_at(size_t shard_no, size_t idx) {
- return m_structure->m_shards[shard_no]->get_record_at(idx);
- }
-
S* get_shard(size_t idx) {
- return m_structure->m_shards[idx];
+ return m_shards[idx];
}
size_t get_shard_count() {
@@ -168,7 +117,7 @@ public:
size_t get_record_cnt() {
size_t cnt = 0;
for (size_t i=0; i<m_shard_cnt; i++) {
- cnt += m_structure->m_shards[i]->get_record_count();
+ cnt += m_shards[i]->get_record_count();
}
return cnt;
@@ -177,27 +126,25 @@ public:
size_t get_tombstone_count() {
size_t res = 0;
for (size_t i = 0; i < m_shard_cnt; ++i) {
- res += m_structure->m_shards[i]->get_tombstone_count();
+ res += m_shards[i]->get_tombstone_count();
}
return res;
}
- size_t get_aux_memory_utilization() {
+ size_t get_aux_memory_usage() {
size_t cnt = 0;
for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_structure->m_bfs[i]) {
- cnt += m_structure->m_bfs[i]->get_memory_utilization();
- }
+ cnt += m_shards[i]->get_aux_memory_usage();
}
return cnt;
}
- size_t get_memory_utilization() {
+ size_t get_memory_usage() {
size_t cnt = 0;
for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_structure->m_shards[i]) {
- cnt += m_structure->m_shards[i]->get_memory_utilization();
+ if (m_shards[i]) {
+ cnt += m_shards[i]->get_memory_usage();
}
}
@@ -208,51 +155,22 @@ public:
size_t tscnt = 0;
size_t reccnt = 0;
for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_structure->m_shards[i]) {
- tscnt += m_structure->m_shards[i]->get_tombstone_count();
- reccnt += m_structure->m_shards[i]->get_record_count();
+ if (m_shards[i]) {
+ tscnt += m_shards[i]->get_tombstone_count();
+ reccnt += m_shards[i]->get_record_count();
}
}
return (double) tscnt / (double) (tscnt + reccnt);
}
- size_t get_rejection_count() {
- size_t rej_cnt = 0;
- for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_structure->m_shards[i]) {
- rej_cnt += m_structure->m_shards[i]->get_rejection_count();
- }
- }
-
- return rej_cnt;
- }
-
- double get_rejection_rate() {
- size_t rej_cnt = 0;
- size_t attempt_cnt = 0;
- for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_structure->m_shards[i]) {
- attempt_cnt += m_structure->m_shards[i]->get_ts_check_count();
- rej_cnt += m_structure->m_shards[i]->get_rejection_count();
- }
- }
-
- if (attempt_cnt == 0) return 0;
-
- // the rejection rate is considered 0 until we exceed an
- // absolute threshold of rejections.
- if (rej_cnt <= REJECTION_TRIGGER_THRESHOLD) return 0;
-
- return (double) rej_cnt / (double) attempt_cnt;
- }
-
private:
ssize_t m_level_no;
size_t m_shard_cnt;
size_t m_shard_size_cap;
- std::shared_ptr<InternalLevelStructure> m_structure;
+
+ std::shared_ptr<std::vector<S*>> m_shards;
};
}
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index 74838b8..4b45f20 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -28,17 +28,17 @@ namespace de {
template <RecordInterface R>
class MutableBuffer {
+ typedef WrappedRecord<R> WRec;
public:
- MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap, const gsl_rng* rng)
+ MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap)
: m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
, m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
- auto len = capacity * sizeof(R);
+ auto len = capacity * sizeof(WRec);
size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));
- m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
+ m_data = (WRec*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
m_tombstone_filter = nullptr;
if (max_tombstone_cap > 0) {
- assert(rng != nullptr);
- m_tombstone_filter = new BloomFilter(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS, rng);
+ m_tombstone_filter = new BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
}
}
@@ -48,18 +48,23 @@ public:
}
template <typename R_ = R>
- int append(const R &rec) {
- if (rec.is_tombstone() && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
+ int append(const R &rec, bool tombstone=false) {
+ if (tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
int32_t pos = 0;
if ((pos = try_advance_tail()) == -1) return 0;
- m_data[pos] = rec;
+ WRec wrec;
+ wrec.rec = rec;
+
+ if (tombstone) wrec.set_tombstone();
+
+ m_data[pos] = wrec;
m_data[pos].header |= (pos << 2);
- if (rec.is_tombstone()) {
+ if (tombstone) {
m_tombstonecnt.fetch_add(1);
- if (m_tombstone_filter) m_tombstone_filter->insert(rec.key);
+ if (m_tombstone_filter) m_tombstone_filter->insert(rec);
}
if constexpr (WeightedRecordInterface<R_>) {
@@ -86,18 +91,6 @@ public:
return true;
}
- R* sorted_output() {
- TIMER_INIT();
- TIMER_START();
- std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp<R>);
- TIMER_STOP();
-
- #ifdef INSTRUMENT_MERGING
- fprintf(stderr, "sort\t%ld\n", TIMER_RESULT());
- #endif
- return m_data;
- }
-
size_t get_record_count() {
return m_reccnt;
}
@@ -117,7 +110,7 @@ public:
bool delete_record(const R& rec) {
auto offset = 0;
while (offset < m_reccnt.load()) {
- if (m_data[offset] == rec) {
+ if (m_data[offset].rec == rec) {
m_data[offset].set_delete();
return true;
}
@@ -128,11 +121,11 @@ public:
}
bool check_tombstone(const R& rec) {
- if (m_tombstone_filter && !m_tombstone_filter->lookup(rec.key)) return false;
+ if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
auto offset = 0;
while (offset < m_reccnt.load()) {
- if (m_data[offset] == rec && m_data[offset].is_tombstone()) {
+ if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) {
return true;
}
offset++;;
@@ -140,73 +133,12 @@ public:
return false;
}
- const R* get_record_at(size_t idx) {
- return m_data + idx;
- }
-
- size_t get_memory_utilization() {
+ size_t get_memory_usage() {
return m_cap * sizeof(R);
}
- size_t get_aux_memory_utilization() {
- return m_tombstone_filter->get_memory_utilization();
- }
- //
- // NOTE: This operation samples from records strictly between the upper and
- // lower bounds, not including them
- template <typename R_ = R>
- double get_sample_range(const decltype(R_::key) lower, const decltype(R_::key)& upper,
- std::vector<R *> &records, Alias **alias, size_t *cutoff) {
- std::vector<double> weights;
-
- *cutoff = std::atomic_load(&m_reccnt) - 1;
- records.clear();
- double tot_weight = 0.0;
- for (size_t i = 0; i < (*cutoff) + 1; i++) {
- R *rec = m_data + i;
-
- if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->is_deleted()) {
- weights.push_back(rec->weight);
- records.push_back(rec);
- tot_weight += rec->weight;
- }
- }
-
- for (size_t i = 0; i < weights.size(); i++) {
- weights[i] = weights[i] / tot_weight;
- }
-
- *alias = new Alias(weights);
-
- return tot_weight;
- }
-
- // rejection sampling
- template <typename R_ = R>
- const R *get_sample(const decltype(R_::key)& lower, const decltype(R_::key)& upper, gsl_rng *rng) {
- size_t reccnt = m_reccnt.load();
- if (reccnt == 0) {
- return nullptr;
- }
-
- auto idx = (reccnt == 1) ? 0 : gsl_rng_uniform_int(rng, reccnt - 1);
- auto rec = get_record_at(idx);
-
- auto test = gsl_rng_uniform(rng) * m_max_weight.load();
-
- if (test > rec->weight) {
- return nullptr;
- }
-
- if (test <= rec->weight &&
- rec->key >= lower &&
- rec->key <= upper &&
- !rec->is_tombstone() && !rec->is_deleted()) {
-
- return rec;
- }
-
- return nullptr;
+ size_t get_aux_memory_usage() {
+ return m_tombstone_filter->get_memory_usage();
}
size_t get_tombstone_capacity() {
@@ -226,15 +158,12 @@ private:
}
size_t m_cap;
- //size_t m_buffersize;
size_t m_tombstone_cap;
- //char* m_data;
R* m_data;
- BloomFilter* m_tombstone_filter;
+ BloomFilter<R>* m_tombstone_filter;
alignas(64) std::atomic<size_t> m_tombstonecnt;
- //alignas(64) std::atomic<uint32_t> m_current_tail;
alignas(64) std::atomic<uint32_t> m_reccnt;
alignas(64) std::atomic<double> m_weight;
alignas(64) std::atomic<double> m_max_weight;
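The buffer now stores WrappedRecord<R> internally and takes tombstone status as an explicit
flag on append(), rather than expecting callers to encode it in the record header. A sketch
of that caller-side contract; it is illustrative only, since the wrapping is not yet threaded
all the way through this revision (note the unchanged R* m_data declaration above), and the
record type here is a stand-in.

    #include <cstdint>
    #include "framework/MutableBuffer.h"

    using namespace de;

    // Stand-in type satisfying RecordInterface.
    struct Rec {
        uint64_t key;
        uint64_t value;
        bool operator<(const Rec &o) const {
            return key < o.key || (key == o.key && value < o.value);
        }
        bool operator==(const Rec &o) const {
            return key == o.key && value == o.value;
        }
    };

    void buffer_sketch() {
        MutableBuffer<Rec> buf(1000 /* capacity */, true /* rej_sampling */, 50 /* tombstone cap */);

        buf.append(Rec{7, 70});                       // ordinary insert
        buf.append(Rec{7, 70}, true);                 // tombstone for the same record
        bool ts = buf.check_tombstone(Rec{7, 70});    // consults the key-templated Bloom filter first
        (void) ts;
    }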
diff --git a/include/framework/QueryInterface.h b/include/framework/QueryInterface.h
new file mode 100644
index 0000000..eafeeb0
--- /dev/null
+++ b/include/framework/QueryInterface.h
@@ -0,0 +1,22 @@
+/*
+ * include/framework/QueryInterface.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <concepts>
+#include "util/types.h"
+
+template <typename Q>
+concept QueryInterface = requires(Q q, void *p) {
+ {q.get_query_state(p, p)} -> std::convertible_to<void*>;
+ {q.get_buffer_query_state(p, p)};
+ {q.query(p, p)};
+ {q.buffer_query(p, p)};
+ {q.merge()};
+ {q.delete_query_state(p)};
+};
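As written, the concept only constrains call shapes on opaque pointers, so a query class can
satisfy it with static members. A hypothetical skeleton that passes the concept check (the
call sites in DynamicExtension.h expect somewhat richer signatures, so a real implementation
would flesh these out):

    #include "framework/QueryInterface.h"

    struct ExampleQuery {
        static void *get_query_state(void *shard, void *parms)         { return nullptr; }
        static void *get_buffer_query_state(void *buffer, void *parms) { return nullptr; }
        static void  query(void *state, void *parms)                   {}
        static void  buffer_query(void *state, void *parms)            {}
        static void  merge()                                           {}
        static void  delete_query_state(void *state)                   {}
    };

    static_assert(QueryInterface<ExampleQuery>);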
diff --git a/include/framework/ShardInterface.h b/include/framework/ShardInterface.h
index 5d07a99..1f48a45 100644
--- a/include/framework/ShardInterface.h
+++ b/include/framework/ShardInterface.h
@@ -1,5 +1,5 @@
/*
- * include/shard/ShardInterface.h
+ * include/framework/ShardInterface.h
*
* Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
*
@@ -14,12 +14,7 @@
template <typename S>
concept ShardInterface = requires(S s, void *p) {
- //s.point_lookup();
- //s.tombstone_lookup();
- //s.delete_record();
-
- {s.get_query_state(p)} -> std::convertible_to<void*>;
- {s.delete_query_state(p)};
+ s.point_lookup();
{s.get_record_count()} -> std::convertible_to<size_t>;
{s.get_tombstone_count()} -> std::convertible_to<size_t>;
diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h
index d1f3bb3..55699be 100644
--- a/include/shard/MemISAM.h
+++ b/include/shard/MemISAM.h
@@ -15,6 +15,7 @@
#include <memory>
#include "framework/MutableBuffer.h"
+#include "util/bf_config.h"
#include "ds/PriorityQueue.h"
#include "util/Cursor.h"
#include "util/timer.h"
@@ -43,37 +44,11 @@ constexpr static size_t inmem_isam_node_keyskip = sizeof(K) * inmem_isam_fanout;
static_assert(sizeof(InMemISAMNode) == inmem_isam_node_size, "node size does not match");
public:
- MemISAM(std::string data_fname, size_t record_cnt, size_t tombstone_cnt, BloomFilter *bf)
- : m_reccnt(record_cnt), m_tombstone_cnt(tombstone_cnt), m_deleted_cnt(0) {
-
- // read the stored data file the file
- size_t alloc_size = (record_cnt * sizeof(R)) + (CACHELINE_SIZE - (record_cnt * sizeof(R)) % CACHELINE_SIZE);
- assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, alloc_size);
-
- FILE *file = fopen(data_fname.c_str(), "rb");
- assert(file);
- auto res = fread(m_data, sizeof(R), m_reccnt, file);
- assert (res == m_reccnt);
- fclose(file);
-
- // We can't really persist the internal structure, as it uses
- // pointers, which are invalidated by the move. So we'll just
- // rebuild it.
- this->build_internal_levels();
-
- // rebuild the bloom filter
- for (size_t i=0; i<m_reccnt; i++) {
- auto rec = this->get_record_at(i);
- if (rec->is_tombstone()) {
- bf->insert(rec->key);
- }
- }
- }
-
- MemISAM(MutableBuffer<R>* buffer, BloomFilter* bf)
+ MemISAM(MutableBuffer<R>* buffer)
:m_reccnt(0), m_tombstone_cnt(0), m_isam_nodes(nullptr), m_deleted_cnt(0) {
+ m_bf = new BloomFilter<K>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS);
+
size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
@@ -104,9 +79,9 @@ public:
//Masking off the ts.
base->header &= 1;
m_data[m_reccnt++] = *base;
- if (bf && base->is_tombstone()) {
+ if (m_bf && base->is_tombstone()) {
++m_tombstone_cnt;
- bf->insert(base->key);
+ m_bf->insert(base->key);
}
base++;
@@ -124,26 +99,30 @@ public:
//fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time);
}
- MemISAM(MemISAM** runs, size_t len, BloomFilter* bf)
- :m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_isam_nodes(nullptr) {
+ MemISAM(MemISAM** runs, size_t len)
+ : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_isam_nodes(nullptr) {
std::vector<Cursor<R>> cursors;
cursors.reserve(len);
PriorityQueue<R> pq(len);
size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
for (size_t i = 0; i < len; ++i) {
if (runs[i]) {
auto base = runs[i]->sorted_output();
cursors.emplace_back(Cursor{base, base + runs[i]->get_record_count(), 0, runs[i]->get_record_count()});
attemp_reccnt += runs[i]->get_record_count();
+ tombstone_count += runs[i]->get_tombstone_count();
pq.push(cursors[i].ptr, i);
} else {
cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0});
}
}
+ m_bf = new BloomFilter<K>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+
size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
@@ -167,7 +146,7 @@ public:
m_data[m_reccnt++] = *cursor.ptr;
if (cursor.ptr->is_tombstone()) {
++m_tombstone_cnt;
- bf->insert(cursor.ptr->key);
+ m_bf->insert(cursor.ptr->key);
}
}
pq.pop();
@@ -184,6 +163,7 @@ public:
~MemISAM() {
if (m_data) free(m_data);
if (m_isam_nodes) free(m_isam_nodes);
+ if (m_bf) delete m_bf;
}
R* sorted_output() const {
@@ -259,7 +239,11 @@ public:
return pos - m_data;
}
- bool check_tombstone(const K& key, const V& val) const {
+ bool check_tombstone(const R& rec) const{
+ if (!m_bf->lookup(rec.key)) {
+ return false;
+ }
+
size_t idx = get_lower_bound(key);
if (idx >= m_reccnt) {
return false;
@@ -271,17 +255,10 @@ public:
return *ptr == R {key, val} && ptr->is_tombstone();
}
- size_t get_memory_utilization() {
+ size_t get_memory_usage() {
return m_reccnt * sizeof(R) + m_internal_node_cnt * inmem_isam_node_size;
}
- void persist_to_file(std::string data_fname) {
- FILE *file = fopen(data_fname.c_str(), "wb");
- assert(file);
- fwrite(m_data, sizeof(R), m_reccnt, file);
- fclose(file);
- }
-
private:
void build_internal_levels() {
size_t n_leaf_nodes = m_reccnt / inmem_isam_leaf_fanout + (m_reccnt % inmem_isam_leaf_fanout != 0);
@@ -349,6 +326,7 @@ private:
// Members: sorted data, internal ISAM levels, reccnt;
R* m_data;
+ BloomFilter<K> *m_bf;
InMemISAMNode* m_isam_nodes;
InMemISAMNode* m_root;
size_t m_reccnt;
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 9f37396..42dbcfd 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -18,6 +18,7 @@
#include "util/Cursor.h"
#include "ds/Alias.h"
#include "ds/BloomFilter.h"
+#include "util/bf_config.h"
#include "util/Record.h"
#include "framework/MutableBuffer.h"
@@ -25,6 +26,11 @@ namespace de {
thread_local size_t wirs_cancelations = 0;
+template <WeightedRecordInterface R>
+struct wirs_query_parms {
+ decltype(R::key) lower_bound;
+ decltype(R::key) upper_bound;
+};
template <WeightedRecordInterface R>
class WIRS {
@@ -54,12 +60,8 @@ private:
};
public:
- struct wirs_query_parms {
- K lower_bound;
- K upper_bound;
- };
- WIRS(MutableBuffer<R>* buffer, BloomFilter* bf)
+ WIRS(MutableBuffer<R>* buffer)
: m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0),
m_ts_check_cnt(0), m_root(nullptr) {
@@ -67,6 +69,8 @@ public:
assert(alloc_size % CACHELINE_SIZE == 0);
m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_bf = new BloomFilter<K>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS);
+
size_t offset = 0;
m_reccnt = 0;
auto base = buffer->sorted_output();
@@ -88,9 +92,9 @@ public:
m_data[m_reccnt++] = *base;
m_total_weight+= base->weight;
- if (bf && base->is_tombstone()) {
+ if (m_bf && base->is_tombstone()) {
m_tombstone_cnt++;
- bf->insert(base->key);
+ m_bf->insert(base->key);
}
base++;
@@ -101,7 +105,7 @@ public:
}
}
- WIRS(WIRS** shards, size_t len, BloomFilter* bf)
+ WIRS(WIRS** shards, size_t len)
: m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0),
m_root(nullptr) {
std::vector<Cursor<R>> cursors;
@@ -110,18 +114,22 @@ public:
PriorityQueue<R> pq(len);
size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
for (size_t i = 0; i < len; ++i) {
if (shards[i]) {
auto base = shards[i]->sorted_output();
cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
attemp_reccnt += shards[i]->get_record_count();
+ tombstone_count += shards[i]->get_tombstone_count();
pq.push(cursors[i].ptr, i);
} else {
cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0});
}
}
+ m_bf = new BloomFilter<K>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+
size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
@@ -142,9 +150,9 @@ public:
if (!cursor.ptr->is_deleted()) {
m_data[m_reccnt++] = *cursor.ptr;
m_total_weight += cursor.ptr->weight;
- if (bf && cursor.ptr->is_tombstone()) {
+ if (m_bf && cursor.ptr->is_tombstone()) {
++m_tombstone_cnt;
- if (bf) bf->insert(cursor.ptr->key);
+ if (m_bf) m_bf->insert(cursor.ptr->key);
}
}
pq.pop();
@@ -164,6 +172,8 @@ public:
if (m_alias[i]) delete m_alias[i];
}
+ if (m_bf) delete m_bf;
+
free_tree(m_root);
}
@@ -202,12 +212,13 @@ public:
return m_data + idx;
}
+ /*
// low - high -> decompose to a set of nodes.
// Build Alias across the decomposed nodes.
WIRSState<R>* get_query_state(void *query_parameters) {
auto res = new WIRSState();
- K lower_key = ((wirs_query_parms *) query_parameters)->lower_bound;
- K upper_key = ((wirs_query_parms *) query_parameters)->upper_bound;
+ K lower_key = ((wirs_query_parms<R> *) query_parameters)->lower_bound;
+ K upper_key = ((wirs_query_parms<R> *) query_parameters)->upper_bound;
// Simulate a stack to unfold recursion.
double tot_weight = 0.0;
@@ -240,40 +251,7 @@ public:
WIRSState<R> *s = (WIRSState<R> *) state;
delete s;
}
-
- // returns the number of records sampled
- // NOTE: This operation returns records strictly between the lower and upper bounds, not
- // including them.
- size_t get_samples(void* shard_state, std::vector<R> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
- WIRSState<R> *state = (WIRSState<R> *) shard_state;
- if (sample_sz == 0) {
- return 0;
- }
-
- // k -> sampling: three levels. 1. select a node -> select a fat point -> select a record.
- size_t cnt = 0;
- size_t attempts = 0;
- do {
- ++attempts;
- // first level....
- auto node = state->nodes[state->top_level_alias->get(rng)];
- // second level...
- auto fat_point = node->low + node->alias->get(rng);
- // third level...
- size_t rec_offset = fat_point * m_group_size + m_alias[fat_point]->get(rng);
- auto record = m_data + rec_offset;
-
- // bounds rejection
- if (lower_key > record->key || upper_key < record->key) {
- continue;
- }
-
- result_set.emplace_back(*record);
- cnt++;
- } while (attempts < sample_sz);
-
- return cnt;
- }
+ */
size_t get_lower_bound(const K& key) const {
size_t min = 0;
@@ -294,6 +272,10 @@ public:
}
bool check_tombstone(const R& rec) {
+ if(!m_bf.lookup(rec.key)) {
+ return false;
+ }
+
m_ts_check_cnt++;
size_t idx = get_lower_bound(rec.key);
if (idx >= m_reccnt) {
@@ -414,6 +396,7 @@ private:
size_t m_group_size;
size_t m_ts_check_cnt;
size_t m_deleted_cnt;
+ BloomFilter<K> m_bf;
// The number of rejections caused by tombstones
// in this WIRS.
diff --git a/include/util/Record.h b/include/util/Record.h
index ce101f4..fc543ed 100644
--- a/include/util/Record.h
+++ b/include/util/Record.h
@@ -20,27 +20,15 @@ template<typename R>
concept RecordInterface = requires(R r, R s) {
r.key;
r.value;
- r.header;
- {r.is_tombstone()} -> std::convertible_to<bool>;
- {r.is_deleted()} -> std::convertible_to<bool>;
- r.set_delete();
- r.set_tombstone(std::declval<bool>);
{ r < s } ->std::convertible_to<bool>;
{ r == s } ->std::convertible_to<bool>;
- { r.header < s.header } -> std::convertible_to<bool>;
};
-template <typename R>
-concept WeightedRecordInterface = RecordInterface<R> && requires(R r) {
- {r.weight} -> std::convertible_to<double>;
-};
-
-template <typename K, typename V>
-struct Record {
- K key;
- V value;
- uint32_t header = 0;
+template<RecordInterface R>
+struct WrappedRecord {
+ R rec;
+ uint32_t header;
inline void set_delete() {
header |= 2;
@@ -61,8 +49,20 @@ struct Record {
inline bool is_tombstone() const {
return header & 1;
}
+};
+
+template <typename R>
+concept WeightedRecordInterface = RecordInterface<R> && requires(R r) {
+ {r.weight} -> std::convertible_to<double>;
+};
+
+template <typename K, typename V>
+struct Record {
+ K key;
+ V value;
+ uint32_t header = 0;
- inline bool operator<(const Record& other) const {
+ inline bool operator<(const Record& other) const {
return key < other.key || (key == other.key && value < other.value);
}