author     Douglas Rumbaugh <dbr4@psu.edu>    2023-05-15 16:48:56 -0400
committer  Douglas Rumbaugh <dbr4@psu.edu>    2023-05-15 16:48:56 -0400
commit     ff000799c3254f52e0beabbe9c62d10c3fc4178e (patch)
tree       49a1a045678315e8e215fd80409973679b793043
parent     418e9b079e559c86f3a5b276f712ad2f5d66533c (diff)
download   dynamic-extension-ff000799c3254f52e0beabbe9c62d10c3fc4178e.tar.gz
Record format generalization
Currently, tombstone counting is bugged, but the rest of it appears to be working.
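
The upshot of the change: the framework's templates no longer take separate key/value/weight parameters. MutableBuffer, InternalLevel, PriorityQueue, Cursor, the shards, and DynamicExtension itself are now parameterized on a single record type constrained by the RecordInterface and WeightedRecordInterface concepts added to include/util/Record.h. A minimal sketch of conforming record types, based on the concept requirements in the diff below; the Rec and WRec aliases used by the tests are assumed to be typedefs along these lines in tests/testing.h (that file's diff is not shown on this page):

    #include <cstdint>
    #include "util/Record.h"

    // Assumed test aliases (tests/testing.h is not reproduced here):
    typedef de::Record<uint64_t, uint32_t> Rec;                     // key/value record
    typedef de::WeightedRecord<uint64_t, uint32_t, uint64_t> WRec;  // adds a weight field

    // Both provided types satisfy the new concepts:
    static_assert(de::RecordInterface<Rec>);
    static_assert(de::WeightedRecordInterface<WRec>);

    // Aggregate initialization zeroes the header (not a tombstone, not
    // deleted); WeightedRecord's weight defaults to 1 when omitted.
    WRec r = {1, 2, 8};
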
-rw-r--r--   CMakeLists.txt                          4
-rw-r--r--   include/ds/PriorityQueue.h             14
-rw-r--r--   include/framework/DynamicExtension.h   79
-rw-r--r--   include/framework/InternalLevel.h      39
-rw-r--r--   include/framework/MutableBuffer.h     109
-rw-r--r--   include/shard/MemISAM.h                72
-rw-r--r--   include/shard/WIRS.h                  108
-rw-r--r--   include/util/Cursor.h                  26
-rw-r--r--   include/util/Record.h                  86
-rw-r--r--   tests/dynamic_extension_tests.cpp      79
-rw-r--r--   tests/internal_level_tests.cpp         18
-rw-r--r--   tests/memisam_tests.cpp                41
-rw-r--r--   tests/mutable_buffer_tests.cpp        107
-rw-r--r--   tests/testing.h                        99
-rw-r--r--   tests/wirs_tests.cpp                   44

15 files changed, 502 insertions(+), 423 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3f551a3..7be4085 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -48,12 +48,10 @@ if (tests)
add_executable(dynamic_extension_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/dynamic_extension_tests.cpp)
target_link_libraries(dynamic_extension_tests PUBLIC gsl check subunit pthread)
target_include_directories(dynamic_extension_tests PRIVATE include)
-
+
add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp)
target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread)
target_include_directories(memisam_tests PRIVATE include)
-
-
endif()
# Benchmark build instructions
diff --git a/include/ds/PriorityQueue.h b/include/ds/PriorityQueue.h
index 290d5c8..22582da 100644
--- a/include/ds/PriorityQueue.h
+++ b/include/ds/PriorityQueue.h
@@ -16,14 +16,14 @@
namespace de {
-template <typename K, typename V, typename W=void>
+template <typename R>
struct queue_record {
- const Record<K, V, W> *data;
+ const R *data;
size_t version;
};
-template <typename K, typename V, typename W=void>
+template <typename R>
class PriorityQueue {
public:
PriorityQueue(size_t size) : data(size), tail(0) {}
@@ -54,7 +54,7 @@ public:
}
}
- void push(const Record<K, V, W>* record, size_t version=0) {
+ void push(const R* record, size_t version=0) {
assert(tail != this->data.size());
size_t new_idx = this->tail++;
@@ -67,7 +67,7 @@ public:
}
- queue_record<K, V, W> peek(size_t depth=0) {
+ queue_record<R> peek(size_t depth=0) {
ssize_t idx = 0;
size_t cur_depth = 0;
@@ -81,7 +81,7 @@ public:
}
private:
- std::vector<queue_record<K, V, W>> data;
+ std::vector<queue_record<R>> data;
size_t tail;
/*
@@ -124,7 +124,7 @@ private:
}
inline bool heap_cmp(size_t a, size_t b) {
- if (!data[a].data->match(data[b].data)) {
+ if (*data[a].data != *data[b].data) {
return *(data[a].data) < *(data[b].data);
} else if (data[a].version != data[b].version)
return data[a].version < data[b].version;
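
A note on heap_cmp above: the merge paths in MemISAM and WIRS peek the top two queue entries and cancel a record against its tombstone when the two compare equal by value, so entries must order by record value first and by version (source shard) second. A minimal sketch of that pairing, assuming the Record type defined in util/Record.h later in this diff:

    #include "ds/PriorityQueue.h"
    #include "util/Record.h"

    void cancellation_sketch() {
        de::Record<uint64_t, uint32_t> rec = {5, 10};  // plain record
        de::Record<uint64_t, uint32_t> ts  = {5, 10};  // tombstone, same key/value
        ts.set_tombstone();

        de::PriorityQueue<de::Record<uint64_t, uint32_t>> pq(2);
        pq.push(&rec, 0);  // version 0: older shard
        pq.push(&ts, 1);   // version 1: newer shard

        // Equal by value, so the version tiebreaker makes the pair adjacent;
        // a merge pops both and cancels them instead of copying either out.
        auto now  = pq.peek();
        auto next = pq.peek(1);
    }
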
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index c65290a..1d9ee76 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -68,11 +68,12 @@ enum class DeletePolicy {
typedef ssize_t level_index;
-template <typename K, typename V, typename W=void>
+template <RecordInterface R>
class DynamicExtension {
- typedef WIRS<K,V,W> Shard;
- typedef MutableBuffer<K,V,W> MBuffer;
- typedef Record<K, V, W> Rec;
+ typedef WIRS<R> Shard;
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+ typedef decltype(R::weight) W;
public:
DynamicExtension(size_t buffer_cap, size_t buffer_delete_cap, size_t scale_factor,
@@ -82,8 +83,8 @@ public:
m_max_delete_prop(max_delete_prop),
m_max_rejection_rate(max_rejection_prop),
m_last_level_idx(-1),
- m_buffer_1(new MutableBuffer<K,V,W>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
- m_buffer_2(new MutableBuffer<K,V,W>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
+ m_buffer_1(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
+ m_buffer_2(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
m_buffer_1_merging(false), m_buffer_2_merging(false) {}
~DynamicExtension() {
@@ -113,9 +114,9 @@ public:
return buffer->delete_record(key, val);
}
- int append(const K& key, const V& val, W weight, bool tombstone, gsl_rng *rng) {
+ int append(R &rec, gsl_rng *rng) {
// NOTE: single-threaded implementation only
- MutableBuffer<K,V,W> *buffer;
+ MutableBuffer<R> *buffer;
while (!(buffer = get_buffer()))
;
@@ -123,13 +124,13 @@ public:
merge_buffer(rng);
}
- return buffer->append(key, val, weight, tombstone);
+ return buffer->append(rec);
}
- void range_sample(Record<K,V,W> *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
+ void range_sample(R *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
auto buffer = get_buffer();
Alias *buffer_alias = nullptr;
- std::vector<Record<K,V,W> *> buffer_records;
+ std::vector<R *> buffer_records;
size_t buffer_cutoff = 0;
W buffer_weight;
@@ -185,7 +186,7 @@ public:
rejections = 0;
while (shard_samples[0] > 0) {
- const Record<K,V,W> *rec;
+ const R *rec;
if (LSM_REJ_SAMPLE) {
rec = buffer->get_sample(lower_key, upper_key, rng);
} else {
@@ -193,13 +194,13 @@ public:
}
if (DELETE_TAGGING) {
- if (rec && !rec->get_delete_status()) {
+ if (rec && !rec->is_deleted()) {
sample_set[sample_idx++] = *rec;
} else {
rejections++;
}
} else {
- if (rec && !buffer->check_tombstone(rec->key, rec->value)) {
+ if (rec && !buffer->check_tombstone(*rec)) {
sample_set[sample_idx++] = *rec;
} else {
rejections++;
@@ -220,14 +221,14 @@ public:
}
}
- std::vector<Rec> results;
+ std::vector<R> results;
for (size_t i=1; i<shard_samples.size(); i++) {
results.reserve(shard_samples[i]);
shards[i].second->get_samples(states[i], results, lower_key, upper_key, shard_samples[i], rng);
for (size_t j=0; j<results.size(); j++) {
- if (rejection(&results[j], shards[i].first, lower_key, upper_key, buffer, buffer_cutoff)) {
+ if (rejection(results[j], shards[i].first, lower_key, upper_key, buffer, buffer_cutoff)) {
rejections++;
continue;
}
@@ -252,16 +253,16 @@ public:
// should correspond to the shard containing the record in question
//
// Passing INVALID_SHID indicates that the record exists within the buffer
- bool is_deleted(const Record<K,V,W> *record, const ShardID &shid, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) {
+ bool is_deleted(const R &record, const ShardID &shid, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
// If tagging is enabled, we just need to check if the record has the delete tag set
if (DELETE_TAGGING) {
- return record->get_delete_status();
+ return record.is_deleted();
}
// Otherwise, we need to look for a tombstone.
// check for tombstone in the buffer. This will require accounting for the cutoff eventually.
- if (buffer->check_tombstone(record->key, record->value)) {
+ if (buffer->check_tombstone(record)) {
return true;
}
@@ -271,13 +272,13 @@ public:
}
for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
- if (m_levels[lvl]->check_tombstone(0, record->key, record->value)) {
+ if (m_levels[lvl]->check_tombstone(0, record)) {
return true;
}
}
// check the level containing the shard
- return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record->key, record->value);
+ return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record);
}
@@ -379,8 +380,8 @@ public:
}
private:
- MutableBuffer<K,V,W> *m_buffer_1;
- MutableBuffer<K,V,W> *m_buffer_2;
+ MutableBuffer<R> *m_buffer_1;
+ MutableBuffer<R> *m_buffer_2;
std::atomic<bool> m_active_buffer;
std::atomic<bool> m_buffer_1_merging;
std::atomic<bool> m_buffer_2_merging;
@@ -389,11 +390,11 @@ private:
double m_max_delete_prop;
double m_max_rejection_rate;
- std::vector<InternalLevel<K,V,W> *> m_levels;
+ std::vector<InternalLevel<R> *> m_levels;
level_index m_last_level_idx;
- MutableBuffer<K,V,W> *get_buffer() {
+ MutableBuffer<R> *get_buffer() {
if (m_buffer_1_merging && m_buffer_2_merging) {
return nullptr;
}
@@ -401,11 +402,11 @@ private:
return (m_active_buffer) ? m_buffer_2 : m_buffer_1;
}
- inline bool rejection(const Record<K,V,W> *record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) {
- if (record->is_tombstone()) {
+ inline bool rejection(const R &record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
+ if (record.is_tombstone()) {
tombstone_rejections++;
return true;
- } else if (record->key < lower_bound || record->key > upper_bound) {
+ } else if (record.key < lower_bound || record.key > upper_bound) {
bounds_rejections++;
return true;
} else if (is_deleted(record, shid, buffer, buffer_cutoff)) {
@@ -416,8 +417,8 @@ private:
return false;
}
- inline bool add_to_sample(const Record<K,V,W> *record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer,
- Record<K,V,W> *sample_buffer, size_t &sample_idx, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) {
+ inline bool add_to_sample(const R &record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer,
+ R *sample_buffer, size_t &sample_idx, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
TIMER_INIT();
TIMER_START();
sampling_attempts++;
@@ -445,7 +446,7 @@ private:
if (new_idx > 0) {
assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
}
- m_levels.emplace_back(new InternalLevel<K,V,W>(new_idx, new_shard_cnt, DELETE_TAGGING));
+ m_levels.emplace_back(new InternalLevel<R>(new_idx, new_shard_cnt, DELETE_TAGGING));
m_last_level_idx++;
return new_idx;
@@ -495,7 +496,7 @@ private:
* returns -1 if idx==0, and no such level exists, to simplify
* the logic of the first merge.
*/
- inline level_index find_mergable_level(level_index idx, MutableBuffer<K,V,W> *buffer=nullptr) {
+ inline level_index find_mergable_level(level_index idx, MutableBuffer<R> *buffer=nullptr) {
if (idx == 0 && m_levels.size() == 0) return -1;
@@ -525,7 +526,7 @@ private:
// merging two memory levels
if (LSM_LEVELING) {
auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<K,V,W>::merge_levels(m_levels[base_level], m_levels[incoming_level],
+ m_levels[base_level] = InternalLevel<R>::merge_levels(m_levels[base_level], m_levels[incoming_level],
DELETE_TAGGING, rng);
mark_as_unused(tmp);
} else {
@@ -533,18 +534,18 @@ private:
}
mark_as_unused(m_levels[incoming_level]);
- m_levels[incoming_level] = new InternalLevel<K,V,W>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING);
+ m_levels[incoming_level] = new InternalLevel<R>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING);
}
- inline void merge_buffer_into_l0(MutableBuffer<K,V,W> *buffer, gsl_rng *rng) {
+ inline void merge_buffer_into_l0(MutableBuffer<R> *buffer, gsl_rng *rng) {
assert(m_levels[0]);
if (LSM_LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0];
- auto temp_level = new InternalLevel<K,V,W>(0, 1, DELETE_TAGGING);
+ auto temp_level = new InternalLevel<R>(0, 1, DELETE_TAGGING);
temp_level->append_mem_table(buffer, rng);
- auto new_level = InternalLevel<K,V,W>::merge_levels(old_level, temp_level, DELETE_TAGGING, rng);
+ auto new_level = InternalLevel<R>::merge_levels(old_level, temp_level, DELETE_TAGGING, rng);
m_levels[0] = new_level;
delete temp_level;
@@ -560,7 +561,7 @@ private:
* level may not be able to immediately be deleted, depending upon who
* else is using it.
*/
- inline void mark_as_unused(InternalLevel<K,V,W> *level) {
+ inline void mark_as_unused(InternalLevel<R> *level) {
delete level;
}
@@ -608,7 +609,7 @@ private:
* a pointer to the memory table to use, if desired. Otherwise, there are
* no guarantees about which buffer will be accessed if level_index is -1.
*/
- inline size_t get_level_record_count(level_index idx, MutableBuffer<K,V,W> *buffer=nullptr) {
+ inline size_t get_level_record_count(level_index idx, MutableBuffer<R> *buffer=nullptr) {
assert(idx >= -1);
if (idx == -1) {
return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count();
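
With the record type folded into the template, the user-facing calls shrink accordingly: append() takes a record rather than a (key, value, weight, tombstone) tuple, and range_sample() fills an array of records. A usage sketch mirroring the tests further down, assuming the WRec alias from tests/testing.h:

    #include <gsl/gsl_rng.h>
    #include "framework/DynamicExtension.h"

    typedef de::WeightedRecord<uint64_t, uint32_t, uint64_t> WRec;  // assumed test alias

    void usage_sketch(gsl_rng *rng) {
        // (buffer_cap, buffer_delete_cap, scale_factor,
        //  max_delete_prop, max_rejection_prop, rng)
        auto ext = new de::DynamicExtension<WRec>(100, 100, 2, 1, 1, rng);

        WRec rec = {10, 20, 4};   // key, value, weight
        ext->append(rec, rng);

        WRec tomb = {10, 20, 4};  // delete the same record via a tombstone
        tomb.set_tombstone();
        ext->append(tomb, rng);

        WRec samples[100];
        ext->range_sample(samples, 0, 50, 100, rng);
        delete ext;
    }
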
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index 727a382..f0f19da 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -19,16 +19,19 @@
namespace de {
-template <typename K, typename V, typename W=void>
+template <RecordInterface R>
class InternalLevel {
static const size_t REJECTION_TRIGGER_THRESHOLD = 1024;
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+
private:
struct InternalLevelStructure {
InternalLevelStructure(size_t cap)
: m_cap(cap)
- , m_shards(new WIRS<K, V, W>*[cap]{nullptr})
+ , m_shards(new WIRS<R>*[cap]{nullptr})
, m_bfs(new BloomFilter*[cap]{nullptr}) {}
~InternalLevelStructure() {
@@ -42,7 +45,7 @@ private:
}
size_t m_cap;
- WIRS<K, V, W>** m_shards;
+ WIRS<R>** m_shards;
BloomFilter** m_bfs;
};
@@ -74,40 +77,40 @@ public:
new BloomFilter(BF_FPR,
new_level->get_tombstone_count() + base_level->get_tombstone_count(),
BF_HASH_FUNCS, rng);
- WIRS<K, V, W>* shards[2];
+ WIRS<R>* shards[2];
shards[0] = base_level->m_structure->m_shards[0];
shards[1] = new_level->m_structure->m_shards[0];
- res->m_structure->m_shards[0] = new WIRS<K, V, W>(shards, 2, res->m_structure->m_bfs[0], tagging);
+ res->m_structure->m_shards[0] = new WIRS<R>(shards, 2, res->m_structure->m_bfs[0], tagging);
return res;
}
- void append_mem_table(MutableBuffer<K,V,W>* buffer, const gsl_rng* rng) {
+ void append_mem_table(MutableBuffer<R>* buffer, const gsl_rng* rng) {
assert(m_shard_cnt < m_structure->m_cap);
m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, rng);
- m_structure->m_shards[m_shard_cnt] = new WIRS<K, V, W>(buffer, m_structure->m_bfs[m_shard_cnt], m_tagging);
+ m_structure->m_shards[m_shard_cnt] = new WIRS<R>(buffer, m_structure->m_bfs[m_shard_cnt], m_tagging);
++m_shard_cnt;
}
void append_merged_shards(InternalLevel* level, const gsl_rng* rng) {
assert(m_shard_cnt < m_structure->m_cap);
m_structure->m_bfs[m_shard_cnt] = new BloomFilter(BF_FPR, level->get_tombstone_count(), BF_HASH_FUNCS, rng);
- m_structure->m_shards[m_shard_cnt] = new WIRS<K, V, W>(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt], m_tagging);
+ m_structure->m_shards[m_shard_cnt] = new WIRS<R>(level->m_structure->m_shards, level->m_shard_cnt, m_structure->m_bfs[m_shard_cnt], m_tagging);
++m_shard_cnt;
}
- WIRS<K, V, W> *get_merged_shard() {
- WIRS<K, V, W> *shards[m_shard_cnt];
+ WIRS<R> *get_merged_shard() {
+ WIRS<R> *shards[m_shard_cnt];
for (size_t i=0; i<m_shard_cnt; i++) {
shards[i] = (m_structure->m_shards[i]) ? m_structure->m_shards[i] : nullptr;
}
- return new WIRS<K, V, W>(shards, m_shard_cnt, nullptr, m_tagging);
+ return new WIRS<R>(shards, m_shard_cnt, nullptr, m_tagging);
}
// Append the sample range in-order.....
- void get_shard_weights(std::vector<W>& weights, std::vector<std::pair<ShardID, WIRS<K, V, W> *>> &shards, std::vector<void*>& shard_states, const K& low, const K& high) {
+ void get_shard_weights(std::vector<uint64_t>& weights, std::vector<std::pair<ShardID, WIRS<R> *>> &shards, std::vector<void*>& shard_states, const K& low, const K& high) {
for (size_t i=0; i<m_shard_cnt; i++) {
if (m_structure->m_shards[i]) {
auto shard_state = m_structure->m_shards[i]->get_sample_shard_state(low, high);
@@ -116,7 +119,7 @@ public:
weights.push_back(shard_state->tot_weight);
shard_states.emplace_back(shard_state);
} else {
- WIRS<K, V, W>::delete_state(shard_state);
+ WIRS<R>::delete_state(shard_state);
}
}
}
@@ -130,12 +133,12 @@ public:
return false;
}
- bool check_tombstone(size_t shard_stop, const K& key, const V& val) {
+ bool check_tombstone(size_t shard_stop, const R& rec) {
if (m_shard_cnt == 0) return false;
for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
- if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(key))
- && m_structure->m_shards[i]->check_tombstone(key, val))
+ if (m_structure->m_shards[i] && (m_structure->m_bfs[i]->lookup(rec.key))
+ && m_structure->m_shards[i]->check_tombstone(rec))
return true;
}
return false;
@@ -151,11 +154,11 @@ public:
return false;
}
- const Record<K, V, W>* get_record_at(size_t shard_no, size_t idx) {
+ const R* get_record_at(size_t shard_no, size_t idx) {
return m_structure->m_shards[shard_no]->get_record_at(idx);
}
- WIRS<K, V, W>* get_shard(size_t idx) {
+ WIRS<R>* get_shard(size_t idx) {
return m_structure->m_shards[idx];
}
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index 42bc9a7..74838b8 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -26,15 +26,15 @@
namespace de {
-template <typename K, typename V, typename W=void>
+template <RecordInterface R>
class MutableBuffer {
public:
MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap, const gsl_rng* rng)
: m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
, m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
- auto len = capacity * sizeof(Record<K, V, W>);
+ auto len = capacity * sizeof(R);
size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));
- m_data = (Record<K, V, W>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
+ m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
m_tombstone_filter = nullptr;
if (max_tombstone_cap > 0) {
assert(rng != nullptr);
@@ -47,68 +47,35 @@ public:
if (m_tombstone_filter) delete m_tombstone_filter;
}
- template <typename W_=W,
- typename =std::enable_if_t<std::is_same<W_, void>::value>>
- int append(const K& key, const V& value, bool is_tombstone = false) {
- static_assert(std::is_same<W_, void>::value);
- if (is_tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
+ template <typename R_ = R>
+ int append(const R &rec) {
+ if (rec.is_tombstone() && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
int32_t pos = 0;
if ((pos = try_advance_tail()) == -1) return 0;
- m_data[pos].key = key;
- m_data[pos].value = value;
- m_data[pos].header = ((pos << 2) | (is_tombstone ? 1 : 0));
+ m_data[pos] = rec;
+ m_data[pos].header |= (pos << 2);
- if (is_tombstone) {
+ if (rec.is_tombstone()) {
m_tombstonecnt.fetch_add(1);
- if (m_tombstone_filter) m_tombstone_filter->insert(key);
+ if (m_tombstone_filter) m_tombstone_filter->insert(rec.key);
}
- m_weight.fetch_add(1);
- return 1;
- }
-
- template <typename W_=W,
- typename = std::enable_if_t<!std::is_same<W_, void>::value>>
- int append(const K& key, const V& value, W_ weight=1, bool is_tombstone = false) {
- static_assert(!std::is_same<W_, void>::value);
- if (is_tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
-
- int32_t pos = 0;
- if ((pos = try_advance_tail()) == -1) return 0;
-
- if (is_tombstone) {
- weight = 0;
- }
-
- m_data[pos].key = key;
- m_data[pos].value = value;
- m_data[pos].header = ((pos << 2) | (is_tombstone ? 1 : 0));
- m_data[pos].weight = weight;
-
- if (is_tombstone) {
- m_tombstonecnt.fetch_add(1);
- if (m_tombstone_filter) m_tombstone_filter->insert(key);
- }
-
- double old_val, new_val;
- do {
- old_val = m_weight.load();
- new_val = old_val + weight;
- } while (!m_weight.compare_exchange_strong(old_val, new_val));
-
-
- double old = m_max_weight.load();
- while (old < weight) {
- m_max_weight.compare_exchange_strong(old, weight);
- old = m_max_weight.load();
+ if constexpr (WeightedRecordInterface<R_>) {
+ m_weight.fetch_add(rec.weight);
+ double old = m_max_weight.load();
+ while (old < rec.weight) {
+ m_max_weight.compare_exchange_strong(old, rec.weight);
+ old = m_max_weight.load();
+ }
+ } else {
+ m_weight.fetch_add(1);
}
return 1;
}
-
bool truncate() {
m_tombstonecnt.store(0);
m_reccnt.store(0);
@@ -119,10 +86,10 @@ public:
return true;
}
- Record<K, V, W>* sorted_output() {
+ R* sorted_output() {
TIMER_INIT();
TIMER_START();
- std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp<K,V,W>);
+ std::sort(m_data, m_data + m_reccnt.load(), memtable_record_cmp<R>);
TIMER_STOP();
#ifdef INSTRUMENT_MERGING
@@ -147,11 +114,11 @@ public:
return m_tombstonecnt.load();
}
- bool delete_record(const K& key, const V& val) {
+ bool delete_record(const R& rec) {
auto offset = 0;
while (offset < m_reccnt.load()) {
- if (m_data[offset].match(key, val, false)) {
- m_data[offset].set_delete_status();
+ if (m_data[offset] == rec) {
+ m_data[offset].set_delete();
return true;
}
offset++;
@@ -160,23 +127,25 @@ public:
return false;
}
- bool check_tombstone(const K& key, const V& value) {
- if (m_tombstone_filter && !m_tombstone_filter->lookup(key)) return false;
+ bool check_tombstone(const R& rec) {
+ if (m_tombstone_filter && !m_tombstone_filter->lookup(rec.key)) return false;
auto offset = 0;
while (offset < m_reccnt.load()) {
- if (m_data[offset].match(key, value, true)) return true;
+ if (m_data[offset] == rec && m_data[offset].is_tombstone()) {
+ return true;
+ }
- offset++;;
+ offset++;
}
return false;
}
- const Record<K, V, W>* get_record_at(size_t idx) {
+ const R* get_record_at(size_t idx) {
return m_data + idx;
}
size_t get_memory_utilization() {
- return m_cap * sizeof(Record<K, V, W>);
+ return m_cap * sizeof(R);
}
size_t get_aux_memory_utilization() {
@@ -185,17 +154,18 @@ public:
//
// NOTE: This operation samples from records strictly between the upper and
// lower bounds, not including them
- double get_sample_range(const K& lower, const K& upper, std::vector<Record<K, V, W> *> &records,
- Alias **alias, size_t *cutoff) {
+ template <typename R_ = R>
+ double get_sample_range(const decltype(R_::key)& lower, const decltype(R_::key)& upper,
+ std::vector<R *> &records, Alias **alias, size_t *cutoff) {
std::vector<double> weights;
*cutoff = std::atomic_load(&m_reccnt) - 1;
records.clear();
double tot_weight = 0.0;
for (size_t i = 0; i < (*cutoff) + 1; i++) {
- Record<K, V, W> *rec = m_data + i;
+ R *rec = m_data + i;
- if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->get_delete_status()) {
+ if (rec->key >= lower && rec->key <= upper && !rec->is_tombstone() && !rec->is_deleted()) {
weights.push_back(rec->weight);
records.push_back(rec);
tot_weight += rec->weight;
@@ -212,7 +182,8 @@ public:
}
// rejection sampling
- const Record<K, V, W> *get_sample(const K& lower, const K& upper, gsl_rng *rng) {
+ template <typename R_ = R>
+ const R *get_sample(const decltype(R_::key)& lower, const decltype(R_::key)& upper, gsl_rng *rng) {
size_t reccnt = m_reccnt.load();
if (reccnt == 0) {
return nullptr;
@@ -230,7 +201,7 @@ public:
if (test <= rec->weight &&
rec->key >= lower &&
rec->key <= upper &&
- !rec->is_tombstone() && !rec->get_delete_status()) {
+ !rec->is_tombstone() && !rec->is_deleted()) {
return rec;
}
@@ -259,7 +230,7 @@ private:
size_t m_tombstone_cap;
//char* m_data;
- Record<K, V, W>* m_data;
+ R* m_data;
BloomFilter* m_tombstone_filter;
alignas(64) std::atomic<size_t> m_tombstonecnt;
diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h
index 6d97f95..dd2fd85 100644
--- a/include/shard/MemISAM.h
+++ b/include/shard/MemISAM.h
@@ -23,10 +23,11 @@ namespace de {
thread_local size_t mrun_cancelations = 0;
-template <typename K, typename V>
+template <RecordInterface R>
class MemISAM {
private:
-typedef Record<K, V> Rec;
+typedef decltype(R::key) K;
+typedef decltype(R::value) V;
constexpr static size_t inmem_isam_node_size = 256;
constexpr static size_t inmem_isam_fanout = inmem_isam_node_size / (sizeof(K) + sizeof(char*));
@@ -36,24 +37,23 @@ struct InMemISAMNode {
char* child[inmem_isam_fanout];
};
-constexpr static size_t inmem_isam_leaf_fanout = inmem_isam_node_size / sizeof(Rec);
+constexpr static size_t inmem_isam_leaf_fanout = inmem_isam_node_size / sizeof(R);
constexpr static size_t inmem_isam_node_keyskip = sizeof(K) * inmem_isam_fanout;
static_assert(sizeof(InMemISAMNode) == inmem_isam_node_size, "node size does not match");
-
public:
MemISAM(std::string data_fname, size_t record_cnt, size_t tombstone_cnt, BloomFilter *bf, bool tagging)
: m_reccnt(record_cnt), m_tombstone_cnt(tombstone_cnt), m_deleted_cnt(0), m_tagging(tagging) {
// read the stored data file the file
- size_t alloc_size = (record_cnt * sizeof(Rec)) + (CACHELINE_SIZE - (record_cnt * sizeof(Rec)) % CACHELINE_SIZE);
+ size_t alloc_size = (record_cnt * sizeof(R)) + (CACHELINE_SIZE - (record_cnt * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, alloc_size);
FILE *file = fopen(data_fname.c_str(), "rb");
assert(file);
- auto res = fread(m_data, sizeof(Rec), m_reccnt, file);
+ auto res = fread(m_data, sizeof(R), m_reccnt, file);
assert (res == m_reccnt);
fclose(file);
@@ -71,34 +71,34 @@ public:
}
}
- MemISAM(MutableBuffer<K,V>* buffer, BloomFilter* bf, bool tagging)
+ MemISAM(MutableBuffer<R>* buffer, BloomFilter* bf, bool tagging)
:m_reccnt(0), m_tombstone_cnt(0), m_isam_nodes(nullptr), m_deleted_cnt(0), m_tagging(tagging) {
- size_t alloc_size = (buffer->get_record_count() * sizeof(Rec)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Rec)) % CACHELINE_SIZE);
+ size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
TIMER_INIT();
size_t offset = 0;
m_reccnt = 0;
TIMER_START();
- Rec* base = buffer->sorted_output();
+ R* base = buffer->sorted_output();
TIMER_STOP();
auto sort_time = TIMER_RESULT();
- Rec* stop = base + buffer->get_record_count();
+ R* stop = base + buffer->get_record_count();
TIMER_START();
while (base < stop) {
if (!m_tagging) {
if (!base->is_tombstone() && (base + 1 < stop)
- && base->match(base + 1) && (base + 1)->is_tombstone()) {
+ && *base == *(base + 1) && (base + 1)->is_tombstone()) {
base += 2;
mrun_cancelations++;
continue;
}
- } else if (base->get_delete_status()) {
+ } else if (base->is_deleted()) {
base += 1;
continue;
}
@@ -123,15 +123,15 @@ public:
TIMER_STOP();
auto level_time = TIMER_RESULT();
- fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time);
+ //fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time);
}
MemISAM(MemISAM** runs, size_t len, BloomFilter* bf, bool tagging)
:m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_isam_nodes(nullptr), m_tagging(tagging) {
- std::vector<Cursor<K,V>> cursors;
+ std::vector<Cursor<R>> cursors;
cursors.reserve(len);
- PriorityQueue<K,V> pq(len);
+ PriorityQueue<R> pq(len);
size_t attemp_reccnt = 0;
@@ -142,21 +142,21 @@ public:
attemp_reccnt += runs[i]->get_record_count();
pq.push(cursors[i].ptr, i);
} else {
- cursors.emplace_back(Cursor<K,V>{nullptr, nullptr, 0, 0});
+ cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0});
}
}
- size_t alloc_size = (attemp_reccnt * sizeof(Rec)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Rec)) % CACHELINE_SIZE);
+ size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
size_t offset = 0;
while (pq.size()) {
auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<K,V>{nullptr, 0};
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<R>{nullptr, 0};
if (!m_tagging && !now.data->is_tombstone() && next.data != nullptr &&
- now.data->match(next.data) && next.data->is_tombstone()) {
+ *now.data == *next.data && next.data->is_tombstone()) {
pq.pop(); pq.pop();
auto& cursor1 = cursors[now.version];
@@ -165,7 +165,7 @@ public:
if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
} else {
auto& cursor = cursors[now.version];
- if (!m_tagging || !cursor.ptr->get_delete_status()) {
+ if (!m_tagging || !cursor.ptr->is_deleted()) {
m_data[m_reccnt++] = *cursor.ptr;
if (cursor.ptr->is_tombstone()) {
++m_tombstone_cnt;
@@ -188,7 +188,7 @@ public:
if (m_isam_nodes) free(m_isam_nodes);
}
- Rec* sorted_output() const {
+ R* sorted_output() const {
return m_data;
}
@@ -208,7 +208,7 @@ public:
- while (idx < m_reccnt && m_data[idx].lt(key, val)) ++idx;
+ while (idx < m_reccnt && m_data[idx] < R {key, val}) ++idx;
- if (m_data[idx].match(key, val, false)) {
+ if (m_data[idx] == R {key, val} && !m_data[idx].is_tombstone()) {
- m_data[idx].set_delete_status();
+ m_data[idx].set_delete();
m_deleted_cnt++;
return true;
@@ -217,7 +217,7 @@ public:
return false;
}
- const Rec* get_record_at(size_t idx) const {
+ const R* get_record_at(size_t idx) const {
return (idx < m_reccnt) ? m_data + idx : nullptr;
}
@@ -235,7 +235,7 @@ public:
now = next ? next : reinterpret_cast<const InMemISAMNode*>(now->child[inmem_isam_fanout - 1]);
}
- const Rec* pos = reinterpret_cast<const Rec*>(now);
+ const R* pos = reinterpret_cast<const R*>(now);
while (pos < m_data + m_reccnt && pos->key < key) pos++;
return pos - m_data;
@@ -255,7 +255,7 @@ public:
now = next ? next : reinterpret_cast<const InMemISAMNode*>(now->child[inmem_isam_fanout - 1]);
}
- const Rec* pos = reinterpret_cast<const Rec*>(now);
+ const R* pos = reinterpret_cast<const R*>(now);
while (pos < m_data + m_reccnt && pos->key <= key) pos++;
return pos - m_data;
@@ -267,20 +267,20 @@ public:
return false;
}
- Rec* ptr = m_data + idx;
+ R* ptr = m_data + idx;
- while (ptr < m_data + m_reccnt && ptr->lt(key, val)) ptr++;
+ while (ptr < m_data + m_reccnt && *ptr < R {key, val}) ptr++;
- return ptr->match(key, val, true);
+ return *ptr == R {key, val} && ptr->is_tombstone();
}
size_t get_memory_utilization() {
- return m_reccnt * sizeof(Rec) + m_internal_node_cnt * inmem_isam_node_size;
+ return m_reccnt * sizeof(R) + m_internal_node_cnt * inmem_isam_node_size;
}
void persist_to_file(std::string data_fname) {
FILE *file = fopen(data_fname.c_str(), "wb");
assert(file);
- fwrite(m_data, sizeof(Rec), m_reccnt, file);
+ fwrite(m_data, sizeof(R), m_reccnt, file);
fclose(file);
}
@@ -303,14 +303,14 @@ private:
InMemISAMNode* current_node = m_isam_nodes;
- const Rec* leaf_base = m_data;
- const Rec* leaf_stop = m_data + m_reccnt;
+ const R* leaf_base = m_data;
+ const R* leaf_stop = m_data + m_reccnt;
while (leaf_base < leaf_stop) {
size_t fanout = 0;
for (size_t i = 0; i < inmem_isam_fanout; ++i) {
auto rec_ptr = leaf_base + inmem_isam_leaf_fanout * i;
if (rec_ptr >= leaf_stop) break;
- const Rec* sep_key = std::min(rec_ptr + inmem_isam_leaf_fanout - 1, leaf_stop - 1);
+ const R* sep_key = std::min(rec_ptr + inmem_isam_leaf_fanout - 1, leaf_stop - 1);
current_node->keys[i] = sep_key->key;
current_node->child[i] = (char*)rec_ptr;
++fanout;
@@ -350,7 +350,7 @@ private:
}
// Members: sorted data, internal ISAM levels, reccnt;
- Rec* m_data;
+ R* m_data;
InMemISAMNode* m_isam_nodes;
InMemISAMNode* m_root;
size_t m_reccnt;
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 39337bf..41766b9 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -12,6 +12,7 @@
#include <cassert>
#include <queue>
#include <memory>
+#include <concepts>
#include "ds/PriorityQueue.h"
#include "util/Cursor.h"
@@ -24,19 +25,26 @@ namespace de {
thread_local size_t wirs_cancelations = 0;
-template <typename K, typename V, typename W>
+template <WeightedRecordInterface R>
class WIRS {
private:
+
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+ typedef decltype(R::weight) W;
+
+ template <WeightedRecordInterface R_ = R>
struct wirs_node {
- struct wirs_node *left, *right;
+ struct wirs_node<R_> *left, *right;
K low, high;
W weight;
Alias* alias;
};
+ template <WeightedRecordInterface R_ = R>
struct WIRSState {
W tot_weight;
- std::vector<wirs_node*> nodes;
+ std::vector<wirs_node<R_>*> nodes;
Alias* top_level_alias;
~WIRSState() {
@@ -45,13 +53,13 @@ private:
};
public:
- WIRS(MutableBuffer<K, V, W>* buffer, BloomFilter* bf, bool tagging)
+ WIRS(MutableBuffer<R>* buffer, BloomFilter* bf, bool tagging)
: m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0),
m_ts_check_cnt(0), m_tagging(tagging), m_root(nullptr) {
- size_t alloc_size = (buffer->get_record_count() * sizeof(Record<K, V, W>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Record<K, V, W>)) % CACHELINE_SIZE);
+ size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (Record<K, V, W>*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
size_t offset = 0;
m_reccnt = 0;
@@ -61,13 +69,13 @@ public:
while (base < stop) {
if (!m_tagging) {
if (!(base->is_tombstone()) && (base + 1) < stop) {
- if (base->match(base + 1) && (base + 1)->is_tombstone()) {
+ if (*base == *(base + 1) && (base + 1)->is_tombstone()) {
base += 2;
wirs_cancelations++;
continue;
}
}
- } else if (base->get_delete_status()) {
+ } else if (base->is_deleted()) {
base += 1;
continue;
}
@@ -92,10 +100,10 @@ public:
WIRS(WIRS** shards, size_t len, BloomFilter* bf, bool tagging)
: m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0),
m_tagging(tagging), m_root(nullptr) {
- std::vector<Cursor<K,V,W>> cursors;
+ std::vector<Cursor<R>> cursors;
cursors.reserve(len);
- PriorityQueue<K, V, W> pq(len);
+ PriorityQueue<R> pq(len);
size_t attemp_reccnt = 0;
@@ -106,28 +114,28 @@ public:
attemp_reccnt += shards[i]->get_record_count();
pq.push(cursors[i].ptr, i);
} else {
- cursors.emplace_back(Cursor<K,V,W>{nullptr, nullptr, 0, 0});
+ cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0});
}
}
- size_t alloc_size = (attemp_reccnt * sizeof(Record<K, V, W>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Record<K, V, W>)) % CACHELINE_SIZE);
+ size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (Record<K, V, W>*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
while (pq.size()) {
auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<K, V, W>{nullptr, 0};
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<R>{nullptr, 0};
if (!m_tagging && !now.data->is_tombstone() && next.data != nullptr &&
- now.data->match(next.data) && next.data->is_tombstone()) {
+ *now.data == *next.data && next.data->is_tombstone()) {
pq.pop(); pq.pop();
auto& cursor1 = cursors[now.version];
auto& cursor2 = cursors[next.version];
- if (advance_cursor<K,V,W>(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor<K,V,W>(cursor2)) pq.push(cursor2.ptr, next.version);
+ if (advance_cursor<R>(cursor1)) pq.push(cursor1.ptr, now.version);
+ if (advance_cursor<R>(cursor2)) pq.push(cursor2.ptr, next.version);
} else {
auto& cursor = cursors[now.version];
- if (!m_tagging || !cursor.ptr->get_delete_status()) {
+ if (!m_tagging || !cursor.ptr->is_deleted()) {
m_data[m_reccnt++] = *cursor.ptr;
m_total_weight += cursor.ptr->weight;
if (bf && cursor.ptr->is_tombstone()) {
@@ -137,7 +145,7 @@ public:
}
pq.pop();
- if (advance_cursor<K,V,W>(cursor)) pq.push(cursor.ptr, now.version);
+ if (advance_cursor<R>(cursor)) pq.push(cursor.ptr, now.version);
}
}
@@ -155,16 +163,16 @@ public:
free_tree(m_root);
}
- bool delete_record(const K& key, const V& val) {
- size_t idx = get_lower_bound(key);
+ bool delete_record(const R& rec) {
+ size_t idx = get_lower_bound(rec.key);
if (idx >= m_reccnt) {
return false;
}
- while (idx < m_reccnt && m_data[idx].lt(key, val)) ++idx;
+ while (idx < m_reccnt && m_data[idx] < rec) ++idx;
- if (m_data[idx].match(key, val, false)) {
- m_data[idx].set_delete_status();
+ if (m_data[idx] == R {rec.key, rec.value} && !m_data[idx].is_tombstone()) {
+ m_data[idx].set_delete();
m_deleted_cnt++;
return true;
}
@@ -172,7 +180,7 @@ public:
return false;
}
- void free_tree(struct wirs_node* node) {
+ void free_tree(struct wirs_node<R>* node) {
if (node) {
delete node->alias;
free_tree(node->left);
@@ -181,7 +189,7 @@ public:
}
}
- Record<K, V, W>* sorted_output() const {
+ R* sorted_output() const {
return m_data;
}
@@ -193,19 +201,19 @@ public:
return m_tombstone_cnt;
}
- const Record<K, V, W>* get_record_at(size_t idx) const {
+ const R* get_record_at(size_t idx) const {
if (idx >= m_reccnt) return nullptr;
return m_data + idx;
}
// low - high -> decompose to a set of nodes.
// Build Alias across the decomposed nodes.
- WIRSState* get_sample_shard_state(const K& lower_key, const K& upper_key) {
- WIRSState* res = new WIRSState();
+ WIRSState<R>* get_sample_shard_state(const K& lower_key, const K& upper_key) {
+ auto res = new WIRSState();
// Simulate a stack to unfold recursion.
double tot_weight = 0.0;
- struct wirs_node* st[64] = {0};
+ struct wirs_node<R>* st[64] = {0};
st[0] = m_root;
size_t top = 1;
while(top > 0) {
@@ -231,15 +239,15 @@ public:
}
static void delete_state(void *state) {
- auto s = (WIRSState *) state;
+ WIRSState<R> *s = (WIRSState<R> *) state;
delete s;
}
// returns the number of records sampled
// NOTE: This operation returns records strictly between the lower and upper bounds, not
// including them.
- size_t get_samples(void* shard_state, std::vector<Record<K, V, W>> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
- WIRSState *state = (WIRSState *) shard_state;
+ size_t get_samples(void* shard_state, std::vector<R> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
+ WIRSState<R> *state = (WIRSState<R> *) shard_state;
if (sample_sz == 0) {
return 0;
}
@@ -295,30 +303,30 @@ public:
auto ptr = m_data + get_lower_bound(key);
- while (ptr < m_data + m_reccnt && ptr->lt(key, val)) {
+ while (ptr < m_data + m_reccnt && *ptr < R {key, val}) {
ptr ++;
}
- bool result = (m_tagging) ? ptr->get_delete_status()
- : ptr->match(key, val, true);
+ bool result = (m_tagging) ? ptr->is_deleted()
+ : *ptr == R {key, val} && ptr->is_tombstone();
m_rejection_cnt += result;
return result;
}
- bool check_tombstone(const K& key, const V& val) {
+ bool check_tombstone(const R& rec) {
m_ts_check_cnt++;
- size_t idx = get_lower_bound(key);
+ size_t idx = get_lower_bound(rec.key);
if (idx >= m_reccnt) {
return false;
}
- auto ptr = m_data + get_lower_bound(key);
+ auto ptr = m_data + get_lower_bound(rec.key);
- while (ptr < m_data + m_reccnt && ptr->lt(key, val)) {
+ while (ptr < m_data + m_reccnt && *ptr < rec) {
ptr ++;
}
- bool result = ptr->match(key, val, true);
+ bool result = *ptr == rec && ptr->is_tombstone();
m_rejection_cnt += result;
return result;
@@ -340,21 +348,21 @@ public:
private:
- bool covered_by(struct wirs_node* node, const K& lower_key, const K& upper_key) {
+ bool covered_by(struct wirs_node<R>* node, const K& lower_key, const K& upper_key) {
auto low_index = node->low * m_group_size;
auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1);
return lower_key < m_data[low_index].key && m_data[high_index].key < upper_key;
}
- bool intersects(struct wirs_node* node, const K& lower_key, const K& upper_key) {
+ bool intersects(struct wirs_node<R>* node, const K& lower_key, const K& upper_key) {
auto low_index = node->low * m_group_size;
auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1);
return lower_key < m_data[high_index].key && m_data[low_index].key < upper_key;
}
- struct wirs_node* construct_wirs_node(const std::vector<W>& weights, size_t low, size_t high) {
+ struct wirs_node<R>* construct_wirs_node(const std::vector<W>& weights, size_t low, size_t high) {
if (low == high) {
- return new wirs_node{nullptr, nullptr, low, high, weights[low], new Alias({1.0})};
+ return new wirs_node<R>{nullptr, nullptr, low, high, weights[low], new Alias({1.0})};
} else if (low > high) return nullptr;
std::vector<double> node_weights;
@@ -370,9 +378,9 @@ private:
size_t mid = (low + high) / 2;
- return new wirs_node{construct_wirs_node(weights, low, mid),
- construct_wirs_node(weights, mid + 1, high),
- low, high, sum, new Alias(node_weights)};
+ return new wirs_node<R>{construct_wirs_node(weights, low, mid),
+ construct_wirs_node(weights, mid + 1, high),
+ low, high, sum, new Alias(node_weights)};
}
@@ -410,9 +418,9 @@ private:
m_root = construct_wirs_node(weights, 0, n_groups-1);
}
- Record<K, V, W>* m_data;
+ R* m_data;
std::vector<Alias *> m_alias;
- wirs_node* m_root;
+ wirs_node<R>* m_root;
bool m_tagging;
W m_total_weight;
size_t m_reccnt;
diff --git a/include/util/Cursor.h b/include/util/Cursor.h
index 2339800..2609ae5 100644
--- a/include/util/Cursor.h
+++ b/include/util/Cursor.h
@@ -14,10 +14,10 @@
#include "io/PagedFile.h"
namespace de {
-template<typename K, typename V, typename W=void>
+template<typename R>
struct Cursor {
- Record<K,V,W> *ptr;
- const Record<K,V,W> *end;
+ R *ptr;
+ R *end;
size_t cur_rec_idx;
size_t rec_cnt;
@@ -36,8 +36,8 @@ struct Cursor {
* be updated to be equal to end, and false will be returned. Iterators will
* not be closed.
*/
-template<typename K, typename V, typename W>
-inline static bool advance_cursor(Cursor<K,V,W> &cur, PagedFileIterator *iter = nullptr) {
+template<typename R>
+inline static bool advance_cursor(Cursor<R> &cur, PagedFileIterator *iter = nullptr) {
cur.ptr++;
cur.cur_rec_idx++;
@@ -45,8 +45,8 @@ inline static bool advance_cursor(Cursor<K,V,W> &cur, PagedFileIterator *iter =
if (cur.ptr >= cur.end) {
if (iter && iter->next()) {
- cur.ptr = (Record<K,V,W>*)iter->get_item();
- cur.end = cur.ptr + (PAGE_SIZE / sizeof(Record<K,V,W>));
+ cur.ptr = (R*)iter->get_item();
+ cur.end = cur.ptr + (PAGE_SIZE / sizeof(R));
return true;
}
@@ -62,14 +62,14 @@ inline static bool advance_cursor(Cursor<K,V,W> &cur, PagedFileIterator *iter =
* This allows for "peaking" at the next largest element after the current
* largest is processed.
*/
-template <typename K, typename V, typename W>
-inline static Cursor<K,V,W> *get_next(std::vector<Cursor<K,V,W>> &cursors, Cursor<K,V,W> *current=nullptr) {
- const Record<K,V,W> *min_rec = nullptr;
- Cursor<K,V,W> *result = nullptr;
+template <typename R>
+inline static Cursor<R> *get_next(std::vector<Cursor<R>> &cursors, Cursor<R> *current=nullptr) {
+ const R *min_rec = nullptr;
+ Cursor<R> *result = nullptr;
for (size_t i=0; i< cursors.size(); i++) {
- if (cursors[i] == (Cursor<K,V,W>) {0} ) continue;
+ if (cursors[i] == (Cursor<R>) {0} ) continue;
- const Record<K,V,W> *rec = (&cursors[i] == current) ? cursors[i].ptr + 1 : cursors[i].ptr;
+ const R *rec = (&cursors[i] == current) ? cursors[i].ptr + 1 : cursors[i].ptr;
if (rec >= cursors[i].end) continue;
if (min_rec == nullptr) {
diff --git a/include/util/Record.h b/include/util/Record.h
index 687e745..ce101f4 100644
--- a/include/util/Record.h
+++ b/include/util/Record.h
@@ -10,50 +10,110 @@
#pragma once
#include <cstring>
+#include <concepts>
#include "util/base.h"
namespace de {
-template <typename K, typename V, typename W=void>
+template<typename R>
+concept RecordInterface = requires(R r, R s) {
+ r.key;
+ r.value;
+ r.header;
+
+ {r.is_tombstone()} -> std::convertible_to<bool>;
+ {r.is_deleted()} -> std::convertible_to<bool>;
+ r.set_delete();
+ r.set_tombstone(std::declval<bool>());
+ { r < s } -> std::convertible_to<bool>;
+ { r == s } -> std::convertible_to<bool>;
+ { r.header < s.header } -> std::convertible_to<bool>;
+};
+
+template <typename R>
+concept WeightedRecordInterface = RecordInterface<R> && requires(R r) {
+ {r.weight} -> std::convertible_to<double>;
+};
+
+template <typename K, typename V>
struct Record {
K key;
V value;
- typename std::conditional<!std::is_same<W, void>::value, W, std::false_type>::type weight;
- uint32_t header;
+ uint32_t header = 0;
+
+ inline void set_delete() {
+ header |= 2;
+ }
+
+ inline bool is_deleted() const {
+ return header & 2;
+ }
+
+ inline void set_tombstone(bool val=true) {
+ if (val) {
+ header |= val;
+ } else {
+ header &= ~1u;
+ }
+ }
+
+ inline bool is_tombstone() const {
+ return header & 1;
+ }
+
+ inline bool operator<(const Record& other) const {
+ return key < other.key || (key == other.key && value < other.value);
+ }
- inline bool match(K k, V v, bool is_tombstone) const {
- return (key == k) && (value == v) && ((header & 1) == is_tombstone);
+ inline bool operator==(const Record& other) const {
+ return key == other.key && value == other.value;
}
+};
+
+template <typename K, typename V, typename W>
+struct WeightedRecord {
+ K key;
+ V value;
+ W weight = 1;
+ uint32_t header = 0;
- inline void set_delete_status() {
+ inline void set_delete() {
header |= 2;
}
- inline bool get_delete_status() const {
+ inline bool is_deleted() const {
return header & 2;
}
+ inline void set_tombstone(bool val=true) {
+ if (val) {
+ header |= val;
+ } else {
+ header &= ~1u;
+ }
+ }
+
inline bool is_tombstone() const {
return header & 1;
}
- inline int match(const Record* other) const {
+ inline int match(const WeightedRecord* other) const {
return key == other->key && value == other->value;
}
- inline bool operator<(const Record& other) const {
+ inline bool operator<(const WeightedRecord& other) const {
return key < other.key || (key == other.key && value < other.value);
}
- inline bool lt(const K& k, const V& v) const {
- return key < k || (key == k && value < v);
+ inline bool operator==(const WeightedRecord& other) const {
+ return key == other.key && value == other.value;
}
};
-template <typename K, typename V, typename W=void>
-static bool memtable_record_cmp(const Record<K, V, W>& a, const Record<K, V, W>& b) {
+template <RecordInterface R>
+static bool memtable_record_cmp(const R& a, const R& b) {
return (a.key < b.key) || (a.key == b.key && a.value < b.value)
|| (a.key == b.key && a.value == b.value && a.header < b.header);
}
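
For reference, both record types pack their flags into the low bits of the 32-bit header: bit 0 marks a tombstone, bit 1 marks a tagged delete, and MutableBuffer::append() stores the record's buffer slot in the bits above (header |= pos << 2). Because operator== compares only key and value, a record still compares equal to its own tombstone, which is what the cancellation and check_tombstone paths rely on. A short sketch under those definitions:

    #include <cassert>
    #include "util/Record.h"

    void header_sketch() {
        de::WeightedRecord<uint64_t, uint32_t, uint64_t> rec = {7, 42, 1};
        de::WeightedRecord<uint64_t, uint32_t, uint64_t> ts  = {7, 42, 1};

        ts.set_tombstone();   // sets header bit 0
        assert(ts.is_tombstone());

        rec.set_delete();     // sets header bit 1 (delete tagging)
        assert(rec.is_deleted());

        // Header bits do not participate in equality:
        assert(rec == ts);
    }
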
diff --git a/tests/dynamic_extension_tests.cpp b/tests/dynamic_extension_tests.cpp
index ce2abdb..3638b76 100644
--- a/tests/dynamic_extension_tests.cpp
+++ b/tests/dynamic_extension_tests.cpp
@@ -19,11 +19,9 @@
#include <check.h>
using namespace de;
-typedef DynamicExtension<uint64_t, uint32_t, uint64_t> DE_WIRS;
-
START_TEST(t_create)
{
- auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);
ck_assert_ptr_nonnull(ext_wirs);
@@ -37,12 +35,13 @@ END_TEST
START_TEST(t_append)
{
- auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);
uint64_t key = 0;
uint32_t val = 0;
for (size_t i=0; i<100; i++) {
- ck_assert_int_eq(ext_wirs->append(key, val, 1, false, g_rng), 1);
+ WRec r = {key, val, 1};
+ ck_assert_int_eq(ext_wirs->append(r, g_rng), 1);
key++;
val++;
}
@@ -57,12 +56,13 @@ END_TEST
START_TEST(t_append_with_mem_merges)
{
- auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);
uint64_t key = 0;
uint32_t val = 0;
for (size_t i=0; i<300; i++) {
- ck_assert_int_eq(ext_wirs->append(key, val, 1, false, g_rng), 1);
+ WRec r = {key, val, 1};
+ ck_assert_int_eq(ext_wirs->append(r, g_rng), 1);
key++;
val++;
}
@@ -77,12 +77,13 @@ END_TEST
START_TEST(t_range_sample_memtable)
{
- auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);
uint64_t key = 0;
uint32_t val = 0;
for (size_t i=0; i<100; i++) {
- ck_assert_int_eq(ext_wirs->append(key, val, 1, false, g_rng), 1);
+ WRec r = {key, val, 1};
+ ck_assert_int_eq(ext_wirs->append(r, g_rng), 1);
key++;
val++;
}
@@ -92,7 +93,7 @@ START_TEST(t_range_sample_memtable)
char *buf = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE);
char *util_buf = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE);
- WeightedRec sample_set[100];
+ WRec sample_set[100];
ext_wirs->range_sample(sample_set, lower_bound, upper_bound, 100, g_rng);
@@ -111,12 +112,13 @@ END_TEST
START_TEST(t_range_sample_memlevels)
{
- auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);
uint64_t key = 0;
uint32_t val = 0;
for (size_t i=0; i<300; i++) {
- ck_assert_int_eq(ext_wirs->append(key, val, 1, false, g_rng), 1);
+ WRec r = {key, val, 1};
+ ck_assert_int_eq(ext_wirs->append(r, g_rng), 1);
key++;
val++;
}
@@ -127,7 +129,7 @@ START_TEST(t_range_sample_memlevels)
char *buf = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE);
char *util_buf = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE);
- WeightedRec sample_set[100];
+ WRec sample_set[100];
ext_wirs->range_sample(sample_set, lower_bound, upper_bound, 100, g_rng);
for(size_t i=0; i<100; i++) {
@@ -144,7 +146,7 @@ END_TEST
START_TEST(t_range_sample_weighted)
{
- auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);
size_t n = 10000;
std::vector<uint64_t> keys;
@@ -171,22 +173,23 @@ START_TEST(t_range_sample_weighted)
std::shuffle(keys.begin(), keys.end(), gen);
for (size_t i=0; i<keys.size(); i++) {
- double weight;
+ uint64_t weight;
if (keys[i] == 1) {
- weight = 2.0;
+ weight = 2;
} else if (keys[i] == 2) {
- weight = 4.0;
+ weight = 4;
} else {
- weight = 8.0;
+ weight = 8;
}
- ext_wirs->append(keys[i], i, weight, false, g_rng);
+ WRec r = {keys[i], (uint32_t) i, weight, 0};
+ ext_wirs->append(r, g_rng);
}
size_t k = 1000;
uint64_t lower_key = 0;
uint64_t upper_key = 5;
- WeightedRec* buffer = new WeightedRec[k]();
+ WRec* buffer = new WRec[k]();
char *buffer1 = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE);
char *buffer2 = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE);
@@ -214,7 +217,7 @@ END_TEST
START_TEST(t_tombstone_merging_01)
{
size_t reccnt = 100000;
- auto ext_wirs = new DE_WIRS(100, 100, 2, .01, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, .01, 1, g_rng);
std::set<std::pair<uint64_t, uint32_t>> records;
std::set<std::pair<uint64_t, uint32_t>> to_delete;
@@ -232,14 +235,17 @@ START_TEST(t_tombstone_merging_01)
size_t deletes = 0;
size_t cnt=0;
for (auto rec : records) {
- ck_assert_int_eq(ext_wirs->append(rec.first, rec.second, 1, false, g_rng), 1);
+ WRec r = {rec.first, rec.second, 1, 0};
+ ck_assert_int_eq(ext_wirs->append(r, g_rng), 1);
if (gsl_rng_uniform(g_rng) < 0.05 && !to_delete.empty()) {
std::vector<std::pair<uint64_t, uint32_t>> del_vec;
std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
for (size_t i=0; i<del_vec.size(); i++) {
- ext_wirs->append(del_vec[i].first, del_vec[i].second, 1, true, g_rng);
+ WRec dr = {del_vec[i].first, del_vec[i].second, 1};
+ dr.set_tombstone();
+ ext_wirs->append(dr, g_rng);
deletes++;
to_delete.erase(del_vec[i]);
deleted.insert(del_vec[i]);
@@ -259,12 +265,12 @@ START_TEST(t_tombstone_merging_01)
}
END_TEST
-DE_WIRS *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
- auto ext_wirs = new DE_WIRS(1000, 1000, 2, 1, 1, g_rng);
+DynamicExtension<WRec> *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
+ auto ext_wirs = new DynamicExtension<WRec>(1000, 1000, 2, 1, 1, g_rng);
- std::set<std::pair<uint64_t, uint32_t>> records;
- std::set<std::pair<uint64_t, uint32_t>> to_delete;
- std::set<std::pair<uint64_t, uint32_t>> deleted;
+ std::set<WRec> records;
+ std::set<WRec> to_delete;
+ std::set<WRec> deleted;
while (records.size() < reccnt) {
uint64_t key = rand();
@@ -277,14 +283,15 @@ DE_WIRS *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
size_t deletes = 0;
for (auto rec : records) {
- ck_assert_int_eq(ext_wirs->append(rec.first, rec.second, 1, 0, g_rng), 1);
+ ck_assert_int_eq(ext_wirs->append(rec, g_rng), 1);
if (gsl_rng_uniform(g_rng) < 0.05 && !to_delete.empty()) {
- std::vector<std::pair<uint64_t, uint32_t>> del_vec;
+ std::vector<WRec> del_vec;
std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
for (size_t i=0; i<del_vec.size(); i++) {
- ext_wirs->append(del_vec[i].first, del_vec[i].second, 1, true, g_rng);
+ del_vec[i].set_tombstone();
+ ext_wirs->append(del_vec[i], g_rng);
deletes++;
to_delete.erase(del_vec[i]);
deleted.insert(del_vec[i]);
@@ -302,7 +309,7 @@ DE_WIRS *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
START_TEST(t_sorted_array)
{
size_t reccnt = 100000;
- auto ext_wirs = new DE_WIRS(100, 100, 2, 1, 1, g_rng);
+ auto ext_wirs = new DynamicExtension<WRec>(100, 100, 2, 1, 1, g_rng);
std::set<std::pair<uint64_t, uint32_t>> records;
std::set<std::pair<uint64_t, uint32_t>> to_delete;
@@ -319,14 +326,18 @@ START_TEST(t_sorted_array)
size_t deletes = 0;
for (auto rec : records) {
- ck_assert_int_eq(ext_wirs->append(rec.first, rec.second, 1, 0, g_rng), 1);
+ WRec r = {rec.first, rec.second, 1};
+ ck_assert_int_eq(ext_wirs->append(r, g_rng), 1);
if (gsl_rng_uniform(g_rng) < 0.05 && !to_delete.empty()) {
std::vector<std::pair<uint64_t, uint32_t>> del_vec;
std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
for (size_t i=0; i<del_vec.size(); i++) {
- ext_wirs->append(del_vec[i].first, del_vec[i].second, 1, true, g_rng);
+ WRec dr = {del_vec[i].first, del_vec[i].second, 1};
+ dr.set_tombstone();
+
+ ext_wirs->append(dr, g_rng);
deletes++;
to_delete.erase(del_vec[i]);
deleted.insert(del_vec[i]);
diff --git a/tests/internal_level_tests.cpp b/tests/internal_level_tests.cpp
index 7e542e6..7842b01 100644
--- a/tests/internal_level_tests.cpp
+++ b/tests/internal_level_tests.cpp
@@ -20,19 +20,19 @@ using namespace de;
START_TEST(t_memlevel_merge)
{
- auto tbl1 = create_test_mbuffer<uint64_t, uint32_t, uint64_t>(100);
- auto tbl2 = create_test_mbuffer<uint64_t, uint32_t, uint64_t>(100);
+ auto tbl1 = create_test_mbuffer<WRec>(100);
+ auto tbl2 = create_test_mbuffer<WRec>(100);
- auto base_level = new WeightedLevel(1, 1, false);
+ auto base_level = new InternalLevel<WRec>(1, 1, false);
base_level->append_mem_table(tbl1, g_rng);
ck_assert_int_eq(base_level->get_record_cnt(), 100);
- auto merging_level = new WeightedLevel(0, 1, false);
+ auto merging_level = new InternalLevel<WRec>(0, 1, false);
merging_level->append_mem_table(tbl2, g_rng);
ck_assert_int_eq(merging_level->get_record_cnt(), 100);
auto old_level = base_level;
- base_level = WeightedLevel::merge_levels(old_level, merging_level, false, g_rng);
+ base_level = InternalLevel<WRec>::merge_levels(old_level, merging_level, false, g_rng);
delete old_level;
delete merging_level;
@@ -44,11 +44,11 @@ START_TEST(t_memlevel_merge)
}
-WeightedLevel *create_test_memlevel(size_t reccnt) {
- auto tbl1 = create_test_mbuffer<uint64_t, uint32_t, uint64_t>(reccnt/2);
- auto tbl2 = create_test_mbuffer<uint64_t, uint32_t, uint64_t>(reccnt/2);
+InternalLevel<WRec> *create_test_memlevel(size_t reccnt) {
+ auto tbl1 = create_test_mbuffer<WRec>(reccnt/2);
+ auto tbl2 = create_test_mbuffer<WRec>(reccnt/2);
- auto base_level = new WeightedLevel(1, 2, false);
+ auto base_level = new InternalLevel<WRec>(1, 2, false);
base_level->append_mem_table(tbl1, g_rng);
base_level->append_mem_table(tbl2, g_rng);
diff --git a/tests/memisam_tests.cpp b/tests/memisam_tests.cpp
index f3b80b3..1609edf 100644
--- a/tests/memisam_tests.cpp
+++ b/tests/memisam_tests.cpp
@@ -8,24 +8,25 @@
using namespace de;
-typedef MemISAM<uint64_t, uint32_t> M_ISAM;
+typedef MemISAM<Rec> M_ISAM;
START_TEST(t_memtable_init)
{
- auto buffer = new UnweightedMBuffer(1024, true, 512, g_rng);
+ auto buffer = new MutableBuffer<Rec>(1024, true, 512, g_rng);
for (uint64_t i = 512; i > 0; i--) {
- uint32_t v = i;
- buffer->append(i, v);
+ buffer->append(Rec {i, (uint32_t)i});
}
+ Rec r;
+ r.set_tombstone();
for (uint64_t i = 1; i <= 256; ++i) {
- uint32_t v = i;
- buffer->append(i, v, true);
+ r.key = i;
+ r.value = i;
+ buffer->append(r);
}
for (uint64_t i = 257; i <= 512; ++i) {
- uint32_t v = i + 1;
- buffer->append(i, v);
+ buffer->append({i, (uint32_t) i+1});
}
BloomFilter* bf = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS, g_rng);
@@ -40,9 +41,9 @@ START_TEST(t_memtable_init)
START_TEST(t_inmemrun_init)
{
size_t n = 512;
- auto memtable1 = create_test_mbuffer<uint64_t, uint32_t>(n);
- auto memtable2 = create_test_mbuffer<uint64_t, uint32_t>(n);
- auto memtable3 = create_test_mbuffer<uint64_t, uint32_t>(n);
+ auto memtable1 = create_test_mbuffer<Rec>(n);
+ auto memtable2 = create_test_mbuffer<Rec>(n);
+ auto memtable3 = create_test_mbuffer<Rec>(n);
BloomFilter* bf1 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
BloomFilter* bf2 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
@@ -70,11 +71,11 @@ START_TEST(t_inmemrun_init)
auto cur_rec = run4->get_record_at(i);
- if (run1_idx < n && cur_rec->match(rec1)) {
+ if (run1_idx < n && *cur_rec == *rec1) {
 ++run1_idx;
- } else if (run2_idx < n && cur_rec->match(rec2)) {
+ } else if (run2_idx < n && *cur_rec == *rec2) {
 ++run2_idx;
- } else if (run3_idx < n && cur_rec->match(rec3)) {
+ } else if (run3_idx < n && *cur_rec == *rec3) {
 ++run3_idx;
++run3_idx;
} else {
assert(false);
@@ -98,7 +99,7 @@ START_TEST(t_inmemrun_init)
START_TEST(t_get_lower_bound_index)
{
size_t n = 10000;
- auto memtable = create_double_seq_mbuffer<uint64_t, uint32_t>(n);
+ auto memtable = create_double_seq_mbuffer<Rec>(n);
ck_assert_ptr_nonnull(memtable);
BloomFilter* bf = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
@@ -109,7 +110,7 @@ START_TEST(t_get_lower_bound_index)
auto tbl_records = memtable->sorted_output();
for (size_t i=0; i<n; i++) {
- const UnweightedRec *tbl_rec = memtable->get_record_at(i);
+ const auto *tbl_rec = memtable->get_record_at(i);
auto pos = run->get_lower_bound(tbl_rec->key);
ck_assert_int_eq(run->get_record_at(pos)->key, tbl_rec->key);
ck_assert_int_le(pos, i);
@@ -123,7 +124,7 @@ START_TEST(t_get_lower_bound_index)
START_TEST(t_get_upper_bound_index)
{
size_t n = 10000;
- auto memtable = create_double_seq_mbuffer<uint64_t, uint32_t>(n);
+ auto memtable = create_double_seq_mbuffer<Rec>(n);
ck_assert_ptr_nonnull(memtable);
BloomFilter* bf = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
@@ -134,7 +135,7 @@ START_TEST(t_get_upper_bound_index)
auto tbl_records = memtable->sorted_output();
for (size_t i=0; i<n; i++) {
- const UnweightedRec *tbl_rec = memtable->get_record_at(i);
+ const auto *tbl_rec = memtable->get_record_at(i);
auto pos = run->get_upper_bound(tbl_rec->key);
ck_assert(pos == run->get_record_count() ||
run->get_record_at(pos)->key > tbl_rec->key);
@@ -150,8 +151,8 @@ START_TEST(t_get_upper_bound_index)
START_TEST(t_full_cancelation)
{
size_t n = 100;
- auto mtable = create_double_seq_mbuffer<uint64_t, uint32_t>(n, false);
- auto mtable_ts = create_double_seq_mbuffer<uint64_t, uint32_t>(n, true);
+ auto mtable = create_double_seq_mbuffer<Rec>(n, false);
+ auto mtable_ts = create_double_seq_mbuffer<Rec>(n, true);
BloomFilter* bf1 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
BloomFilter* bf2 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
BloomFilter* bf3 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
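Taken together, the memisam changes reduce the buffer API to whole-record appends: a tombstone is simply a record with its tombstone bit set. A minimal sketch, assuming the Rec typedef and the MutableBuffer constructor shown in this patch:

    auto buffer = new MutableBuffer<Rec>(1024, true, 512, g_rng);

    buffer->append(Rec {42, 42});   // ordinary insert

    Rec ts = {42, 42};
    ts.set_tombstone();             // same key/value, tombstone bit set
    buffer->append(ts);             // registers a delete for (42, 42)
    // buffer->check_tombstone(ts) now reports true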
diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp
index 2112036..75cbeec 100644
--- a/tests/mutable_buffer_tests.cpp
+++ b/tests/mutable_buffer_tests.cpp
@@ -20,12 +20,14 @@
#include <check.h>
+#define DE_MT_TEST 0
+
using namespace de;
START_TEST(t_create)
{
auto rng = gsl_rng_alloc(gsl_rng_mt19937);
- auto buffer = new WeightedMBuffer(100, true, 50, rng);
+ auto buffer = new MutableBuffer<Rec>(100, true, 50, rng);
ck_assert_ptr_nonnull(buffer);
ck_assert_int_eq(buffer->get_capacity(), 100);
@@ -44,30 +46,32 @@ END_TEST
START_TEST(t_insert)
{
auto rng = gsl_rng_alloc(gsl_rng_mt19937);
- auto buffer = new WeightedMBuffer(100, true, 50, rng);
+ auto buffer = new MutableBuffer<WRec>(100, true, 50, rng);
- uint64_t key = 0;
- uint32_t val = 5;
+ WRec rec = {0, 5, 1};
+
for (size_t i=0; i<99; i++) {
- ck_assert_int_eq(buffer->append(key, val, 1, false), 1);
- ck_assert_int_eq(buffer->check_tombstone(key, val), 0);
+ ck_assert_int_eq(buffer->append(rec), 1);
+ ck_assert_int_eq(buffer->check_tombstone(rec), 0);
- key++;
- val++;
+ rec.key++;
+ rec.value++;
ck_assert_int_eq(buffer->get_record_count(), i+1);
ck_assert_int_eq(buffer->get_tombstone_count(), 0);
ck_assert_int_eq(buffer->is_full(), 0);
}
- ck_assert_int_eq(buffer->append(key, val, 1.0, false), 1);
+ ck_assert_int_eq(buffer->append(rec), 1);
- key++;
- val++;
+ rec.key++;
+ rec.value++;
ck_assert_int_eq(buffer->is_full(), 1);
- ck_assert_int_eq(buffer->append(key, val, 1.0, false), 0);
+ ck_assert_int_eq(buffer->append(rec), 0);
delete buffer;
gsl_rng_free(rng);
@@ -79,12 +83,12 @@ END_TEST
START_TEST(t_insert_tombstones)
{
auto rng = gsl_rng_alloc(gsl_rng_mt19937);
- auto buffer = new WeightedMBuffer(100, true, 50, rng);
+ auto buffer = new MutableBuffer<Rec>(100, true, 50, rng);
- uint64_t key = 0;
- uint32_t val = 5;
size_t ts_cnt = 0;
+ Rec rec = {0, 5};
+
for (size_t i=0; i<99; i++) {
bool ts = false;
if (i % 2 == 0) {
@@ -92,11 +96,13 @@ START_TEST(t_insert_tombstones)
ts=true;
}
- ck_assert_int_eq(buffer->append(key, val, 1.0, ts), 1);
- ck_assert_int_eq(buffer->check_tombstone(key, val), ts);
+ rec.set_tombstone(ts);
- key++;
- val++;
+ ck_assert_int_eq(buffer->append(rec), 1);
+ ck_assert_int_eq(buffer->check_tombstone(rec), ts);
+
+ rec.key++;
+ rec.value++;
ck_assert_int_eq(buffer->get_record_count(), i+1);
ck_assert_int_eq(buffer->get_tombstone_count(), ts_cnt);
@@ -104,16 +110,18 @@ START_TEST(t_insert_tombstones)
}
// inserting one more tombstone should not be possible
- ck_assert_int_eq(buffer->append(key, val, 1.0, true), 0);
+ rec.set_tombstone();
+ ck_assert_int_eq(buffer->append(rec), 0);
- ck_assert_int_eq(buffer->append(key, val, 1.0, false), 1);
+ rec.set_tombstone(false);
+ ck_assert_int_eq(buffer->append(rec), 1);
- key++;
- val++;
+ rec.key++;
+ rec.value++;
ck_assert_int_eq(buffer->is_full(), 1);
- ck_assert_int_eq(buffer->append(key, val, 1.0, false), 0);
+ ck_assert_int_eq(buffer->append(rec), 0);
delete buffer;
gsl_rng_free(rng);
@@ -124,11 +132,10 @@ END_TEST
START_TEST(t_truncate)
{
auto rng = gsl_rng_alloc(gsl_rng_mt19937);
- auto buffer = new WeightedMBuffer(100, true, 100, rng);
+ auto buffer = new MutableBuffer<Rec>(100, true, 100, rng);
- uint64_t key = 0;
- uint32_t val = 5;
size_t ts_cnt = 0;
+ Rec rec = {0, 5};
for (size_t i=0; i<100; i++) {
bool ts = false;
@@ -137,25 +144,28 @@ START_TEST(t_truncate)
ts=true;
}
- ck_assert_int_eq(buffer->append(key, val, 1.0, ts), 1);
- ck_assert_int_eq(buffer->check_tombstone(key, val), ts);
+ rec.set_tombstone(ts);
- key++;
- val++;
+ ck_assert_int_eq(buffer->append(rec), 1);
+ ck_assert_int_eq(buffer->check_tombstone(rec), ts);
+
+ rec.key++;
+ rec.value++;
ck_assert_int_eq(buffer->get_record_count(), i+1);
ck_assert_int_eq(buffer->get_tombstone_count(), ts_cnt);
}
ck_assert_int_eq(buffer->is_full(), 1);
- ck_assert_int_eq(buffer->append(key, val, 1.0, false), 0);
+ rec.set_tombstone(false);
+ ck_assert_int_eq(buffer->append(rec), 0);
ck_assert_int_eq(buffer->truncate(), 1);
ck_assert_int_eq(buffer->is_full(), 0);
ck_assert_int_eq(buffer->get_record_count(), 0);
ck_assert_int_eq(buffer->get_tombstone_count(), 0);
- ck_assert_int_eq(buffer->append(key, val, 1.0, false), 1);
+ ck_assert_int_eq(buffer->append(rec), 1);
delete buffer;
gsl_rng_free(rng);
@@ -169,7 +179,7 @@ START_TEST(t_sorted_output)
size_t cnt = 100;
auto rng = gsl_rng_alloc(gsl_rng_mt19937);
- auto buffer = new WeightedMBuffer(cnt, true, cnt/2, rng);
+ auto buffer = new MutableBuffer<Rec>(cnt, true, cnt/2, rng);
std::vector<uint64_t> keys(cnt);
@@ -184,14 +194,19 @@ START_TEST(t_sorted_output)
uint32_t val = 12345;
for (size_t i=0; i<cnt-2; i++) {
- buffer->append(keys[i], val, 1.0, false);
+ buffer->append(Rec {keys[i], val});
}
- buffer->append(keys[cnt-2], val, 1.0, true);
- buffer->append(keys[cnt-1], val, 1.0, true);
+ Rec r1 = {keys[cnt-2], val};
+ r1.set_tombstone();
+ buffer->append(r1);
+
+ Rec r2 = {keys[cnt-1], val};
+ r2.set_tombstone();
+ buffer->append(r2);
- WeightedRec *sorted_records = buffer->sorted_output();
+ auto *sorted_records = buffer->sorted_output();
std::sort(keys.begin(), keys.end());
for (size_t i=0; i<cnt; i++) {
@@ -204,23 +219,24 @@ START_TEST(t_sorted_output)
END_TEST
-void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t start, size_t stop, WeightedMBuffer *buffer)
+void insert_records(std::vector<Rec> *values, size_t start, size_t stop, MutableBuffer<Rec> *buffer)
{
for (size_t i=start; i<stop; i++) {
- buffer->append((*values)[i].first, (*values)[i].second, 1.0);
+ buffer->append((*values)[i]);
}
}
+#if DE_MT_TEST
START_TEST(t_multithreaded_insert)
{
size_t cnt = 10000;
auto rng = gsl_rng_alloc(gsl_rng_mt19937);
- auto buffer = new WeightedMBuffer(cnt, true, cnt/2, rng);
+ auto buffer = new MutableBuffer<Rec>(cnt, true, cnt/2, rng);
- std::vector<std::pair<uint64_t, uint32_t>> records(cnt);
+ std::vector<Rec> records(cnt);
for (size_t i=0; i<cnt; i++) {
- records[i] = {rand(), rand()};
+ records[i] = Rec {(uint64_t) rand(), (uint32_t) rand()};
}
- // perform a t_multithreaded insertion
+ // perform a multithreaded insertion
@@ -245,15 +261,16 @@ START_TEST(t_multithreaded_insert)
ck_assert_int_eq(buffer->get_record_count(), cnt);
std::sort(records.begin(), records.end());
- WeightedRec *sorted_records = buffer->sorted_output();
+ auto *sorted_records = buffer->sorted_output();
for (size_t i=0; i<cnt; i++) {
- ck_assert_int_eq(sorted_records[i].key, records[i].first);
+ ck_assert_int_eq(sorted_records[i].key, records[i].key);
}
delete buffer;
gsl_rng_free(rng);
}
END_TEST
+#endif
Suite *unit_testing()
@@ -268,7 +285,9 @@ Suite *unit_testing()
TCase *append = tcase_create("de::MutableBuffer::append Testing");
tcase_add_test(append, t_insert);
tcase_add_test(append, t_insert_tombstones);
- tcase_add_test(append, t_multithreaded_insert);
+ #if DE_MT_TEST
+ tcase_add_test(append, t_multithreaded_insert);
+ #endif
suite_add_tcase(unit, append);
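With insert_records retyped to take the Rec vector (see the fix above), the disabled multithreaded test would split the workload across worker threads roughly as follows. This is a sketch only, assuming std::thread (<thread>) and four workers; the actual test may use a different thread count:

    std::vector<std::thread> workers;
    size_t chunk = records.size() / 4;
    for (size_t t = 0; t < 4; t++) {
        workers.emplace_back(insert_records, &records,
                             t * chunk, (t + 1) * chunk, buffer);
    }
    for (auto &w : workers) {
        w.join();   // all appends complete before the record count is checked
    }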
diff --git a/tests/testing.h b/tests/testing.h
index 062e930..eb5c095 100644
--- a/tests/testing.h
+++ b/tests/testing.h
@@ -16,18 +16,14 @@
#include <unistd.h>
#include <fcntl.h>
+#include "util/Record.h"
#include "util/types.h"
#include "util/base.h"
#include "framework/MutableBuffer.h"
-#include "framework/InternalLevel.h"
+//#include "framework/InternalLevel.h"
-typedef de::Record<uint64_t, uint32_t, uint64_t> WeightedRec;
-typedef de::MutableBuffer<uint64_t, uint32_t, uint64_t> WeightedMBuffer;
-typedef de::InternalLevel<uint64_t, uint32_t, uint64_t> WeightedLevel;
-
-typedef de::Record<uint64_t, uint32_t> UnweightedRec;
-typedef de::MutableBuffer<uint64_t, uint32_t> UnweightedMBuffer;
-typedef de::InternalLevel<uint64_t, uint32_t> UnweightedLevel;
+typedef de::WeightedRecord<uint64_t, uint32_t, uint64_t> WRec;
+typedef de::Record<uint64_t, uint32_t> Rec;
static gsl_rng *g_rng = gsl_rng_alloc(gsl_rng_mt19937);
@@ -72,90 +68,101 @@ static bool roughly_equal(int n1, int n2, size_t mag, double epsilon) {
return ((double) std::abs(n1 - n2) / (double) mag) < epsilon;
}
-template <typename K, typename V, typename W=void>
-static de::MutableBuffer<K,V,W> *create_test_mbuffer(size_t cnt)
+template <de::RecordInterface R>
+static de::MutableBuffer<R> *create_test_mbuffer(size_t cnt)
{
- auto buffer = new de::MutableBuffer<K,V,W>(cnt, true, cnt, g_rng);
+ auto buffer = new de::MutableBuffer<R>(cnt, true, cnt, g_rng);
+ R rec = {};
for (size_t i = 0; i < cnt; i++) {
- uint64_t key = rand();
- uint32_t val = rand();
+ rec.key = rand();
+ rec.value = rand();
+
+ if constexpr (de::WeightedRecordInterface<R>) {
+ rec.weight = 1;
+ }
- buffer->append(key, val);
+ buffer->append(rec);
}
return buffer;
}
-template <typename K, typename V, typename W=void>
-static de::MutableBuffer<K,V,W> *create_test_mbuffer_tombstones(size_t cnt, size_t ts_cnt)
+template <de::RecordInterface R>
+static de::MutableBuffer<R> *create_test_mbuffer_tombstones(size_t cnt, size_t ts_cnt)
{
- auto buffer = new de::MutableBuffer<K,V,W>(cnt, true, ts_cnt, g_rng);
+ auto buffer = new de::MutableBuffer<R>(cnt, true, ts_cnt, g_rng);
std::vector<std::pair<uint64_t, uint32_t>> tombstones;
+ R rec = {};
for (size_t i = 0; i < cnt; i++) {
- uint64_t key = rand();
- uint32_t val = rand();
+ rec.key = rand();
+ rec.value = rand();
+
+ if constexpr (de::WeightedRecordInterface<R>) {
+ rec.weight = 1;
+ }
if (i < ts_cnt) {
- tombstones.push_back({key, val});
+ tombstones.push_back({rec.key, rec.value});
}
- buffer->append(key, val);
+ buffer->append(rec);
}
+ rec.set_tombstone();
 for (size_t i=0; i<ts_cnt; i++) {
- buffer->append(tombstones[i].first, tombstones[i].second, true);
+ rec.key = tombstones[i].first;
+ rec.value = tombstones[i].second;
+ buffer->append(rec);
}
return buffer;
}
-template <typename K, typename V, typename W=void>
-static de::MutableBuffer<K,V,W> *create_weighted_mbuffer(size_t cnt)
+template <de::WeightedRecordInterface R>
+static de::MutableBuffer<R> *create_weighted_mbuffer(size_t cnt)
{
- static_assert(!std::is_same<W, void>::value);
- auto buffer = new de::MutableBuffer<K,V,W>(cnt, true, cnt, g_rng);
+ auto buffer = new de::MutableBuffer<R>(cnt, true, cnt, g_rng);
- // Put in half of the count with weight one.
+ // Put in half of the count with weight two.
- uint64_t key = 1;
- for (size_t i=0; i< cnt / 2; i++) {
- buffer->append(key, i, 2);
+ for (uint32_t i=0; i< cnt / 2; i++) {
+ buffer->append(R {1, i, 2});
}
- // put in a quarter of the count with weight two.
- key = 2;
- for (size_t i=0; i< cnt / 4; i++) {
- buffer->append(key, i, 4);
+ // put in a quarter of the count with weight four.
+ for (uint32_t i=0; i< cnt / 4; i++) {
+ buffer->append(R {2, i, 4});
}
- // the remaining quarter with weight four.
- key = 3;
- for (size_t i=0; i< cnt / 4; i++) {
- buffer->append(key, i, 8);
+ // the remaining quarter with weight eight.
+ for (uint32_t i=0; i< cnt / 4; i++) {
+ buffer->append(R {3, i, 8});
}
return buffer;
}
-template <typename K, typename V, typename W=void>
-static de::MutableBuffer<K,V,W> *create_double_seq_mbuffer(size_t cnt, bool ts=false)
+template <de::RecordInterface R>
+static de::MutableBuffer<R> *create_double_seq_mbuffer(size_t cnt, bool ts=false)
{
- auto buffer = new de::MutableBuffer<K,V,W>(cnt, true, cnt, g_rng);
+ auto buffer = new de::MutableBuffer<R>(cnt, true, cnt, g_rng);
for (size_t i = 0; i < cnt / 2; i++) {
- uint64_t key = i;
- uint32_t val = i;
+ R rec = {};
+ rec.key = i;
+ rec.value = i;
+ if (ts) rec.set_tombstone();
- buffer->append(key, val, ts);
+ buffer->append(rec);
}
for (size_t i = 0; i < cnt / 2; i++) {
- uint64_t key = i;
- uint32_t val = i + 1;
+ R rec = {};
+ rec.key = i;
+ rec.value = i + 1;
+ if (ts) rec.set_tombstone();
- buffer->append(key, val, ts);
+ buffer->append(rec);
}
return buffer;
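With the helpers constrained on de::RecordInterface, one template now serves both record layouts; the if constexpr branch touches rec.weight only when R also satisfies de::WeightedRecordInterface. A hedged usage sketch:

    auto weighted   = create_test_mbuffer<WRec>(512);  // weight filled with 1
    auto unweighted = create_test_mbuffer<Rec>(512);   // no weight field exists

    // Weighted-only helpers are constrained accordingly and will not
    // instantiate for plain Rec:
    auto skewed = create_weighted_mbuffer<WRec>(1000);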
diff --git a/tests/wirs_tests.cpp b/tests/wirs_tests.cpp
index ed83d40..673bdca 100644
--- a/tests/wirs_tests.cpp
+++ b/tests/wirs_tests.cpp
@@ -19,24 +19,24 @@
using namespace de;
-typedef WIRS<uint64_t, uint32_t, uint64_t> Shard;
+typedef WIRS<WRec> Shard;
START_TEST(t_mbuffer_init)
{
- auto mem_table = new WeightedMBuffer(1024, true, 1024, g_rng);
+ auto mem_table = new MutableBuffer<WRec>(1024, true, 1024, g_rng);
for (uint64_t i = 512; i > 0; i--) {
uint32_t v = i;
- mem_table->append(i, v);
+ mem_table->append({i, v, 1});
}
for (uint64_t i = 1; i <= 256; ++i) {
uint32_t v = i;
- mem_table->append(i, v, 1.0, true);
+ WRec tsr = {i, v, 1};
+ tsr.set_tombstone();
+ mem_table->append(tsr);
}
for (uint64_t i = 257; i <= 512; ++i) {
uint32_t v = i + 1;
- mem_table->append(i, v);
+ mem_table->append({i, v, 1});
}
BloomFilter* bf = new BloomFilter(BF_FPR, mem_table->get_tombstone_count(), BF_HASH_FUNCS, g_rng);
@@ -51,9 +51,9 @@ START_TEST(t_mbuffer_init)
START_TEST(t_wirs_init)
{
size_t n = 512;
- auto mbuffer1 = create_test_mbuffer<uint64_t, uint32_t, uint64_t>(n);
- auto mbuffer2 = create_test_mbuffer<uint64_t, uint32_t, uint64_t>(n);
- auto mbuffer3 = create_test_mbuffer<uint64_t, uint32_t, uint64_t>(n);
+ auto mbuffer1 = create_test_mbuffer<WRec>(n);
+ auto mbuffer2 = create_test_mbuffer<WRec>(n);
+ auto mbuffer3 = create_test_mbuffer<WRec>(n);
BloomFilter* bf1 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
BloomFilter* bf2 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
@@ -81,11 +81,11 @@ START_TEST(t_wirs_init)
auto cur_rec = shard4->get_record_at(i);
- if (shard1_idx < n && cur_rec->match(rec1)) {
+ if (shard1_idx < n && *cur_rec == *rec1) {
++shard1_idx;
- } else if (shard2_idx < n && cur_rec->match(rec2)) {
+ } else if (shard2_idx < n && *cur_rec == *rec2) {
++shard2_idx;
- } else if (shard3_idx < n && cur_rec->match(rec3)) {
+ } else if (shard3_idx < n && *cur_rec == *rec3) {
++shard3_idx;
} else {
assert(false);
@@ -109,7 +109,7 @@ START_TEST(t_wirs_init)
START_TEST(t_get_lower_bound_index)
{
size_t n = 10000;
- auto mbuffer = create_double_seq_mbuffer<uint64_t, uint32_t, uint64_t>(n);
+ auto mbuffer = create_double_seq_mbuffer<WRec>(n);
ck_assert_ptr_nonnull(mbuffer);
BloomFilter* bf = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
@@ -120,7 +120,7 @@ START_TEST(t_get_lower_bound_index)
auto tbl_records = mbuffer->sorted_output();
for (size_t i=0; i<n; i++) {
- const WeightedRec *tbl_rec = mbuffer->get_record_at(i);
+ const WRec *tbl_rec = mbuffer->get_record_at(i);
auto pos = shard->get_lower_bound(tbl_rec->key);
ck_assert_int_eq(shard->get_record_at(pos)->key, tbl_rec->key);
ck_assert_int_le(pos, i);
@@ -135,8 +135,8 @@ START_TEST(t_get_lower_bound_index)
START_TEST(t_full_cancelation)
{
size_t n = 100;
- auto buffer = create_double_seq_mbuffer<uint64_t, uint32_t, uint64_t>(n, false);
- auto buffer_ts = create_double_seq_mbuffer<uint64_t, uint32_t, uint64_t>(n, true);
+ auto buffer = create_double_seq_mbuffer<WRec>(n, false);
+ auto buffer_ts = create_double_seq_mbuffer<WRec>(n, true);
BloomFilter* bf1 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
BloomFilter* bf2 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
BloomFilter* bf3 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
@@ -171,7 +171,7 @@ END_TEST
START_TEST(t_weighted_sampling)
{
size_t n=1000;
- auto buffer = create_weighted_mbuffer<uint64_t, uint32_t, uint64_t>(n);
+ auto buffer = create_weighted_mbuffer<WRec>(n);
BloomFilter* bf = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
Shard* shard = new Shard(buffer, bf, false);
@@ -181,7 +181,7 @@ START_TEST(t_weighted_sampling)
size_t k = 1000;
- std::vector<WeightedRec> results;
+ std::vector<WRec> results;
results.reserve(k);
size_t cnt[3] = {0};
for (size_t i=0; i<1000; i++) {
@@ -193,7 +193,7 @@ START_TEST(t_weighted_sampling)
cnt[results[j].key - 1]++;
}
- WIRS<uint64_t, uint32_t, uint64_t>::delete_state(state);
+ WIRS<WRec>::delete_state(state);
}
ck_assert(roughly_equal(cnt[0] / 1000, (double) k/4.0, k, .05));
@@ -211,14 +211,14 @@ START_TEST(t_tombstone_check)
{
size_t cnt = 1024;
size_t ts_cnt = 256;
- auto buffer = new WeightedMBuffer(cnt + ts_cnt, true, ts_cnt, g_rng);
+ auto buffer = new MutableBuffer<WRec>(cnt + ts_cnt, true, ts_cnt, g_rng);
std::vector<std::pair<uint64_t, uint32_t>> tombstones;
uint64_t key = 1000;
uint32_t val = 101;
for (size_t i = 0; i < cnt; i++) {
- buffer->append(key, val);
+ buffer->append({key, val, 1});
key++;
val++;
}
@@ -230,14 +230,14 @@ START_TEST(t_tombstone_check)
}
for (size_t i=0; i<ts_cnt; i++) {
- buffer->append(tombstones[i].first, tombstones[i].second, 1.0, true);
+ WRec tsr = {tombstones[i].first, tombstones[i].second, 1};
+ tsr.set_tombstone();
+ buffer->append(tsr);
}
BloomFilter* bf1 = new BloomFilter(100, BF_HASH_FUNCS, g_rng);
auto shard = new Shard(buffer, bf1, false);
for (size_t i=0; i<tombstones.size(); i++) {
- ck_assert(shard->check_tombstone(tombstones[i].first, tombstones[i].second));
+ ck_assert(shard->check_tombstone({tombstones[i].first, tombstones[i].second}));
ck_assert_int_eq(shard->get_rejection_count(), i+1);
}