From 5c229093f9af21514b17cf778dbec7ac657e31d2 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 12 Feb 2024 11:42:18 -0500 Subject: Refactored Reconstruction Tasks Added a ReconVector type to make it easier to do load balancing by shifting tasks around, and clean up a few interfaces. --- include/framework/DynamicExtension.h | 2 +- include/framework/scheduling/Task.h | 2 +- include/framework/structure/ExtensionStructure.h | 23 +++----- include/framework/util/Configuration.h | 3 -- include/util/types.h | 67 ++++++++++++++++++++++++ 5 files changed, 77 insertions(+), 20 deletions(-) diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 7ea5370..3e1ce50 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -488,7 +488,7 @@ private: Structure *vers = args->epoch->get_structure(); for (ssize_t i=0; imerges.size(); i++) { - vers->reconstruction(args->merges[i].second, args->merges[i].first); + vers->reconstruction(args->merges[i].target, args->merges[i].source); } /* diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index d5d4266..bd53090 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -26,7 +26,7 @@ namespace de { template S, QueryInterface Q, LayoutPolicy L> struct ReconstructionArgs { Epoch *epoch; - std::vector merges; + ReconstructionVector merges; std::promise result; bool compaction; void *extension; diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 823a66b..f1c7535 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -218,8 +218,8 @@ public: * major concern outside of sampling; at least for now. So I'm not * going to focus too much time on it at the moment. */ - std::vector get_compaction_tasks() { - std::vector tasks; + ReconstructionVector get_compaction_tasks() { + ReconstructionVector tasks; state_vector scratch_state = m_current_state; /* if the tombstone/delete invariant is satisfied, no need for compactions */ @@ -244,8 +244,6 @@ public: } for (level_index i=base_level; i>0; i--) { - ReconstructionTask task = {i-1, i}; - /* * The amount of storage required for the reconstruction accounts * for the cost of storing the new records, along with the @@ -262,9 +260,7 @@ public: reccnt += m_levels[i]->get_record_count(); } } - //task.m_size = 2* reccnt * sizeof(R); - - tasks.push_back(task); + tasks.add_reconstruction(i-i, i, reccnt); } return tasks; @@ -273,7 +269,7 @@ public: /* * */ - std::vector get_reconstruction_tasks(size_t buffer_reccnt, + ReconstructionVector get_reconstruction_tasks(size_t buffer_reccnt, state_vector scratch_state={}) { /* * If no scratch state vector is provided, use a copy of the @@ -289,7 +285,7 @@ public: * The buffer flush is not included so if that can be done without any * other change, just return an empty list. */ - std::vector reconstructions; + ReconstructionVector reconstructions; if (can_reconstruct_with(0, buffer_reccnt, scratch_state)) { return std::move(reconstructions); } @@ -302,8 +298,8 @@ public: /* * */ - std::vector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) { - std::vector reconstructions; + ReconstructionVector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) { + ReconstructionVector reconstructions; level_index base_level = find_reconstruction_target(source_level, scratch_state); if (base_level == -1) { @@ -311,7 +307,6 @@ public: } for (level_index i=base_level; i>source_level; i--) { - ReconstructionTask task = {i - 1, i}; /* * The amount of storage required for the reconstruction accounts * for the cost of storing the new records, along with the @@ -328,9 +323,7 @@ public: reccnt += m_levels[i]->get_record_count(); } } -// task.m_size = 2* reccnt * sizeof(R); - - reconstructions.push_back(task); + reconstructions.add_reconstruction(i-1, i, reccnt); } return std::move(reconstructions); diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index 65ca181..55cc682 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -43,7 +43,4 @@ enum class DeletePolicy { TAGGING }; -typedef ssize_t level_index; -typedef std::pair ReconstructionTask; - } diff --git a/include/util/types.h b/include/util/types.h index a13bd95..e22fd18 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -19,6 +19,8 @@ #include #include +#include +#include namespace de { @@ -69,4 +71,69 @@ struct ShardID { /* A placeholder for an invalid shard--also used to indicate the mutable buffer */ const ShardID INVALID_SHID = {-1, -1}; +typedef ssize_t level_index; + +typedef struct { + level_index source; + level_index target; + size_t reccnt; +} ReconstructionTask; + +class ReconstructionVector { +public: + ReconstructionVector() + : total_reccnt(0) {} + + ~ReconstructionVector() = default; + + ReconstructionTask operator[](size_t idx) { + return m_tasks[idx]; + } + + void add_reconstruction(level_index source, level_index target, size_t reccnt) { + m_tasks.push_back({source, target, reccnt}); + total_reccnt += reccnt; + } + + ReconstructionTask remove_reconstruction(size_t idx) { + assert(idx < m_tasks.size()); + auto task = m_tasks[idx]; + + m_tasks.erase(m_tasks.begin() + idx); + total_reccnt -= task.reccnt; + + return task; + } + + ReconstructionTask remove_smallest_reconstruction() { + size_t min_size = m_tasks[0].reccnt; + size_t idx = 0; + for (size_t i=1; i m_tasks; + size_t total_reccnt; +}; + } -- cgit v1.2.3