summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h41
1 files changed, 23 insertions, 18 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index fd42c5f..53b55b1 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -16,6 +16,9 @@
#include "framework/MutableBuffer.h"
#include "framework/InternalLevel.h"
+#include "framework/ShardInterface.h"
+
+#include "shard/WIRS.h"
#include "ds/Alias.h"
#include "util/timer.h"
@@ -68,9 +71,8 @@ enum class DeletePolicy {
typedef ssize_t level_index;
-template <RecordInterface R>
+template <RecordInterface R, ShardInterface S>
class DynamicExtension {
- typedef WIRS<R> Shard;
typedef decltype(R::key) K;
typedef decltype(R::value) V;
typedef decltype(R::weight) W;
@@ -143,7 +145,7 @@ public:
// Get the shard weights for each level. Index 0 is the buffer,
// represented by nullptr.
- std::vector<std::pair<ShardID, Shard*>> shards;
+ std::vector<std::pair<ShardID, S*>> shards;
std::vector<void*> states;
shards.push_back({{-1, -1}, nullptr});
states.push_back(nullptr);
@@ -151,13 +153,15 @@ public:
std::vector<W> shard_weights;
shard_weights.push_back((double) buffer_weight);
+ WIRS<R>::wirs_query_parms parms = {lower_key, upper_key};
+
for (auto &level : m_levels) {
- level->get_shard_weights(shard_weights, shards, states, lower_key, upper_key);
+ level->get_query_states(shard_weights, shards, states, &parms);
}
if (shard_weights.size() == 1 && shard_weights[0] == 0) {
if (buffer_alias) delete buffer_alias;
- for (auto& x: states) Shard::delete_state(x);
+ for (auto& x: states) S::delete_query_state(x);
sampling_bailouts++;
return; // no records in the sampling range
}
@@ -243,7 +247,7 @@ public:
} while (sample_idx < sample_sz);
if (buffer_alias) delete buffer_alias;
- for (auto& x: states) Shard::delete_state(x);
+ for (auto& x: states) S::delete_query_state(x);
enforce_rejection_rate_maximum(rng);
}
@@ -348,8 +352,8 @@ public:
}
- Shard *create_ssi() {
- std::vector<Shard *> shards;
+ S *create_ssi() {
+ std::vector<S *> shards;
if (m_levels.size() > 0) {
for (int i=m_levels.size() - 1; i>= 0; i--) {
@@ -359,9 +363,9 @@ public:
}
}
- shards.emplace_back(new Shard(get_buffer(), nullptr));
+ shards.emplace_back(new S(get_buffer(), nullptr));
- Shard *shards_array[shards.size()];
+ S *shards_array[shards.size()];
size_t j = 0;
for (size_t i=0; i<shards.size(); i++) {
@@ -370,7 +374,7 @@ public:
}
}
- Shard *flattened = new Shard(shards_array, j, nullptr);
+ S *flattened = new S(shards_array, j, nullptr);
for (auto shard : shards) {
delete shard;
@@ -390,7 +394,7 @@ private:
double m_max_delete_prop;
double m_max_rejection_rate;
- std::vector<InternalLevel<R> *> m_levels;
+ std::vector<InternalLevel<R, S> *> m_levels;
level_index m_last_level_idx;
@@ -446,7 +450,7 @@ private:
if (new_idx > 0) {
assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
}
- m_levels.emplace_back(new InternalLevel<R>(new_idx, new_shard_cnt));
+ m_levels.emplace_back(new InternalLevel<R, S>(new_idx, new_shard_cnt));
m_last_level_idx++;
return new_idx;
@@ -526,14 +530,14 @@ private:
// merging two memory levels
if (LSM_LEVELING) {
auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<R>::merge_levels(m_levels[base_level], m_levels[incoming_level], rng);
+ m_levels[base_level] = InternalLevel<R, S>::merge_levels(m_levels[base_level], m_levels[incoming_level], rng);
mark_as_unused(tmp);
} else {
m_levels[base_level]->append_merged_shards(m_levels[incoming_level], rng);
}
mark_as_unused(m_levels[incoming_level]);
- m_levels[incoming_level] = new InternalLevel<R>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
+ m_levels[incoming_level] = new InternalLevel<R, S>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
}
@@ -542,9 +546,9 @@ private:
if (LSM_LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0];
- auto temp_level = new InternalLevel<R>(0, 1);
+ auto temp_level = new InternalLevel<R, S>(0, 1);
temp_level->append_mem_table(buffer, rng);
- auto new_level = InternalLevel<R>::merge_levels(old_level, temp_level, rng);
+ auto new_level = InternalLevel<R, S>::merge_levels(old_level, temp_level, rng);
m_levels[0] = new_level;
delete temp_level;
@@ -560,7 +564,7 @@ private:
* level may not be able to immediately be deleted, depending upon who
* else is using it.
*/
- inline void mark_as_unused(InternalLevel<R> *level) {
+ inline void mark_as_unused(InternalLevel<R, S> *level) {
delete level;
}
@@ -609,6 +613,7 @@ private:
* no guarantees about which buffer will be accessed if level_index is -1.
*/
inline size_t get_level_record_count(level_index idx, MutableBuffer<R> *buffer=nullptr) {
+
assert(idx >= -1);
if (idx == -1) {
return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count();