summaryrefslogtreecommitdiffstats
path: root/include/framework/ExtensionStructure.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/ExtensionStructure.h')
-rw-r--r--include/framework/ExtensionStructure.h101
1 files changed, 80 insertions, 21 deletions
diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h
index 1e756db..9a5f6b3 100644
--- a/include/framework/ExtensionStructure.h
+++ b/include/framework/ExtensionStructure.h
@@ -27,6 +27,17 @@
namespace de {
+struct MergeTask {
+ level_index m_source_level;
+ level_index m_target_level;
+ size_t m_size;
+ size_t m_timestamp;
+
+ bool operator<(MergeTask &other) {
+ return m_timestamp < other.m_timestamp;
+ }
+};
+
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING>
class ExtensionStructure {
typedef S Shard;
@@ -90,9 +101,7 @@ public:
* merges to make room for it.
*/
inline bool merge_buffer(Buffer *buffer) {
- if (!can_merge_with(0, buffer->get_record_count())) {
- merge_down(0);
- }
+ assert(can_merge_with(0, buffer->get_record_count()));
merge_buffer_into_l0(buffer);
enforce_delete_maximum(0);
@@ -192,6 +201,74 @@ public:
return m_levels;
}
+ /*
+ *
+ */
+ std::vector<MergeTask> get_merge_tasks(size_t buffer_reccnt) {
+ std::vector<MergeTask> merges;
+
+ /*
+ * The buffer -> L0 merge task is not included so if that
+ * can be done without any other change, just return an
+ * empty list.
+ */
+ if (can_merge_with(0, buffer_reccnt)) {
+ return std::move(merges);
+ }
+
+ level_index merge_base_level = find_mergable_level(0);
+ if (merge_base_level == -1) {
+ merge_base_level = grow();
+ }
+
+ for (level_index i=merge_base_level; i>0; i--) {
+ MergeTask task;
+ task.m_source_level = i - 1;
+ task.m_target_level = i;
+
+ /*
+ * The amount of storage required for the merge accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i-1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_merge_with(i, reccnt)) {
+ reccnt += m_levels[i]->get_record_count();
+ }
+ }
+ task.m_size = 2* reccnt * sizeof(R);
+
+ merges.push_back(task);
+ }
+
+ return std::move(merges);
+ }
+
+ /*
+ * Merge the level specified by incoming level into the level specified
+ * by base level. The two levels should be sequential--i.e. no levels
+ * are skipped in the merge process--otherwise the tombstone ordering
+ * invariant may be violated by the merge operation.
+ */
+ inline void merge_levels(level_index base_level, level_index incoming_level) {
+ // merging two memory levels
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ auto tmp = m_levels[base_level];
+ m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get());
+ } else {
+ m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get());
+ }
+
+ m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
+ }
+
+
private:
size_t m_scale_factor;
double m_max_delete_prop;
@@ -265,24 +342,6 @@ private:
return -1;
}
- /*
- * Merge the level specified by incoming level into the level specified
- * by base level. The two levels should be sequential--i.e. no levels
- * are skipped in the merge process--otherwise the tombstone ordering
- * invariant may be violated by the merge operation.
- */
- inline void merge_levels(level_index base_level, level_index incoming_level) {
- // merging two memory levels
- if constexpr (L == LayoutPolicy::LEVELING) {
- auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get());
- } else {
- m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get());
- }
-
- m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
- }
-
inline void merge_buffer_into_l0(Buffer *buffer) {
assert(m_levels[0]);