author    Douglas Rumbaugh <dbr4@psu.edu>  2023-05-29 12:33:58 -0400
committer Douglas Rumbaugh <dbr4@psu.edu>  2023-05-29 12:33:58 -0400
commit    b00682429988f17152e7573ffeffa1cecfdd3d3a (patch)
tree      b621adaa2bfe3a7a9846970f7426fd950e17aa99 /include/framework
parent    de5fa133758e2f0aad855ac58dff5cfa13d06f74 (diff)
download  dynamic-extension-b00682429988f17152e7573ffeffa1cecfdd3d3a.tar.gz

Tests and bugfixes for framework
Diffstat (limited to 'include/framework')
-rw-r--r--  include/framework/DynamicExtension.h  | 30
-rw-r--r--  include/framework/InternalLevel.h     | 54
-rw-r--r--  include/framework/MutableBuffer.h     | 22
-rw-r--r--  include/framework/QueryInterface.h    |  5
-rw-r--r--  include/framework/RecordInterface.h   |  4
5 files changed, 62 insertions(+), 53 deletions(-)
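
The central change in this commit is that records are stored as WrappedRecord<R>, which now inherits from the user's record type R and adds a header word, throughout the mutable buffer and the internal levels rather than only inside the buffer. A minimal sketch of the wrapper pattern follows; the bit layout of the header is an assumption for illustration, since the diff only shows that delete and tombstone flags and a shifted slot index share it.

    #include <cstdint>

    // Minimal sketch of the record-wrapping pattern this commit threads through
    // the framework. The bit layout of `header` is assumed for illustration; the
    // diff only shows that delete/tombstone flags and a shifted slot index share it.
    template <typename R>
    struct WrappedRecordSketch : R {
        uint32_t header = 0;

        void set_tombstone()      { header |= 0x1; }   // assumed: bit 0 marks a tombstone
        bool is_tombstone() const { return header & 0x1; }

        void set_delete()         { header |= 0x2; }   // assumed: bit 1 marks a tagged delete
        bool is_deleted()   const { return header & 0x2; }
    };

    struct Rec { uint64_t key; uint64_t value; };      // stand-in user record type

    int main() {
        WrappedRecordSketch<Rec> w{};
        w.key = 1;
        w.value = 42;
        w.set_tombstone();
        return w.is_tombstone() ? 0 : 1;
    }

Because the wrapper inherits from R, field access and comparisons defined on R keep working on the stored records, which is what allows MutableBuffer to hold its template parameter's records directly once DynamicExtension instantiates it as MutableBuffer<WrappedRecord<R>>.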
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index a70dda4..68f85e2 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -77,7 +77,7 @@ template <RecordInterface R, ShardInterface S, QueryInterface Q, typename FQ=voi
class DynamicExtension {
//typedef typename S<WrappedRecord<R>> Shard;
typedef S Shard;
- typedef MutableBuffer<R> Buffer;
+ typedef MutableBuffer<WrappedRecord<R>> Buffer;
public:
DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop)
@@ -149,7 +149,7 @@ public:
// Execute the query for each shard
for (size_t i=0; i<shards.size(); i++) {
- query_results[i] = post_process(Q::query(shards[i].second, states[i], parms));
+ query_results[i] = filter_deletes(Q::query(shards[i].second, states[i], parms));
}
// Merge the results together
@@ -233,7 +233,7 @@ public:
}
}
- shards.emplace_back(new S(get_buffer(), nullptr));
+ shards.emplace_back(new S(get_buffer()));
Shard *shards_array[shards.size()];
@@ -244,7 +244,7 @@ public:
}
}
- Shard *flattened = new S(shards_array, j, nullptr);
+ Shard *flattened = new S(shards_array, j);
for (auto shard : shards) {
delete shard;
@@ -259,13 +259,13 @@ private:
size_t m_scale_factor;
double m_max_delete_prop;
- std::vector<InternalLevel<R, S, Q> *> m_levels;
+ std::vector<InternalLevel<WrappedRecord<R>, S, Q> *> m_levels;
Buffer *get_buffer() {
return m_buffer;
}
- int internal_append(R &rec, bool ts) {
+ int internal_append(const R &rec, bool ts) {
Buffer *buffer;
while (!(buffer = get_buffer()))
;
@@ -274,10 +274,12 @@ private:
merge_buffer();
}
- return buffer->append(rec, ts);
+ WrappedRecord<R> wrec = static_cast<WrappedRecord<R>>(rec);
+
+ return buffer->append(wrec, ts);
}
- std::vector<R> post_process(std::vector<R> records, ShardID shid, Buffer *buffer) {
+ std::vector<R> filter_deletes(std::vector<R> records, ShardID shid, Buffer *buffer) {
std::vector<R> processed_records;
processed_records.reserve(records.size());
@@ -335,7 +337,7 @@ private:
if (new_idx > 0) {
assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
}
- m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt));
+ m_levels.emplace_back(new InternalLevel<WrappedRecord<R>, Shard, Q>(new_idx, new_shard_cnt));
return new_idx;
}
@@ -414,14 +416,14 @@ private:
// merging two memory levels
if (LSM_LEVELING) {
auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
+ m_levels[base_level] = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
mark_as_unused(tmp);
} else {
m_levels[base_level]->append_merged_shards(m_levels[incoming_level]);
}
mark_as_unused(m_levels[incoming_level]);
- m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
+ m_levels[incoming_level] = new InternalLevel<WrappedRecord<R>, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
}
@@ -430,9 +432,9 @@ private:
if (LSM_LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0];
- auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
+ auto temp_level = new InternalLevel<WrappedRecord<R>, Shard, Q>(0, 1);
temp_level->append_buffer(buffer);
- auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level);
+ auto new_level = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(old_level, temp_level);
m_levels[0] = new_level;
delete temp_level;
@@ -448,7 +450,7 @@ private:
* level may not be able to immediately be deleted, depending upon who
* else is using it.
*/
- inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) {
+ inline void mark_as_unused(InternalLevel<WrappedRecord<R>, Shard, Q> *level) {
delete level;
}
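
In the query path above, post_process is renamed to filter_deletes and the levels are instantiated over WrappedRecord<R>. The body of filter_deletes is not visible in these hunks; the sketch below is only a generic illustration of the pattern the name suggests, dropping results that are tombstones or carry a tagged delete, using the assumed accessors from the wrapper sketch above rather than the framework's actual implementation.

    #include <vector>

    // Generic sketch only: the real filter_deletes(records, shid, buffer) also
    // consults the buffer and other shards for matching tombstones; that logic
    // is not visible in this diff, so it is omitted here.
    template <typename WRec>
    std::vector<WRec> filter_deletes_sketch(const std::vector<WRec> &records) {
        std::vector<WRec> out;
        out.reserve(records.size());
        for (const auto &rec : records) {
            if (!rec.is_tombstone() && !rec.is_deleted()) {   // assumed accessors
                out.push_back(rec);
            }
        }
        return out;
    }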
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index 6986a61..c489063 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -28,7 +28,7 @@ class InternalLevel {
typedef MutableBuffer<R> Buffer;
public:
InternalLevel(ssize_t level_no, size_t shard_cap)
- : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard>(shard_cap, nullptr))
+ : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard*>(shard_cap, nullptr))
{}
// Create a new memory level sharing the shards and repurposing it as previous level_no + 1
@@ -36,7 +36,7 @@ public:
InternalLevel(InternalLevel* level)
: m_level_no(level->m_level_no + 1), m_shard_cnt(level->m_shard_cnt)
, m_shards(level->m_shards) {
- assert(m_shard_cnt == 1 && m_shards.size() == 1);
+ assert(m_shard_cnt == 1 && m_shards->size() == 1);
}
~InternalLevel() {}
@@ -48,22 +48,22 @@ public:
auto res = new InternalLevel(base_level->m_level_no, 1);
res->m_shard_cnt = 1;
Shard* shards[2];
- shards[0] = base_level->m_shards[0];
- shards[1] = new_level->m_shards[0];
+ shards[0] = (*base_level->m_shards)[0];
+ shards[1] = (*new_level->m_shards)[0];
- res->m_shards[0] = new S(shards, 2);
+ (*res->m_shards)[0] = new S(shards, 2);
return res;
}
void append_buffer(Buffer* buffer) {
- assert(m_shard_cnt < m_shards.size());
- m_shards[m_shard_cnt] = new S(buffer);
+ assert(m_shard_cnt < m_shards->size());
+ (*m_shards)[m_shard_cnt] = new S(buffer);
++m_shard_cnt;
}
void append_merged_shards(InternalLevel* level) {
- assert(m_shard_cnt < m_shards.size());
- m_shards[m_shard_cnt] = new S(level->m_shards, level->m_shard_cnt);
+ assert(m_shard_cnt < m_shards->size());
+ (*m_shards)[m_shard_cnt] = new S(level->m_shards->data(), level->m_shard_cnt);
++m_shard_cnt;
}
@@ -71,7 +71,7 @@ public:
Shard *shards[m_shard_cnt];
for (size_t i=0; i<m_shard_cnt; i++) {
- shards[i] = m_shards[i];
+ shards[i] = (*m_shards)[i];
}
return new S(shards, m_shard_cnt);
@@ -80,9 +80,9 @@ public:
// Append the sample range in-order.....
void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_shards[i]) {
- auto shard_state = Q::get_query_state(m_shards[i], query_parms);
- shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i]});
+ if ((*m_shards)[i]) {
+ auto shard_state = Q::get_query_state((*m_shards)[i], query_parms);
+ shards.push_back({{m_level_no, (ssize_t) i}, (*m_shards)[i]});
shard_states.emplace_back(shard_state);
}
}
@@ -92,8 +92,8 @@ public:
if (m_shard_cnt == 0) return false;
for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
- if (m_shards[i]) {
- auto res = m_shards[i]->point_lookup(rec, true);
+ if ((*m_shards)[i]) {
+ auto res = (*m_shards)[i]->point_lookup(rec, true);
if (res && res->is_tombstone()) {
return true;
}
@@ -105,9 +105,9 @@ public:
bool delete_record(const R &rec) {
if (m_shard_cnt == 0) return false;
- for (size_t i = 0; i < m_shards.size(); ++i) {
- if (m_shards[i]) {
- auto res = m_shards[i]->point_lookup(rec);
+ for (size_t i = 0; i < m_shards->size(); ++i) {
+ if ((*m_shards)[i]) {
+ auto res = (*m_shards)[i]->point_lookup(rec);
if (res) {
res->set_delete();
}
@@ -118,7 +118,7 @@ public:
}
Shard* get_shard(size_t idx) {
- return m_shards[idx];
+ return (*m_shards)[idx];
}
size_t get_shard_count() {
@@ -128,7 +128,7 @@ public:
size_t get_record_cnt() {
size_t cnt = 0;
for (size_t i=0; i<m_shard_cnt; i++) {
- cnt += m_shards[i]->get_record_count();
+ cnt += (*m_shards)[i]->get_record_count();
}
return cnt;
@@ -137,7 +137,7 @@ public:
size_t get_tombstone_count() {
size_t res = 0;
for (size_t i = 0; i < m_shard_cnt; ++i) {
- res += m_shards[i]->get_tombstone_count();
+ res += (*m_shards)[i]->get_tombstone_count();
}
return res;
}
@@ -145,7 +145,7 @@ public:
size_t get_aux_memory_usage() {
size_t cnt = 0;
for (size_t i=0; i<m_shard_cnt; i++) {
- cnt += m_shards[i]->get_aux_memory_usage();
+ cnt += (*m_shards)[i]->get_aux_memory_usage();
}
return cnt;
@@ -154,8 +154,8 @@ public:
size_t get_memory_usage() {
size_t cnt = 0;
for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_shards[i]) {
- cnt += m_shards[i]->get_memory_usage();
+ if ((*m_shards)[i]) {
+ cnt += (*m_shards)[i]->get_memory_usage();
}
}
@@ -166,9 +166,9 @@ public:
size_t tscnt = 0;
size_t reccnt = 0;
for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_shards[i]) {
- tscnt += m_shards[i]->get_tombstone_count();
- reccnt += m_shards[i]->get_record_count();
+ if ((*m_shards)[i]) {
+ tscnt += (*m_shards)[i]->get_tombstone_count();
+ reccnt += (*m_shards)[i]->get_record_count();
}
}
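
Most of the InternalLevel.h changes follow from one representational change: m_shards becomes a pointer to a heap-allocated std::vector<Shard*>, so the level-repurposing constructor can share a single shard vector between two InternalLevel objects, and every element access becomes (*m_shards)[i] while member calls such as size() go through -> directly. A small self-contained sketch of that access pattern, with a stand-in Shard type:

    #include <cstddef>
    #include <vector>

    struct Shard { };                                  // stand-in for the framework's shard type

    int main() {
        std::size_t shard_cap = 4;

        // Heap-allocated vector of shard pointers, shared by raw pointer between levels.
        auto *shards = new std::vector<Shard*>(shard_cap, nullptr);

        (*shards)[0] = new Shard();                    // element access dereferences the vector pointer
        std::size_t n = shards->size();                // member calls can use -> directly
        (void) n;

        delete (*shards)[0];
        delete shards;
        return 0;
    }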
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index 3643a89..c154001 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -28,14 +28,14 @@ namespace de {
template <RecordInterface R>
class MutableBuffer {
- typedef WrappedRecord<R> WRec;
+ //typedef WrappedRecord<R> R;
public:
MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap)
: m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
, m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
- auto len = capacity * sizeof(WRec);
+ auto len = capacity * sizeof(R);
size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));
- m_data = (WRec*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
+ m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
m_tombstone_filter = nullptr;
if (max_tombstone_cap > 0) {
m_tombstone_filter = new BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
@@ -54,12 +54,10 @@ public:
int32_t pos = 0;
if ((pos = try_advance_tail()) == -1) return 0;
- WRec wrec;
- wrec.rec = rec;
+ R new_rec = rec;
+ if (tombstone) new_rec.set_tombstone();
- if (tombstone) wrec.set_tombstone();
-
- m_data[pos] = wrec;
+ m_data[pos] = new_rec;
m_data[pos].header |= (pos << 2);
if (tombstone) {
@@ -68,7 +66,7 @@ public:
}
if constexpr (WeightedRecordInterface<R_>) {
- m_weight.fetch_add(rec.weight);
+ m_weight.fetch_add(new_rec.weight);
double old = m_max_weight.load();
while (old < rec.weight) {
m_max_weight.compare_exchange_strong(old, rec.weight);
@@ -125,7 +123,7 @@ public:
auto offset = 0;
while (offset < m_reccnt.load()) {
- if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) {
+ if (m_data[offset] == rec && m_data[offset].is_tombstone()) {
return true;
}
offset++;
@@ -149,6 +147,10 @@ public:
return m_weight.load();
}
+ R *get_data() {
+ return m_data;
+ }
+
private:
int32_t try_advance_tail() {
size_t new_tail = m_reccnt.fetch_add(1);
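
MutableBuffer now stores the (wrapped) record type directly and allocates its array with std::aligned_alloc on a cache-line boundary, first rounding the byte length up to a multiple of CACHELINE_SIZE, since aligned_alloc requires the size to be a multiple of the alignment. A minimal sketch of that computation; the 64-byte cache line and the record size below are placeholders for illustration:

    #include <cstddef>
    #include <cstdlib>

    int main() {
        constexpr std::size_t CACHELINE_SIZE = 64;     // assumed value; the framework defines its own
        std::size_t capacity    = 1000;                // buffer capacity in records
        std::size_t record_size = 24;                  // stands in for sizeof(WrappedRecord<R>)

        // Round the allocation up to a cache-line multiple, as in the constructor above.
        std::size_t len     = capacity * record_size;
        std::size_t aligned = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));

        void *data = std::aligned_alloc(CACHELINE_SIZE, aligned);
        std::free(data);
        return 0;
    }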
diff --git a/include/framework/QueryInterface.h b/include/framework/QueryInterface.h
index eafeeb0..886bdc8 100644
--- a/include/framework/QueryInterface.h
+++ b/include/framework/QueryInterface.h
@@ -13,10 +13,15 @@
template <typename Q>
concept QueryInterface = requires(Q q, void *p) {
+
+/*
{q.get_query_state(p, p)} -> std::convertible_to<void*>;
{q.get_buffer_query_state(p, p)};
{q.query(p, p)};
{q.buffer_query(p, p)};
{q.merge()};
{q.delete_query_state(p)};
+*/
+
+ {Q::delete_query_state(std::declval<void*>())} -> std::same_as<void>;
};
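
The QueryInterface concept now comments out the instance-based requirements and keeps a single compound requirement phrased as a static call through std::declval<void*>(). A minimal sketch of the reduced concept together with a hypothetical query type that satisfies it:

    #include <concepts>
    #include <utility>

    // The concept as reduced by this commit: only delete_query_state is required,
    // expressed as a static call returning void.
    template <typename Q>
    concept QueryInterfaceSketch = requires {
        { Q::delete_query_state(std::declval<void*>()) } -> std::same_as<void>;
    };

    // Hypothetical query type satisfying the remaining requirement.
    struct ExampleQuery {
        static void delete_query_state(void *state) { (void) state; }
    };

    static_assert(QueryInterfaceSketch<ExampleQuery>);

    int main() { return 0; }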
diff --git a/include/framework/RecordInterface.h b/include/framework/RecordInterface.h
index 8afd90a..c8e622f 100644
--- a/include/framework/RecordInterface.h
+++ b/include/framework/RecordInterface.h
@@ -26,8 +26,8 @@ concept RecordInterface = requires(R r, R s) {
};
template<RecordInterface R>
-struct WrappedRecord {
- R rec;
+struct WrappedRecord : R {
+ //R rec;
uint32_t header;
inline void set_delete() {