summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-05-29 14:30:08 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-05-29 14:30:08 -0400
commite920fa57cf9c503e560055864e4de37912b239e1 (patch)
treed0daba733c7af2b0b22f29d39de8f824ffa83472 /include/framework/DynamicExtension.h
parentb00682429988f17152e7573ffeffa1cecfdd3d3a (diff)
downloaddynamic-extension-e920fa57cf9c503e560055864e4de37912b239e1.tar.gz
Adjusted the way that Wrapping records works to clean up interfaces
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h51
1 files changed, 30 insertions, 21 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 68f85e2..422ca10 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -75,9 +75,9 @@ typedef ssize_t level_index;
template <RecordInterface R, ShardInterface S, QueryInterface Q, typename FQ=void>
class DynamicExtension {
- //typedef typename S<WrappedRecord<R>> Shard;
+ //typedef typename S<R> Shard;
typedef S Shard;
- typedef MutableBuffer<WrappedRecord<R>> Buffer;
+ typedef MutableBuffer<R> Buffer;
public:
DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop)
@@ -142,18 +142,23 @@ public:
level->get_query_states(shards, states, parms);
}
- std::vector<std::vector<R>> query_results(shards.size() + 1);
+ std::vector<std::vector<R>*> query_results(shards.size() + 1, nullptr);
// Execute the query for the buffer
- query_results[0] = Q::buffer_query(buffer, buffer_state, parms);
+ query_results[0] = filter_deletes(Q::buffer_query(buffer, buffer_state, parms), {-1, -1}, buffer);
// Execute the query for each shard
for (size_t i=0; i<shards.size(); i++) {
- query_results[i] = filter_deletes(Q::query(shards[i].second, states[i], parms));
+ query_results[i] = filter_deletes(Q::query(shards[i].second, states[i], parms), shards[i].first, buffer);
}
// Merge the results together
- auto result = Q::merge(query_results);
+ auto result = Q::merge(&query_results);
+
+ for (size_t i=0; i<query_results.size(); i++) {
+ delete query_results[i];
+ Q::delete_query_state(states[i]);
+ }
return result;
}
@@ -259,7 +264,7 @@ private:
size_t m_scale_factor;
double m_max_delete_prop;
- std::vector<InternalLevel<WrappedRecord<R>, S, Q> *> m_levels;
+ std::vector<InternalLevel<R, S, Q> *> m_levels;
Buffer *get_buffer() {
return m_buffer;
@@ -274,14 +279,12 @@ private:
merge_buffer();
}
- WrappedRecord<R> wrec = static_cast<WrappedRecord<R>>(rec);
-
- return buffer->append(wrec, ts);
+ return buffer->append(rec, ts);
}
- std::vector<R> filter_deletes(std::vector<R> records, ShardID shid, Buffer *buffer) {
- std::vector<R> processed_records;
- processed_records.reserve(records.size());
+ std::vector<R> *filter_deletes(std::vector<R> *records, ShardID shid, Buffer *buffer) {
+ std::vector<R> *processed_records = new std::vector<R>();
+ processed_records->reserve(records->size());
// For delete tagging, we just need to check the delete bit on each
// record.
@@ -291,9 +294,10 @@ private:
continue;
}
- processed_records.emplace_back(rec.rec);
+ processed_records->emplace_back(static_cast<R>(rec.rec));
}
+ delete records;
return processed_records;
}
@@ -320,8 +324,10 @@ private:
}
}
- processed_records.emplace_back(rec.rec);
+ processed_records->emplace_back(static_cast<R>(rec.rec));
}
+
+ delete records;
}
/*
@@ -337,7 +343,7 @@ private:
if (new_idx > 0) {
assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
}
- m_levels.emplace_back(new InternalLevel<WrappedRecord<R>, Shard, Q>(new_idx, new_shard_cnt));
+ m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt));
return new_idx;
}
@@ -416,14 +422,14 @@ private:
// merging two memory levels
if (LSM_LEVELING) {
auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
+ m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
mark_as_unused(tmp);
} else {
m_levels[base_level]->append_merged_shards(m_levels[incoming_level]);
}
mark_as_unused(m_levels[incoming_level]);
- m_levels[incoming_level] = new InternalLevel<WrappedRecord<R>, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
+ m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
}
@@ -432,9 +438,9 @@ private:
if (LSM_LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0];
- auto temp_level = new InternalLevel<WrappedRecord<R>, Shard, Q>(0, 1);
+ auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
temp_level->append_buffer(buffer);
- auto new_level = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(old_level, temp_level);
+ auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level);
m_levels[0] = new_level;
delete temp_level;
@@ -450,7 +456,7 @@ private:
* level may not be able to immediately be deleted, depending upon who
* else is using it.
*/
- inline void mark_as_unused(InternalLevel<WrappedRecord<R>, Shard, Q> *level) {
+ inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) {
delete level;
}
@@ -460,6 +466,9 @@ private:
* levels below idx are below the limit.
*/
inline void enforce_delete_maximum(level_index idx) {
+ // FIXME: currently broken due to tombstone cancellation issues
+ return;
+
long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
if (ts_prop > (long double) m_max_delete_prop) {