summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-05-29 14:30:08 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-05-29 14:30:08 -0400
commite920fa57cf9c503e560055864e4de37912b239e1 (patch)
treed0daba733c7af2b0b22f29d39de8f824ffa83472
parentb00682429988f17152e7573ffeffa1cecfdd3d3a (diff)
downloaddynamic-extension-e920fa57cf9c503e560055864e4de37912b239e1.tar.gz
Adjusted the way that Wrapping records works to clean up interfaces
-rw-r--r--include/framework/DynamicExtension.h51
-rw-r--r--include/framework/InternalLevel.h10
-rw-r--r--include/framework/MutableBuffer.h20
-rw-r--r--include/framework/RecordInterface.h48
-rw-r--r--include/shard/WIRS.h90
-rw-r--r--tests/dynamic_extension_tests.cpp36
-rw-r--r--tests/internal_level_tests.cpp10
-rw-r--r--tests/mutable_buffer_tests.cpp58
-rw-r--r--tests/testing.h8
-rw-r--r--tests/wirs_tests.cpp30
10 files changed, 172 insertions, 189 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 68f85e2..422ca10 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -75,9 +75,9 @@ typedef ssize_t level_index;
template <RecordInterface R, ShardInterface S, QueryInterface Q, typename FQ=void>
class DynamicExtension {
- //typedef typename S<WrappedRecord<R>> Shard;
+ //typedef typename S<R> Shard;
typedef S Shard;
- typedef MutableBuffer<WrappedRecord<R>> Buffer;
+ typedef MutableBuffer<R> Buffer;
public:
DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop)
@@ -142,18 +142,23 @@ public:
level->get_query_states(shards, states, parms);
}
- std::vector<std::vector<R>> query_results(shards.size() + 1);
+ std::vector<std::vector<R>*> query_results(shards.size() + 1, nullptr);
// Execute the query for the buffer
- query_results[0] = Q::buffer_query(buffer, buffer_state, parms);
+ query_results[0] = filter_deletes(Q::buffer_query(buffer, buffer_state, parms), {-1, -1}, buffer);
// Execute the query for each shard
for (size_t i=0; i<shards.size(); i++) {
- query_results[i] = filter_deletes(Q::query(shards[i].second, states[i], parms));
+ query_results[i] = filter_deletes(Q::query(shards[i].second, states[i], parms), shards[i].first, buffer);
}
// Merge the results together
- auto result = Q::merge(query_results);
+ auto result = Q::merge(&query_results);
+
+ for (size_t i=0; i<query_results.size(); i++) {
+ delete query_results[i];
+ Q::delete_query_state(states[i]);
+ }
return result;
}
@@ -259,7 +264,7 @@ private:
size_t m_scale_factor;
double m_max_delete_prop;
- std::vector<InternalLevel<WrappedRecord<R>, S, Q> *> m_levels;
+ std::vector<InternalLevel<R, S, Q> *> m_levels;
Buffer *get_buffer() {
return m_buffer;
@@ -274,14 +279,12 @@ private:
merge_buffer();
}
- WrappedRecord<R> wrec = static_cast<WrappedRecord<R>>(rec);
-
- return buffer->append(wrec, ts);
+ return buffer->append(rec, ts);
}
- std::vector<R> filter_deletes(std::vector<R> records, ShardID shid, Buffer *buffer) {
- std::vector<R> processed_records;
- processed_records.reserve(records.size());
+ std::vector<R> *filter_deletes(std::vector<R> *records, ShardID shid, Buffer *buffer) {
+ std::vector<R> *processed_records = new std::vector<R>();
+ processed_records->reserve(records->size());
// For delete tagging, we just need to check the delete bit on each
// record.
@@ -291,9 +294,10 @@ private:
continue;
}
- processed_records.emplace_back(rec.rec);
+ processed_records->emplace_back(static_cast<R>(rec.rec));
}
+ delete records;
return processed_records;
}
@@ -320,8 +324,10 @@ private:
}
}
- processed_records.emplace_back(rec.rec);
+ processed_records->emplace_back(static_cast<R>(rec.rec));
}
+
+ delete records;
}
/*
@@ -337,7 +343,7 @@ private:
if (new_idx > 0) {
assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
}
- m_levels.emplace_back(new InternalLevel<WrappedRecord<R>, Shard, Q>(new_idx, new_shard_cnt));
+ m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt));
return new_idx;
}
@@ -416,14 +422,14 @@ private:
// merging two memory levels
if (LSM_LEVELING) {
auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
+ m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
mark_as_unused(tmp);
} else {
m_levels[base_level]->append_merged_shards(m_levels[incoming_level]);
}
mark_as_unused(m_levels[incoming_level]);
- m_levels[incoming_level] = new InternalLevel<WrappedRecord<R>, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
+ m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
}
@@ -432,9 +438,9 @@ private:
if (LSM_LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0];
- auto temp_level = new InternalLevel<WrappedRecord<R>, Shard, Q>(0, 1);
+ auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
temp_level->append_buffer(buffer);
- auto new_level = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(old_level, temp_level);
+ auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level);
m_levels[0] = new_level;
delete temp_level;
@@ -450,7 +456,7 @@ private:
* level may not be able to immediately be deleted, depending upon who
* else is using it.
*/
- inline void mark_as_unused(InternalLevel<WrappedRecord<R>, Shard, Q> *level) {
+ inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) {
delete level;
}
@@ -460,6 +466,9 @@ private:
* levels below idx are below the limit.
*/
inline void enforce_delete_maximum(level_index idx) {
+ // FIXME: currently broken due to tombstone cancellation issues
+ return;
+
long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
if (ts_prop > (long double) m_max_delete_prop) {
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index c489063..d28ba5f 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -28,18 +28,18 @@ class InternalLevel {
typedef MutableBuffer<R> Buffer;
public:
InternalLevel(ssize_t level_no, size_t shard_cap)
- : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard*>(shard_cap, nullptr))
+ : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard*>(shard_cap, nullptr), free_shards)
{}
// Create a new memory level sharing the shards and repurposing it as previous level_no + 1
// WARNING: for leveling only.
InternalLevel(InternalLevel* level)
: m_level_no(level->m_level_no + 1), m_shard_cnt(level->m_shard_cnt)
- , m_shards(level->m_shards) {
+ , m_shards(level->m_shards, free_shards) {
assert(m_shard_cnt == 1 && m_shards->size() == 1);
}
- ~InternalLevel() {}
+ ~InternalLevel() { }
// WARNING: for leveling only.
// assuming the base level is the level new level is merging into. (base_level is larger.)
@@ -181,6 +181,10 @@ private:
size_t m_shard_cnt;
size_t m_shard_size_cap;
+ static void free_shards(std::vector<Shard*>* vec) {
+ for (size_t i=0; i<vec->size(); i++) delete (*vec)[i];
+ }
+
std::shared_ptr<std::vector<Shard*>> m_shards;
};
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index c154001..bc80922 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -28,14 +28,13 @@ namespace de {
template <RecordInterface R>
class MutableBuffer {
- //typedef WrappedRecord<R> R;
public:
MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap)
: m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
, m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
- auto len = capacity * sizeof(R);
+ auto len = capacity * sizeof(Wrapped<R>);
size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));
- m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
+ m_data = (Wrapped<R>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
m_tombstone_filter = nullptr;
if (max_tombstone_cap > 0) {
m_tombstone_filter = new BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
@@ -54,10 +53,11 @@ public:
int32_t pos = 0;
if ((pos = try_advance_tail()) == -1) return 0;
- R new_rec = rec;
- if (tombstone) new_rec.set_tombstone();
+ Wrapped<R> wrec;
+ wrec.rec = rec;
+ if (tombstone) wrec.set_tombstone();
- m_data[pos] = new_rec;
+ m_data[pos] = wrec;
m_data[pos].header |= (pos << 2);
if (tombstone) {
@@ -66,7 +66,7 @@ public:
}
if constexpr (WeightedRecordInterface<R_>) {
- m_weight.fetch_add(new_rec.weight);
+ m_weight.fetch_add(rec.weight);
double old = m_max_weight.load();
while (old < rec.weight) {
m_max_weight.compare_exchange_strong(old, rec.weight);
@@ -123,7 +123,7 @@ public:
auto offset = 0;
while (offset < m_reccnt.load()) {
- if (m_data[offset] == rec && m_data[offset].is_tombstone()) {
+ if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) {
return true;
}
offset++;;
@@ -147,7 +147,7 @@ public:
return m_weight.load();
}
- R *get_data() {
+ Wrapped<R> *get_data() {
return m_data;
}
@@ -162,7 +162,7 @@ private:
size_t m_cap;
size_t m_tombstone_cap;
- R* m_data;
+ Wrapped<R>* m_data;
BloomFilter<R>* m_tombstone_filter;
alignas(64) std::atomic<size_t> m_tombstonecnt;
diff --git a/include/framework/RecordInterface.h b/include/framework/RecordInterface.h
index c8e622f..70c8e01 100644
--- a/include/framework/RecordInterface.h
+++ b/include/framework/RecordInterface.h
@@ -26,9 +26,9 @@ concept RecordInterface = requires(R r, R s) {
};
template<RecordInterface R>
-struct WrappedRecord : R {
- //R rec;
+struct Wrapped {
uint32_t header;
+ R rec;
inline void set_delete() {
header |= 2;
@@ -49,8 +49,14 @@ struct WrappedRecord : R {
inline bool is_tombstone() const {
return header & 1;
}
+
+ inline bool operator<(const Wrapped& other) const {
+ return (rec.key < other.rec.key) || (rec.key == other.rec.key && rec.value < other.rec.value)
+ || (rec.key == other.rec.key && rec.value == other.rec.value && header < other.header);
+ }
};
+
template <typename R>
concept WeightedRecordInterface = RecordInterface<R> && requires(R r) {
{r.weight} -> std::convertible_to<double>;
@@ -76,46 +82,14 @@ struct WeightedRecord {
K key;
V value;
W weight = 1;
- uint32_t header = 0;
- inline void set_delete() {
- header |= 2;
- }
-
- inline bool is_deleted() const {
- return header & 2;
- }
-
- inline void set_tombstone(bool val=true) {
- if (val) {
- header |= val;
- } else {
- header &= 0;
- }
- }
-
- inline bool is_tombstone() const {
- return header & 1;
- }
-
- inline int match(const WeightedRecord* other) const {
- return key == other->key && value == other->value;
+ inline bool operator==(const WeightedRecord& other) const {
+ return key == other.key && value == other.value;
}
- inline bool operator<(const WeightedRecord& other) const {
+ inline bool operator<(const WeightedRecord& other) const {
return key < other.key || (key == other.key && value < other.value);
}
-
- inline bool operator==(const WeightedRecord& other) const {
- return key == other.key && value == other.value;
- }
};
-
-template <RecordInterface R>
-static bool memtable_record_cmp(const R& a, const R& b) {
- return (a.key < b.key) || (a.key == b.key && a.value < b.value)
- || (a.key == b.key && a.value == b.value && a.header < b.header);
-}
-
}
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 5b610c7..9e4d911 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -34,6 +34,7 @@ template <WeightedRecordInterface R>
struct wirs_query_parms {
decltype(R::key) lower_bound;
decltype(R::key) upper_bound;
+ size_t sample_size;
};
template <WeightedRecordInterface R>
@@ -73,9 +74,9 @@ public:
WIRS(MutableBuffer<R>* buffer)
: m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) {
- size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE);
+ size_t alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
m_bf = new BloomFilter<K>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS);
@@ -84,11 +85,11 @@ public:
auto base = buffer->get_data();
auto stop = base + buffer->get_record_count();
- std::sort(base, stop, memtable_record_cmp<R>);
+ std::sort(base, stop, std::less<Wrapped<R>>());
while (base < stop) {
if (!(base->is_tombstone()) && (base + 1) < stop) {
- if (*base == *(base + 1) && (base + 1)->is_tombstone()) {
+ if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
base += 2;
wirs_cancelations++;
continue;
@@ -100,11 +101,11 @@ public:
base->header &= 1;
m_data[m_reccnt++] = *base;
- m_total_weight+= base->weight;
+ m_total_weight+= base->rec.weight;
if (m_bf && base->is_tombstone()) {
m_tombstone_cnt++;
- m_bf->insert(base->key);
+ m_bf->insert(base->rec.key);
}
base++;
@@ -117,56 +118,56 @@ public:
WIRS(WIRS** shards, size_t len)
: m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) {
- std::vector<Cursor<R>> cursors;
+ std::vector<Cursor<Wrapped<R>>> cursors;
cursors.reserve(len);
- PriorityQueue<R> pq(len);
+ PriorityQueue<Wrapped<R>> pq(len);
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
for (size_t i = 0; i < len; ++i) {
if (shards[i]) {
- auto base = shards[i]->sorted_output();
+ auto base = shards[i]->get_data();
cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
attemp_reccnt += shards[i]->get_record_count();
tombstone_count += shards[i]->get_tombstone_count();
pq.push(cursors[i].ptr, i);
} else {
- cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0});
+ cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
}
}
m_bf = new BloomFilter<K>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
- size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE);
+ size_t alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
while (pq.size()) {
auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<R>{nullptr, 0};
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
if (!now.data->is_tombstone() && next.data != nullptr &&
- *now.data == *next.data && next.data->is_tombstone()) {
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
pq.pop(); pq.pop();
auto& cursor1 = cursors[now.version];
auto& cursor2 = cursors[next.version];
- if (advance_cursor<R>(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor<R>(cursor2)) pq.push(cursor2.ptr, next.version);
+ if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version);
+ if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version);
} else {
auto& cursor = cursors[now.version];
if (!cursor.ptr->is_deleted()) {
m_data[m_reccnt++] = *cursor.ptr;
- m_total_weight += cursor.ptr->weight;
+ m_total_weight += cursor.ptr->rec.weight;
if (m_bf && cursor.ptr->is_tombstone()) {
++m_tombstone_cnt;
- if (m_bf) m_bf->insert(cursor.ptr->key);
+ if (m_bf) m_bf->insert(cursor.ptr->rec.key);
}
}
pq.pop();
- if (advance_cursor<R>(cursor)) pq.push(cursor.ptr, now.version);
+ if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version);
}
}
@@ -186,7 +187,7 @@ public:
free_tree(m_root);
}
- R *point_lookup(R &rec, bool filter=false) {
+ Wrapped<R> *point_lookup(R &rec, bool filter=false) {
if (filter && !m_bf->lookup(rec.key)) {
return nullptr;
}
@@ -205,7 +206,7 @@ public:
return nullptr;
}
- R* sorted_output() const {
+ Wrapped<R>* get_data() const {
return m_data;
}
@@ -217,7 +218,7 @@ public:
return m_tombstone_cnt;
}
- const R* get_record_at(size_t idx) const {
+ const Wrapped<R>* get_record_at(size_t idx) const {
if (idx >= m_reccnt) return nullptr;
return m_data + idx;
}
@@ -273,7 +274,7 @@ private:
double group_weight = 0.0;
group_norm_weight.clear();
for (size_t k = 0; k < m_group_size && i < m_reccnt; ++k, ++i) {
- auto w = m_data[i].weight;
+ auto w = m_data[i].rec.weight;
group_norm_weight.emplace_back(w);
group_weight += w;
sum_weight += w;
@@ -325,7 +326,7 @@ private:
}
}
- R* m_data;
+ Wrapped<R>* m_data;
std::vector<Alias *> m_alias;
wirs_node<R>* m_root;
W m_total_weight;
@@ -339,10 +340,10 @@ private:
template <WeightedRecordInterface R>
class WIRSQuery {
public:
- static void *get_query_state(wirs_query_parms<R> *parameters, WIRS<R> *wirs) {
+ static void *get_query_state(WIRS<R> *wirs, void *parms) {
auto res = new WIRSState<R>();
- decltype(R::key) lower_key = ((wirs_query_parms<R> *) parameters)->lower_bound;
- decltype(R::key) upper_key = ((wirs_query_parms<R> *) parameters)->upper_bound;
+ decltype(R::key) lower_key = ((wirs_query_parms<R> *) parms)->lower_bound;
+ decltype(R::key) upper_key = ((wirs_query_parms<R> *) parms)->upper_bound;
// Simulate a stack to unfold recursion.
double tot_weight = 0.0;
@@ -351,13 +352,13 @@ public:
size_t top = 1;
while(top > 0) {
auto now = st[--top];
- if (covered_by(now, lower_key, upper_key) ||
- (now->left == nullptr && now->right == nullptr && intersects(now, lower_key, upper_key))) {
+ if (wirs->covered_by(now, lower_key, upper_key) ||
+ (now->left == nullptr && now->right == nullptr && wirs->intersects(now, lower_key, upper_key))) {
res->nodes.emplace_back(now);
tot_weight += now->weight;
} else {
- if (now->left && intersects(now->left, lower_key, upper_key)) st[top++] = now->left;
- if (now->right && intersects(now->right, lower_key, upper_key)) st[top++] = now->right;
+ if (now->left && wirs->intersects(now->left, lower_key, upper_key)) st[top++] = now->left;
+ if (now->right && wirs->intersects(now->right, lower_key, upper_key)) st[top++] = now->right;
}
}
@@ -371,13 +372,19 @@ public:
return res;
}
- static std::vector<R> *query(wirs_query_parms<R> *parameters, WIRSState<R> *state, WIRS<R> *wirs) {
- auto sample_sz = parameters->sample_size;
- auto lower_key = parameters->lower_bound;
- auto upper_key = parameters->upper_bound;
- auto rng = parameters->rng;
+ static void* get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) {
+ return nullptr;
+ }
+
+ static std::vector<Wrapped<R>> *query(WIRS<R> *wirs, void *q_state, void *parms) {
+ auto sample_sz = ((wirs_query_parms<R> *) parms)->sample_size;
+ auto lower_key = ((wirs_query_parms<R> *) parms)->lower_bound;
+ auto upper_key = ((wirs_query_parms<R> *) parms)->upper_bound;
+ auto rng = ((wirs_query_parms<R> *) parms)->rng;
+
+ auto state = (WIRSState<R> *) q_state;
- std::vector<R> *result_set = new std::vector<R>();
+ std::vector<Wrapped<R>> *result_set = new std::vector<Wrapped<R>>();
if (sample_sz == 0) {
return 0;
@@ -407,14 +414,19 @@ public:
return result_set;
}
- static std::vector<R> *merge(std::vector<std::vector<R>> *results) {
- std::vector<R> *output = new std::vector<R>();
+ static std::vector<Wrapped<R>> *buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) {
+ return new std::vector<Wrapped<R>>();
+ }
+
+ static std::vector<R> merge(std::vector<std::vector<R>*> *results) {
+ std::vector<R> output;
for (size_t i=0; i<results->size(); i++) {
for (size_t j=0; j<(*results)[i]->size(); j++) {
output->emplace_back(*((*results)[i])[j]);
}
}
+
return output;
}
diff --git a/tests/dynamic_extension_tests.cpp b/tests/dynamic_extension_tests.cpp
index b74ab38..9fdd5f9 100644
--- a/tests/dynamic_extension_tests.cpp
+++ b/tests/dynamic_extension_tests.cpp
@@ -20,7 +20,7 @@
#include <check.h>
using namespace de;
-typedef DynamicExtension<WRec, WIRS<WrappedRecord<WRec>>, WIRSQuery<WrappedRecord<WRec>>> DE;
+typedef DynamicExtension<WRec, WIRS<WRec>, WIRSQuery<WRec>> DE;
START_TEST(t_create)
{
@@ -147,7 +147,9 @@ START_TEST(t_range_sample_memlevels)
delete ext_wirs;
}
END_TEST
+*/
+/*
START_TEST(t_range_sample_weighted)
{
auto ext_wirs = new DE(100, 2, 1);
@@ -186,23 +188,24 @@ START_TEST(t_range_sample_weighted)
weight = 8;
}
- WRec r = {keys[i], (uint32_t) i, weight, 0};
+ WRec r = {keys[i], (uint32_t) i, weight};
ext_wirs->insert(r);
}
size_t k = 1000;
uint64_t lower_key = 0;
uint64_t upper_key = 5;
- WRec* buffer = new WRec[k]();
- char *buffer1 = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE);
- char *buffer2 = (char *) std::aligned_alloc(SECTOR_SIZE, PAGE_SIZE);
-
size_t cnt[3] = {0};
for (size_t i=0; i<1000; i++) {
- ext_wirs->range_sample(buffer, lower_key, upper_key, k);
+ wirs_query_parms<WRec> p;
+ p.lower_bound = lower_key;
+ p.upper_bound = upper_key;
+ p.sample_size = k;
+
+ auto result = ext_wirs->query(&p);
for (size_t j=0; j<k; j++) {
- cnt[buffer[j].key - 1]++;
+ cnt[result[j].key - 1]++;
}
}
@@ -211,9 +214,6 @@ START_TEST(t_range_sample_weighted)
ck_assert(roughly_equal(cnt[2] / 1000, (double) k/2.0, k, .05));
delete ext_wirs;
- delete[] buffer;
- free(buffer1);
- free(buffer2);
}
END_TEST
*/
@@ -242,7 +242,7 @@ START_TEST(t_tombstone_merging_01)
size_t deletes = 0;
size_t cnt=0;
for (auto rec : records) {
- WRec r = {rec.first, rec.second, 1, 0};
+ WRec r = {rec.first, rec.second, 1};
ck_assert_int_eq(ext_wirs->insert(r), 1);
if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) {
@@ -251,8 +251,7 @@ START_TEST(t_tombstone_merging_01)
for (size_t i=0; i<del_vec.size(); i++) {
WRec dr = {del_vec[i].first, del_vec[i].second, 1};
- dr.set_tombstone();
- ext_wirs->insert(dr);
+ ext_wirs->erase(dr);
deletes++;
to_delete.erase(del_vec[i]);
deleted.insert(del_vec[i]);
@@ -300,8 +299,7 @@ DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
for (size_t i=0; i<del_vec.size(); i++) {
- del_vec[i].set_tombstone();
- ext_wirs->insert(del_vec[i]);
+ ext_wirs->erase(del_vec[i]);
deletes++;
to_delete.erase(del_vec[i]);
deleted.insert(del_vec[i]);
@@ -349,9 +347,7 @@ START_TEST(t_sorted_array)
for (size_t i=0; i<del_vec.size(); i++) {
WRec dr = {del_vec[i].first, del_vec[i].second, 1};
- dr.set_tombstone();
-
- ext_wirs->insert(dr );
+ ext_wirs->erase(dr);
deletes++;
to_delete.erase(del_vec[i]);
deleted.insert(del_vec[i]);
@@ -368,7 +364,7 @@ START_TEST(t_sorted_array)
uint64_t prev_key = 0;
for (size_t i=0; i<flat->get_record_count(); i++) {
- auto k = flat->get_record_at(i)->key;
+ auto k = flat->get_record_at(i)->rec.key;
ck_assert_int_ge(k, prev_key);
prev_key = k;
}
diff --git a/tests/internal_level_tests.cpp b/tests/internal_level_tests.cpp
index 42c9044..1bbc458 100644
--- a/tests/internal_level_tests.cpp
+++ b/tests/internal_level_tests.cpp
@@ -21,12 +21,12 @@
using namespace de;
-typedef InternalLevel<WrappedRecord<WRec>, WIRS<WrappedRecord<WRec>>, WIRSQuery<WrappedRecord<WRec>>> ILevel;
+typedef InternalLevel<WRec, WIRS<WRec>, WIRSQuery<WRec>> ILevel;
START_TEST(t_memlevel_merge)
{
- auto tbl1 = create_test_mbuffer<WrappedWRec>(100);
- auto tbl2 = create_test_mbuffer<WrappedWRec>(100);
+ auto tbl1 = create_test_mbuffer<WRec>(100);
+ auto tbl2 = create_test_mbuffer<WRec>(100);
auto base_level = new ILevel(1, 1);
base_level->append_buffer(tbl1);
@@ -50,8 +50,8 @@ START_TEST(t_memlevel_merge)
ILevel *create_test_memlevel(size_t reccnt) {
- auto tbl1 = create_test_mbuffer<WrappedWRec>(reccnt/2);
- auto tbl2 = create_test_mbuffer<WrappedWRec>(reccnt/2);
+ auto tbl1 = create_test_mbuffer<WRec>(reccnt/2);
+ auto tbl2 = create_test_mbuffer<WRec>(reccnt/2);
auto base_level = new ILevel(1, 2);
base_level->append_buffer(tbl1);
diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp
index fc8b511..bb8e2c2 100644
--- a/tests/mutable_buffer_tests.cpp
+++ b/tests/mutable_buffer_tests.cpp
@@ -25,7 +25,7 @@ using namespace de;
START_TEST(t_create)
{
- auto buffer = new MutableBuffer<WrappedRec>(100, true, 50);
+ auto buffer = new MutableBuffer<Rec>(100, true, 50);
ck_assert_ptr_nonnull(buffer);
ck_assert_int_eq(buffer->get_capacity(), 100);
@@ -42,12 +42,12 @@ END_TEST
START_TEST(t_insert)
{
- auto buffer = new MutableBuffer<WrappedWRec>(100, true, 50);
+ auto buffer = new MutableBuffer<WRec>(100, true, 50);
uint64_t key = 0;
uint32_t val = 5;
- WrappedWRec rec = {0, 5, 1};
+ WRec rec = {0, 5, 1};
for (size_t i=0; i<99; i++) {
ck_assert_int_eq(buffer->append(rec), 1);
@@ -77,11 +77,11 @@ END_TEST
START_TEST(t_insert_tombstones)
{
- auto buffer = new MutableBuffer<WrappedRec>(100, true, 50);
+ auto buffer = new MutableBuffer<Rec>(100, true, 50);
size_t ts_cnt = 0;
- WrappedRec rec = {0, 5};
+ Rec rec = {0, 5};
for (size_t i=0; i<99; i++) {
bool ts = false;
@@ -90,9 +90,7 @@ START_TEST(t_insert_tombstones)
ts=true;
}
- rec.set_tombstone(ts);
-
- ck_assert_int_eq(buffer->append(rec), 1);
+ ck_assert_int_eq(buffer->append(rec, ts), 1);
ck_assert_int_eq(buffer->check_tombstone(rec), ts);
rec.key++;
@@ -104,11 +102,9 @@ START_TEST(t_insert_tombstones)
}
// inserting one more tombstone should not be possible
- rec.set_tombstone();
- ck_assert_int_eq(buffer->append(rec), 0);
+ ck_assert_int_eq(buffer->append(rec, true), 0);
- rec.set_tombstone(false);
ck_assert_int_eq(buffer->append(rec), 1);
rec.key++;
@@ -124,10 +120,10 @@ END_TEST
START_TEST(t_truncate)
{
- auto buffer = new MutableBuffer<WrappedRec>(100, true, 100);
+ auto buffer = new MutableBuffer<Rec>(100, true, 100);
size_t ts_cnt = 0;
- WrappedRec rec = {0, 5};
+ Rec rec = {0, 5};
for (size_t i=0; i<100; i++) {
bool ts = false;
@@ -136,9 +132,7 @@ START_TEST(t_truncate)
ts=true;
}
- rec.set_tombstone(ts);
-
- ck_assert_int_eq(buffer->append(rec), 1);
+ ck_assert_int_eq(buffer->append(rec, ts), 1);
ck_assert_int_eq(buffer->check_tombstone(rec), ts);
rec.key++;
@@ -149,7 +143,6 @@ START_TEST(t_truncate)
}
ck_assert_int_eq(buffer->is_full(), 1);
- rec.set_tombstone(false);
ck_assert_int_eq(buffer->append(rec), 0);
ck_assert_int_eq(buffer->truncate(), 1);
@@ -165,11 +158,11 @@ START_TEST(t_truncate)
END_TEST
-START_TEST(t_sorted_output)
+START_TEST(t_get_data)
{
size_t cnt = 100;
- auto buffer = new MutableBuffer<WrappedRec>(cnt, true, cnt/2);
+ auto buffer = new MutableBuffer<Rec>(cnt, true, cnt/2);
std::vector<uint64_t> keys(cnt);
@@ -184,23 +177,22 @@ START_TEST(t_sorted_output)
uint32_t val = 12345;
for (size_t i=0; i<cnt-2; i++) {
- buffer->append(WrappedRec {keys[i], val});
+ buffer->append(Rec {keys[i], val});
}
- WrappedRec r1 = {keys[cnt-2], val};
- r1.set_tombstone();
- buffer->append(r1);
+ Rec r1 = {keys[cnt-2], val};
+ buffer->append(r1, true);
- WrappedRec r2 = {keys[cnt-1], val};
- r2.set_tombstone();
- buffer->append(r2);
+ Rec r2 = {keys[cnt-1], val};
+ buffer->append(r2, true);
auto *sorted_records = buffer->get_data();
std::sort(keys.begin(), keys.end());
+ std::sort(sorted_records, sorted_records + buffer->get_record_count(), std::less<Wrapped<Rec>>());
for (size_t i=0; i<cnt; i++) {
- ck_assert_int_eq(sorted_records[i].key, keys[i]);
+ ck_assert_int_eq(sorted_records[i].rec.key, keys[i]);
}
delete buffer;
@@ -208,7 +200,7 @@ START_TEST(t_sorted_output)
END_TEST
-void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t start, size_t stop, MutableBuffer<WrappedRec> *buffer)
+void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t start, size_t stop, MutableBuffer<Rec> *buffer)
{
for (size_t i=start; i<stop; i++) {
buffer->append({(*values)[i].first, (*values)[i].second});
@@ -220,11 +212,11 @@ void insert_records(std::vector<std::pair<uint64_t, uint32_t>> *values, size_t s
START_TEST(t_multithreaded_insert)
{
size_t cnt = 10000;
- auto buffer = new MutableBuffer<WrappedRec>(cnt, true, cnt/2);
+ auto buffer = new MutableBuffer<Rec>(cnt, true, cnt/2);
- std::vector<WrappedRec> records(cnt);
+ std::vector<Rec> records(cnt);
for (size_t i=0; i<cnt; i++) {
- records[i] = WrappedRec {(uint64_t) rand(), (uint32_t) rand()};
+ records[i] = Rec {(uint64_t) rand(), (uint32_t) rand()};
}
// perform a t_multithreaded insertion
@@ -285,8 +277,8 @@ Suite *unit_testing()
suite_add_tcase(unit, truncate);
- TCase *sorted_out = tcase_create("de::MutableBuffer::sorted_output");
- tcase_add_test(sorted_out, t_sorted_output);
+ TCase *sorted_out = tcase_create("de::MutableBuffer::get_data");
+ tcase_add_test(sorted_out, t_get_data);
suite_add_tcase(unit, sorted_out);
diff --git a/tests/testing.h b/tests/testing.h
index 3ffafcd..cd0045a 100644
--- a/tests/testing.h
+++ b/tests/testing.h
@@ -22,8 +22,6 @@
typedef de::WeightedRecord<uint64_t, uint32_t, uint64_t> WRec;
typedef de::Record<uint64_t, uint32_t> Rec;
-typedef de::WrappedRecord<WRec> WrappedWRec;
-typedef de::WrappedRecord<Rec> WrappedRec;
static bool initialize_test_file(std::string fname, size_t page_cnt)
{
@@ -149,18 +147,16 @@ static de::MutableBuffer<R> *create_double_seq_mbuffer(size_t cnt, bool ts=false
R rec;
rec.key = i;
rec.value = i;
- if (ts) rec.set_tombstone();
- buffer->append(rec);
+ buffer->append(rec, ts);
}
for (size_t i = 0; i < cnt / 2; i++) {
R rec;
rec.key = i;
rec.value = i + 1;
- if (ts) rec.set_tombstone();
- buffer->append(rec);
+ buffer->append(rec, ts);
}
return buffer;
diff --git a/tests/wirs_tests.cpp b/tests/wirs_tests.cpp
index 3a80a9e..ba4b754 100644
--- a/tests/wirs_tests.cpp
+++ b/tests/wirs_tests.cpp
@@ -17,39 +17,39 @@
using namespace de;
-typedef WIRS<WrappedWRec> Shard;
+typedef WIRS<WRec> Shard;
START_TEST(t_mbuffer_init)
{
- auto mem_table = new MutableBuffer<WrappedWRec>(1024, true, 1024);
+ auto buffer = new MutableBuffer<WRec>(1024, true, 1024);
for (uint64_t i = 512; i > 0; i--) {
uint32_t v = i;
- mem_table->append({i,v, 1});
+ buffer->append({i,v, 1});
}
for (uint64_t i = 1; i <= 256; ++i) {
uint32_t v = i;
- mem_table->append({i, v, 1, 1});
+ buffer->append({i, v, 1}, true);
}
for (uint64_t i = 257; i <= 512; ++i) {
uint32_t v = i + 1;
- mem_table->append({i, v, 1});
+ buffer->append({i, v, 1});
}
- Shard* shard = new Shard(mem_table);
+ Shard* shard = new Shard(buffer);
ck_assert_uint_eq(shard->get_record_count(), 512);
- delete mem_table;
+ delete buffer;
delete shard;
}
START_TEST(t_wirs_init)
{
size_t n = 512;
- auto mbuffer1 = create_test_mbuffer<WrappedWRec>(n);
- auto mbuffer2 = create_test_mbuffer<WrappedWRec>(n);
- auto mbuffer3 = create_test_mbuffer<WrappedWRec>(n);
+ auto mbuffer1 = create_test_mbuffer<WRec>(n);
+ auto mbuffer2 = create_test_mbuffer<WRec>(n);
+ auto mbuffer3 = create_test_mbuffer<WRec>(n);
auto shard1 = new Shard(mbuffer1);
auto shard2 = new Shard(mbuffer2);
@@ -73,11 +73,11 @@ START_TEST(t_wirs_init)
auto cur_rec = shard4->get_record_at(i);
- if (shard1_idx < n && *cur_rec == *rec1) {
+ if (shard1_idx < n && cur_rec->rec == rec1->rec) {
++shard1_idx;
- } else if (shard2_idx < n && *cur_rec == *rec2) {
+ } else if (shard2_idx < n && cur_rec->rec == rec2->rec) {
++shard2_idx;
- } else if (shard3_idx < n && *cur_rec == *rec3) {
+ } else if (shard3_idx < n && cur_rec->rec == rec3->rec) {
++shard3_idx;
} else {
assert(false);
@@ -123,8 +123,8 @@ START_TEST(t_get_lower_bound_index)
START_TEST(t_full_cancelation)
{
size_t n = 100;
- auto buffer = create_double_seq_mbuffer<WrappedWRec>(n, false);
- auto buffer_ts = create_double_seq_mbuffer<WrappedWRec>(n, true);
+ auto buffer = create_double_seq_mbuffer<WRec>(n, false);
+ auto buffer_ts = create_double_seq_mbuffer<WRec>(n, true);
Shard* shard = new Shard(buffer);
Shard* shard_ts = new Shard(buffer_ts);