summary | refs | log | tree | commit | diff | stats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--  include/framework/DynamicExtension.h  79
1 file changed, 40 insertions, 39 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index c65290a..1d9ee76 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -68,11 +68,12 @@ enum class DeletePolicy {
typedef ssize_t level_index;
-template <typename K, typename V, typename W=void>
+template <RecordInterface R>
class DynamicExtension {
- typedef WIRS<K,V,W> Shard;
- typedef MutableBuffer<K,V,W> MBuffer;
- typedef Record<K, V, W> Rec;
+ typedef WIRS<R> Shard;
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+ typedef decltype(R::weight) W;
public:
DynamicExtension(size_t buffer_cap, size_t buffer_delete_cap, size_t scale_factor,
@@ -82,8 +83,8 @@ public:
m_max_delete_prop(max_delete_prop),
m_max_rejection_rate(max_rejection_prop),
m_last_level_idx(-1),
- m_buffer_1(new MutableBuffer<K,V,W>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
- m_buffer_2(new MutableBuffer<K,V,W>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
+ m_buffer_1(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
+ m_buffer_2(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
m_buffer_1_merging(false), m_buffer_2_merging(false) {}
~DynamicExtension() {
@@ -113,9 +114,9 @@ public:
return buffer->delete_record(key, val);
}
- int append(const K& key, const V& val, W weight, bool tombstone, gsl_rng *rng) {
+ int append(R &rec, gsl_rng *rng) {
// NOTE: single-threaded implementation only
- MutableBuffer<K,V,W> *buffer;
+ MutableBuffer<R> *buffer;
while (!(buffer = get_buffer()))
;
@@ -123,13 +124,13 @@ public:
merge_buffer(rng);
}
- return buffer->append(key, val, weight, tombstone);
+ return buffer->append(rec);
}
- void range_sample(Record<K,V,W> *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
+ void range_sample(R *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
auto buffer = get_buffer();
Alias *buffer_alias = nullptr;
- std::vector<Record<K,V,W> *> buffer_records;
+ std::vector<R *> buffer_records;
size_t buffer_cutoff = 0;
W buffer_weight;
@@ -185,7 +186,7 @@ public:
rejections = 0;
while (shard_samples[0] > 0) {
- const Record<K,V,W> *rec;
+ const R *rec;
if (LSM_REJ_SAMPLE) {
rec = buffer->get_sample(lower_key, upper_key, rng);
} else {
@@ -193,13 +194,13 @@ public:
}
if (DELETE_TAGGING) {
- if (rec && !rec->get_delete_status()) {
+ if (rec && !rec->is_deleted()) {
sample_set[sample_idx++] = *rec;
} else {
rejections++;
}
} else {
- if (rec && !buffer->check_tombstone(rec->key, rec->value)) {
+ if (rec && !buffer->check_tombstone(*rec)) {
sample_set[sample_idx++] = *rec;
} else {
rejections++;
@@ -220,14 +221,14 @@ public:
}
}
- std::vector<Rec> results;
+ std::vector<R> results;
for (size_t i=1; i<shard_samples.size(); i++) {
results.reserve(shard_samples[i]);
shards[i].second->get_samples(states[i], results, lower_key, upper_key, shard_samples[i], rng);
for (size_t j=0; j<results.size(); j++) {
- if (rejection(&results[j], shards[i].first, lower_key, upper_key, buffer, buffer_cutoff)) {
+ if (rejection(results[j], shards[i].first, lower_key, upper_key, buffer, buffer_cutoff)) {
rejections++;
continue;
}
@@ -252,16 +253,16 @@ public:
// should correspond to the shard containing the record in question
//
// Passing INVALID_SHID indicates that the record exists within the buffer
- bool is_deleted(const Record<K,V,W> *record, const ShardID &shid, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) {
+ bool is_deleted(const R &record, const ShardID &shid, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
// If tagging is enabled, we just need to check if the record has the delete tag set
if (DELETE_TAGGING) {
- return record->get_delete_status();
+ return record.is_deleted();
}
// Otherwise, we need to look for a tombstone.
// check for tombstone in the buffer. This will require accounting for the cutoff eventually.
- if (buffer->check_tombstone(record->key, record->value)) {
+ if (buffer->check_tombstone(record)) {
return true;
}
@@ -271,13 +272,13 @@ public:
}
for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
- if (m_levels[lvl]->check_tombstone(0, record->key, record->value)) {
+ if (m_levels[lvl]->check_tombstone(0, record)) {
return true;
}
}
// check the level containing the shard
- return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record->key, record->value);
+ return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record);
}
@@ -379,8 +380,8 @@ public:
}
private:
- MutableBuffer<K,V,W> *m_buffer_1;
- MutableBuffer<K,V,W> *m_buffer_2;
+ MutableBuffer<R> *m_buffer_1;
+ MutableBuffer<R> *m_buffer_2;
std::atomic<bool> m_active_buffer;
std::atomic<bool> m_buffer_1_merging;
std::atomic<bool> m_buffer_2_merging;
@@ -389,11 +390,11 @@ private:
double m_max_delete_prop;
double m_max_rejection_rate;
- std::vector<InternalLevel<K,V,W> *> m_levels;
+ std::vector<InternalLevel<R> *> m_levels;
level_index m_last_level_idx;
- MutableBuffer<K,V,W> *get_buffer() {
+ MutableBuffer<R> *get_buffer() {
if (m_buffer_1_merging && m_buffer_2_merging) {
return nullptr;
}
@@ -401,11 +402,11 @@ private:
return (m_active_buffer) ? m_buffer_2 : m_buffer_1;
}
- inline bool rejection(const Record<K,V,W> *record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) {
- if (record->is_tombstone()) {
+ inline bool rejection(const R &record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
+ if (record.is_tombstone()) {
tombstone_rejections++;
return true;
- } else if (record->key < lower_bound || record->key > upper_bound) {
+ } else if (record.key < lower_bound || record.key > upper_bound) {
bounds_rejections++;
return true;
} else if (is_deleted(record, shid, buffer, buffer_cutoff)) {
@@ -416,8 +417,8 @@ private:
return false;
}
- inline bool add_to_sample(const Record<K,V,W> *record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer,
- Record<K,V,W> *sample_buffer, size_t &sample_idx, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) {
+ inline bool add_to_sample(const R &record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer,
+ R *sample_buffer, size_t &sample_idx, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
TIMER_INIT();
TIMER_START();
sampling_attempts++;
@@ -445,7 +446,7 @@ private:
if (new_idx > 0) {
assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
}
- m_levels.emplace_back(new InternalLevel<K,V,W>(new_idx, new_shard_cnt, DELETE_TAGGING));
+ m_levels.emplace_back(new InternalLevel<R>(new_idx, new_shard_cnt, DELETE_TAGGING));
m_last_level_idx++;
return new_idx;
@@ -495,7 +496,7 @@ private:
* returns -1 if idx==0, and no such level exists, to simplify
* the logic of the first merge.
*/
- inline level_index find_mergable_level(level_index idx, MutableBuffer<K,V,W> *buffer=nullptr) {
+ inline level_index find_mergable_level(level_index idx, MutableBuffer<R> *buffer=nullptr) {
if (idx == 0 && m_levels.size() == 0) return -1;
@@ -525,7 +526,7 @@ private:
// merging two memory levels
if (LSM_LEVELING) {
auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<K,V,W>::merge_levels(m_levels[base_level], m_levels[incoming_level],
+ m_levels[base_level] = InternalLevel<R>::merge_levels(m_levels[base_level], m_levels[incoming_level],
DELETE_TAGGING, rng);
mark_as_unused(tmp);
} else {
@@ -533,18 +534,18 @@ private:
}
mark_as_unused(m_levels[incoming_level]);
- m_levels[incoming_level] = new InternalLevel<K,V,W>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING);
+ m_levels[incoming_level] = new InternalLevel<R>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING);
}
- inline void merge_buffer_into_l0(MutableBuffer<K,V,W> *buffer, gsl_rng *rng) {
+ inline void merge_buffer_into_l0(MutableBuffer<R> *buffer, gsl_rng *rng) {
assert(m_levels[0]);
if (LSM_LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0];
- auto temp_level = new InternalLevel<K,V,W>(0, 1, DELETE_TAGGING);
+ auto temp_level = new InternalLevel<R>(0, 1, DELETE_TAGGING);
temp_level->append_mem_table(buffer, rng);
- auto new_level = InternalLevel<K,V,W>::merge_levels(old_level, temp_level, DELETE_TAGGING, rng);
+ auto new_level = InternalLevel<R>::merge_levels(old_level, temp_level, DELETE_TAGGING, rng);
m_levels[0] = new_level;
delete temp_level;
@@ -560,7 +561,7 @@ private:
* level may not be able to immediately be deleted, depending upon who
* else is using it.
*/
- inline void mark_as_unused(InternalLevel<K,V,W> *level) {
+ inline void mark_as_unused(InternalLevel<R> *level) {
delete level;
}
@@ -608,7 +609,7 @@ private:
* a pointer to the memory table to use, if desired. Otherwise, there are
* no guarantees about which buffer will be accessed if level_index is -1.
*/
- inline size_t get_level_record_count(level_index idx, MutableBuffer<K,V,W> *buffer=nullptr) {
+ inline size_t get_level_record_count(level_index idx, MutableBuffer<R> *buffer=nullptr) {
assert(idx >= -1);
if (idx == -1) {
return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count();