summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-05-29 12:33:58 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-05-29 12:33:58 -0400
commitb00682429988f17152e7573ffeffa1cecfdd3d3a (patch)
treeb621adaa2bfe3a7a9846970f7426fd950e17aa99 /include/framework/DynamicExtension.h
parentde5fa133758e2f0aad855ac58dff5cfa13d06f74 (diff)
downloaddynamic-extension-b00682429988f17152e7573ffeffa1cecfdd3d3a.tar.gz
Tests and bugfixes for framework
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h30
1 files changed, 16 insertions, 14 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index a70dda4..68f85e2 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -77,7 +77,7 @@ template <RecordInterface R, ShardInterface S, QueryInterface Q, typename FQ=voi
class DynamicExtension {
//typedef typename S<WrappedRecord<R>> Shard;
typedef S Shard;
- typedef MutableBuffer<R> Buffer;
+ typedef MutableBuffer<WrappedRecord<R>> Buffer;
public:
DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop)
@@ -149,7 +149,7 @@ public:
// Execute the query for each shard
for (size_t i=0; i<shards.size(); i++) {
- query_results[i] = post_process(Q::query(shards[i].second, states[i], parms));
+ query_results[i] = filter_deletes(Q::query(shards[i].second, states[i], parms));
}
// Merge the results together
@@ -233,7 +233,7 @@ public:
}
}
- shards.emplace_back(new S(get_buffer(), nullptr));
+ shards.emplace_back(new S(get_buffer()));
Shard *shards_array[shards.size()];
@@ -244,7 +244,7 @@ public:
}
}
- Shard *flattened = new S(shards_array, j, nullptr);
+ Shard *flattened = new S(shards_array, j);
for (auto shard : shards) {
delete shard;
@@ -259,13 +259,13 @@ private:
size_t m_scale_factor;
double m_max_delete_prop;
- std::vector<InternalLevel<R, S, Q> *> m_levels;
+ std::vector<InternalLevel<WrappedRecord<R>, S, Q> *> m_levels;
Buffer *get_buffer() {
return m_buffer;
}
- int internal_append(R &rec, bool ts) {
+ int internal_append(const R &rec, bool ts) {
Buffer *buffer;
while (!(buffer = get_buffer()))
;
@@ -274,10 +274,12 @@ private:
merge_buffer();
}
- return buffer->append(rec, ts);
+ WrappedRecord<R> wrec = static_cast<WrappedRecord<R>>(rec);
+
+ return buffer->append(wrec, ts);
}
- std::vector<R> post_process(std::vector<R> records, ShardID shid, Buffer *buffer) {
+ std::vector<R> filter_deletes(std::vector<R> records, ShardID shid, Buffer *buffer) {
std::vector<R> processed_records;
processed_records.reserve(records.size());
@@ -335,7 +337,7 @@ private:
if (new_idx > 0) {
assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
}
- m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt));
+ m_levels.emplace_back(new InternalLevel<WrappedRecord<R>, Shard, Q>(new_idx, new_shard_cnt));
return new_idx;
}
@@ -414,14 +416,14 @@ private:
// merging two memory levels
if (LSM_LEVELING) {
auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
+ m_levels[base_level] = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
mark_as_unused(tmp);
} else {
m_levels[base_level]->append_merged_shards(m_levels[incoming_level]);
}
mark_as_unused(m_levels[incoming_level]);
- m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
+ m_levels[incoming_level] = new InternalLevel<WrappedRecord<R>, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
}
@@ -430,9 +432,9 @@ private:
if (LSM_LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0];
- auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
+ auto temp_level = new InternalLevel<WrappedRecord<R>, Shard, Q>(0, 1);
temp_level->append_buffer(buffer);
- auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level);
+ auto new_level = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(old_level, temp_level);
m_levels[0] = new_level;
delete temp_level;
@@ -448,7 +450,7 @@ private:
* level may not be able to immediately be deleted, depending upon who
* else is using it.
*/
- inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) {
+ inline void mark_as_unused(InternalLevel<WrappedRecord<R>, Shard, Q> *level) {
delete level;
}