summaryrefslogtreecommitdiffstats
path: root/include
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-05-29 14:30:08 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-05-29 14:30:08 -0400
commite920fa57cf9c503e560055864e4de37912b239e1 (patch)
treed0daba733c7af2b0b22f29d39de8f824ffa83472 /include
parentb00682429988f17152e7573ffeffa1cecfdd3d3a (diff)
downloaddynamic-extension-e920fa57cf9c503e560055864e4de37912b239e1.tar.gz
Adjusted the way that record wrapping works to clean up interfaces
Diffstat (limited to 'include')
-rw-r--r--	include/framework/DynamicExtension.h	| 51
-rw-r--r--	include/framework/InternalLevel.h	| 10
-rw-r--r--	include/framework/MutableBuffer.h	| 20
-rw-r--r--	include/framework/RecordInterface.h	| 48
-rw-r--r--	include/shard/WIRS.h	| 90
5 files changed, 109 insertions, 110 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 68f85e2..422ca10 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -75,9 +75,9 @@ typedef ssize_t level_index;
template <RecordInterface R, ShardInterface S, QueryInterface Q, typename FQ=void>
class DynamicExtension {
- //typedef typename S<WrappedRecord<R>> Shard;
+ //typedef typename S<R> Shard;
typedef S Shard;
- typedef MutableBuffer<WrappedRecord<R>> Buffer;
+ typedef MutableBuffer<R> Buffer;
public:
DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop)
@@ -142,18 +142,23 @@ public:
level->get_query_states(shards, states, parms);
}
- std::vector<std::vector<R>> query_results(shards.size() + 1);
+ std::vector<std::vector<R>*> query_results(shards.size() + 1, nullptr);
// Execute the query for the buffer
- query_results[0] = Q::buffer_query(buffer, buffer_state, parms);
+ query_results[0] = filter_deletes(Q::buffer_query(buffer, buffer_state, parms), {-1, -1}, buffer);
// Execute the query for each shard
for (size_t i=0; i<shards.size(); i++) {
- query_results[i] = filter_deletes(Q::query(shards[i].second, states[i], parms));
+ query_results[i] = filter_deletes(Q::query(shards[i].second, states[i], parms), shards[i].first, buffer);
}
// Merge the results together
- auto result = Q::merge(query_results);
+ auto result = Q::merge(&query_results);
+
+ for (size_t i=0; i<query_results.size(); i++) {
+ delete query_results[i];
+ Q::delete_query_state(states[i]);
+ }
return result;
}
@@ -259,7 +264,7 @@ private:
size_t m_scale_factor;
double m_max_delete_prop;
- std::vector<InternalLevel<WrappedRecord<R>, S, Q> *> m_levels;
+ std::vector<InternalLevel<R, S, Q> *> m_levels;
Buffer *get_buffer() {
return m_buffer;
@@ -274,14 +279,12 @@ private:
merge_buffer();
}
- WrappedRecord<R> wrec = static_cast<WrappedRecord<R>>(rec);
-
- return buffer->append(wrec, ts);
+ return buffer->append(rec, ts);
}
- std::vector<R> filter_deletes(std::vector<R> records, ShardID shid, Buffer *buffer) {
- std::vector<R> processed_records;
- processed_records.reserve(records.size());
+ std::vector<R> *filter_deletes(std::vector<R> *records, ShardID shid, Buffer *buffer) {
+ std::vector<R> *processed_records = new std::vector<R>();
+ processed_records->reserve(records->size());
// For delete tagging, we just need to check the delete bit on each
// record.
@@ -291,9 +294,10 @@ private:
continue;
}
- processed_records.emplace_back(rec.rec);
+ processed_records->emplace_back(static_cast<R>(rec.rec));
}
+ delete records;
return processed_records;
}
@@ -320,8 +324,10 @@ private:
}
}
- processed_records.emplace_back(rec.rec);
+ processed_records->emplace_back(static_cast<R>(rec.rec));
}
+
+ delete records;
}
/*
@@ -337,7 +343,7 @@ private:
if (new_idx > 0) {
assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
}
- m_levels.emplace_back(new InternalLevel<WrappedRecord<R>, Shard, Q>(new_idx, new_shard_cnt));
+ m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt));
return new_idx;
}
@@ -416,14 +422,14 @@ private:
// merging two memory levels
if (LSM_LEVELING) {
auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
+ m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
mark_as_unused(tmp);
} else {
m_levels[base_level]->append_merged_shards(m_levels[incoming_level]);
}
mark_as_unused(m_levels[incoming_level]);
- m_levels[incoming_level] = new InternalLevel<WrappedRecord<R>, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
+ m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
}
@@ -432,9 +438,9 @@ private:
if (LSM_LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0];
- auto temp_level = new InternalLevel<WrappedRecord<R>, Shard, Q>(0, 1);
+ auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
temp_level->append_buffer(buffer);
- auto new_level = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(old_level, temp_level);
+ auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level);
m_levels[0] = new_level;
delete temp_level;
@@ -450,7 +456,7 @@ private:
* level may not be able to immediately be deleted, depending upon who
* else is using it.
*/
- inline void mark_as_unused(InternalLevel<WrappedRecord<R>, Shard, Q> *level) {
+ inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) {
delete level;
}
@@ -460,6 +466,9 @@ private:
* levels below idx are below the limit.
*/
inline void enforce_delete_maximum(level_index idx) {
+ // FIXME: currently broken due to tombstone cancellation issues
+ return;
+
long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
if (ts_prop > (long double) m_max_delete_prop) {
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index c489063..d28ba5f 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -28,18 +28,18 @@ class InternalLevel {
typedef MutableBuffer<R> Buffer;
public:
InternalLevel(ssize_t level_no, size_t shard_cap)
- : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard*>(shard_cap, nullptr))
+ : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard*>(shard_cap, nullptr), free_shards)
{}
// Create a new memory level sharing the shards and repurposing it as previous level_no + 1
// WARNING: for leveling only.
InternalLevel(InternalLevel* level)
: m_level_no(level->m_level_no + 1), m_shard_cnt(level->m_shard_cnt)
- , m_shards(level->m_shards) {
+ , m_shards(level->m_shards, free_shards) {
assert(m_shard_cnt == 1 && m_shards->size() == 1);
}
- ~InternalLevel() {}
+ ~InternalLevel() { }
// WARNING: for leveling only.
// assuming the base level is the level new level is merging into. (base_level is larger.)
@@ -181,6 +181,10 @@ private:
size_t m_shard_cnt;
size_t m_shard_size_cap;
+ static void free_shards(std::vector<Shard*>* vec) {
+ for (size_t i=0; i<vec->size(); i++) delete (*vec)[i];
+ }
+
std::shared_ptr<std::vector<Shard*>> m_shards;
};
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index c154001..bc80922 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -28,14 +28,13 @@ namespace de {
template <RecordInterface R>
class MutableBuffer {
- //typedef WrappedRecord<R> R;
public:
MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap)
: m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
, m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
- auto len = capacity * sizeof(R);
+ auto len = capacity * sizeof(Wrapped<R>);
size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));
- m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
+ m_data = (Wrapped<R>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
m_tombstone_filter = nullptr;
if (max_tombstone_cap > 0) {
m_tombstone_filter = new BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
@@ -54,10 +53,11 @@ public:
int32_t pos = 0;
if ((pos = try_advance_tail()) == -1) return 0;
- R new_rec = rec;
- if (tombstone) new_rec.set_tombstone();
+ Wrapped<R> wrec;
+ wrec.rec = rec;
+ if (tombstone) wrec.set_tombstone();
- m_data[pos] = new_rec;
+ m_data[pos] = wrec;
m_data[pos].header |= (pos << 2);
if (tombstone) {
@@ -66,7 +66,7 @@ public:
}
if constexpr (WeightedRecordInterface<R_>) {
- m_weight.fetch_add(new_rec.weight);
+ m_weight.fetch_add(rec.weight);
double old = m_max_weight.load();
while (old < rec.weight) {
m_max_weight.compare_exchange_strong(old, rec.weight);
@@ -123,7 +123,7 @@ public:
auto offset = 0;
while (offset < m_reccnt.load()) {
- if (m_data[offset] == rec && m_data[offset].is_tombstone()) {
+ if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) {
return true;
}
offset++;;
@@ -147,7 +147,7 @@ public:
return m_weight.load();
}
- R *get_data() {
+ Wrapped<R> *get_data() {
return m_data;
}
@@ -162,7 +162,7 @@ private:
size_t m_cap;
size_t m_tombstone_cap;
- R* m_data;
+ Wrapped<R>* m_data;
BloomFilter<R>* m_tombstone_filter;
alignas(64) std::atomic<size_t> m_tombstonecnt;
diff --git a/include/framework/RecordInterface.h b/include/framework/RecordInterface.h
index c8e622f..70c8e01 100644
--- a/include/framework/RecordInterface.h
+++ b/include/framework/RecordInterface.h
@@ -26,9 +26,9 @@ concept RecordInterface = requires(R r, R s) {
};
template<RecordInterface R>
-struct WrappedRecord : R {
- //R rec;
+struct Wrapped {
uint32_t header;
+ R rec;
inline void set_delete() {
header |= 2;
@@ -49,8 +49,14 @@ struct WrappedRecord : R {
inline bool is_tombstone() const {
return header & 1;
}
+
+ inline bool operator<(const Wrapped& other) const {
+ return (rec.key < other.rec.key) || (rec.key == other.rec.key && rec.value < other.rec.value)
+ || (rec.key == other.rec.key && rec.value == other.rec.value && header < other.header);
+ }
};
+
template <typename R>
concept WeightedRecordInterface = RecordInterface<R> && requires(R r) {
{r.weight} -> std::convertible_to<double>;
@@ -76,46 +82,14 @@ struct WeightedRecord {
K key;
V value;
W weight = 1;
- uint32_t header = 0;
- inline void set_delete() {
- header |= 2;
- }
-
- inline bool is_deleted() const {
- return header & 2;
- }
-
- inline void set_tombstone(bool val=true) {
- if (val) {
- header |= val;
- } else {
- header &= 0;
- }
- }
-
- inline bool is_tombstone() const {
- return header & 1;
- }
-
- inline int match(const WeightedRecord* other) const {
- return key == other->key && value == other->value;
+ inline bool operator==(const WeightedRecord& other) const {
+ return key == other.key && value == other.value;
}
- inline bool operator<(const WeightedRecord& other) const {
+ inline bool operator<(const WeightedRecord& other) const {
return key < other.key || (key == other.key && value < other.value);
}
-
- inline bool operator==(const WeightedRecord& other) const {
- return key == other.key && value == other.value;
- }
};
-
-template <RecordInterface R>
-static bool memtable_record_cmp(const R& a, const R& b) {
- return (a.key < b.key) || (a.key == b.key && a.value < b.value)
- || (a.key == b.key && a.value == b.value && a.header < b.header);
-}
-
}
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 5b610c7..9e4d911 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -34,6 +34,7 @@ template <WeightedRecordInterface R>
struct wirs_query_parms {
decltype(R::key) lower_bound;
decltype(R::key) upper_bound;
+ size_t sample_size;
};
template <WeightedRecordInterface R>
@@ -73,9 +74,9 @@ public:
WIRS(MutableBuffer<R>* buffer)
: m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) {
- size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE);
+ size_t alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
m_bf = new BloomFilter<K>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS);
@@ -84,11 +85,11 @@ public:
auto base = buffer->get_data();
auto stop = base + buffer->get_record_count();
- std::sort(base, stop, memtable_record_cmp<R>);
+ std::sort(base, stop, std::less<Wrapped<R>>());
while (base < stop) {
if (!(base->is_tombstone()) && (base + 1) < stop) {
- if (*base == *(base + 1) && (base + 1)->is_tombstone()) {
+ if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
base += 2;
wirs_cancelations++;
continue;
@@ -100,11 +101,11 @@ public:
base->header &= 1;
m_data[m_reccnt++] = *base;
- m_total_weight+= base->weight;
+ m_total_weight+= base->rec.weight;
if (m_bf && base->is_tombstone()) {
m_tombstone_cnt++;
- m_bf->insert(base->key);
+ m_bf->insert(base->rec.key);
}
base++;
@@ -117,56 +118,56 @@ public:
WIRS(WIRS** shards, size_t len)
: m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) {
- std::vector<Cursor<R>> cursors;
+ std::vector<Cursor<Wrapped<R>>> cursors;
cursors.reserve(len);
- PriorityQueue<R> pq(len);
+ PriorityQueue<Wrapped<R>> pq(len);
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
for (size_t i = 0; i < len; ++i) {
if (shards[i]) {
- auto base = shards[i]->sorted_output();
+ auto base = shards[i]->get_data();
cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
attemp_reccnt += shards[i]->get_record_count();
tombstone_count += shards[i]->get_tombstone_count();
pq.push(cursors[i].ptr, i);
} else {
- cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0});
+ cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
}
}
m_bf = new BloomFilter<K>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
- size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE);
+ size_t alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
while (pq.size()) {
auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<R>{nullptr, 0};
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
if (!now.data->is_tombstone() && next.data != nullptr &&
- *now.data == *next.data && next.data->is_tombstone()) {
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
pq.pop(); pq.pop();
auto& cursor1 = cursors[now.version];
auto& cursor2 = cursors[next.version];
- if (advance_cursor<R>(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor<R>(cursor2)) pq.push(cursor2.ptr, next.version);
+ if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version);
+ if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version);
} else {
auto& cursor = cursors[now.version];
if (!cursor.ptr->is_deleted()) {
m_data[m_reccnt++] = *cursor.ptr;
- m_total_weight += cursor.ptr->weight;
+ m_total_weight += cursor.ptr->rec.weight;
if (m_bf && cursor.ptr->is_tombstone()) {
++m_tombstone_cnt;
- if (m_bf) m_bf->insert(cursor.ptr->key);
+ if (m_bf) m_bf->insert(cursor.ptr->rec.key);
}
}
pq.pop();
- if (advance_cursor<R>(cursor)) pq.push(cursor.ptr, now.version);
+ if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version);
}
}
@@ -186,7 +187,7 @@ public:
free_tree(m_root);
}
- R *point_lookup(R &rec, bool filter=false) {
+ Wrapped<R> *point_lookup(R &rec, bool filter=false) {
if (filter && !m_bf->lookup(rec.key)) {
return nullptr;
}
@@ -205,7 +206,7 @@ public:
return nullptr;
}
- R* sorted_output() const {
+ Wrapped<R>* get_data() const {
return m_data;
}
@@ -217,7 +218,7 @@ public:
return m_tombstone_cnt;
}
- const R* get_record_at(size_t idx) const {
+ const Wrapped<R>* get_record_at(size_t idx) const {
if (idx >= m_reccnt) return nullptr;
return m_data + idx;
}
@@ -273,7 +274,7 @@ private:
double group_weight = 0.0;
group_norm_weight.clear();
for (size_t k = 0; k < m_group_size && i < m_reccnt; ++k, ++i) {
- auto w = m_data[i].weight;
+ auto w = m_data[i].rec.weight;
group_norm_weight.emplace_back(w);
group_weight += w;
sum_weight += w;
@@ -325,7 +326,7 @@ private:
}
}
- R* m_data;
+ Wrapped<R>* m_data;
std::vector<Alias *> m_alias;
wirs_node<R>* m_root;
W m_total_weight;
@@ -339,10 +340,10 @@ private:
template <WeightedRecordInterface R>
class WIRSQuery {
public:
- static void *get_query_state(wirs_query_parms<R> *parameters, WIRS<R> *wirs) {
+ static void *get_query_state(WIRS<R> *wirs, void *parms) {
auto res = new WIRSState<R>();
- decltype(R::key) lower_key = ((wirs_query_parms<R> *) parameters)->lower_bound;
- decltype(R::key) upper_key = ((wirs_query_parms<R> *) parameters)->upper_bound;
+ decltype(R::key) lower_key = ((wirs_query_parms<R> *) parms)->lower_bound;
+ decltype(R::key) upper_key = ((wirs_query_parms<R> *) parms)->upper_bound;
// Simulate a stack to unfold recursion.
double tot_weight = 0.0;
@@ -351,13 +352,13 @@ public:
size_t top = 1;
while(top > 0) {
auto now = st[--top];
- if (covered_by(now, lower_key, upper_key) ||
- (now->left == nullptr && now->right == nullptr && intersects(now, lower_key, upper_key))) {
+ if (wirs->covered_by(now, lower_key, upper_key) ||
+ (now->left == nullptr && now->right == nullptr && wirs->intersects(now, lower_key, upper_key))) {
res->nodes.emplace_back(now);
tot_weight += now->weight;
} else {
- if (now->left && intersects(now->left, lower_key, upper_key)) st[top++] = now->left;
- if (now->right && intersects(now->right, lower_key, upper_key)) st[top++] = now->right;
+ if (now->left && wirs->intersects(now->left, lower_key, upper_key)) st[top++] = now->left;
+ if (now->right && wirs->intersects(now->right, lower_key, upper_key)) st[top++] = now->right;
}
}
@@ -371,13 +372,19 @@ public:
return res;
}
- static std::vector<R> *query(wirs_query_parms<R> *parameters, WIRSState<R> *state, WIRS<R> *wirs) {
- auto sample_sz = parameters->sample_size;
- auto lower_key = parameters->lower_bound;
- auto upper_key = parameters->upper_bound;
- auto rng = parameters->rng;
+ static void* get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) {
+ return nullptr;
+ }
+
+ static std::vector<Wrapped<R>> *query(WIRS<R> *wirs, void *q_state, void *parms) {
+ auto sample_sz = ((wirs_query_parms<R> *) parms)->sample_size;
+ auto lower_key = ((wirs_query_parms<R> *) parms)->lower_bound;
+ auto upper_key = ((wirs_query_parms<R> *) parms)->upper_bound;
+ auto rng = ((wirs_query_parms<R> *) parms)->rng;
+
+ auto state = (WIRSState<R> *) q_state;
- std::vector<R> *result_set = new std::vector<R>();
+ std::vector<Wrapped<R>> *result_set = new std::vector<Wrapped<R>>();
if (sample_sz == 0) {
return 0;
@@ -407,14 +414,19 @@ public:
return result_set;
}
- static std::vector<R> *merge(std::vector<std::vector<R>> *results) {
- std::vector<R> *output = new std::vector<R>();
+ static std::vector<Wrapped<R>> *buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) {
+ return new std::vector<Wrapped<R>>();
+ }
+
+ static std::vector<R> merge(std::vector<std::vector<R>*> *results) {
+ std::vector<R> output;
for (size_t i=0; i<results->size(); i++) {
for (size_t j=0; j<(*results)[i]->size(); j++) {
output->emplace_back(*((*results)[i])[j]);
}
}
+
return output;
}