summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-02-12 11:42:18 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-02-12 11:56:42 -0500
commit5c229093f9af21514b17cf778dbec7ac657e31d2 (patch)
tree7453b6dbf26362d36a4904175cc8303f5b45563e
parent90194cb5be5c7b80bfa21a353a75920e932d54d8 (diff)
downloaddynamic-extension-5c229093f9af21514b17cf778dbec7ac657e31d2.tar.gz
Refactored Reconstruction Tasks
Added a ReconVector type to make it easier to do load balancing by shifting tasks around, and clean up a few interfaces.
-rw-r--r--include/framework/DynamicExtension.h2
-rw-r--r--include/framework/scheduling/Task.h2
-rw-r--r--include/framework/structure/ExtensionStructure.h23
-rw-r--r--include/framework/util/Configuration.h3
-rw-r--r--include/util/types.h67
5 files changed, 77 insertions, 20 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 7ea5370..3e1ce50 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -488,7 +488,7 @@ private:
Structure *vers = args->epoch->get_structure();
for (ssize_t i=0; i<args->merges.size(); i++) {
- vers->reconstruction(args->merges[i].second, args->merges[i].first);
+ vers->reconstruction(args->merges[i].target, args->merges[i].source);
}
/*
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index d5d4266..bd53090 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -26,7 +26,7 @@ namespace de {
template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L>
struct ReconstructionArgs {
Epoch<R, S, Q, L> *epoch;
- std::vector<ReconstructionTask> merges;
+ ReconstructionVector merges;
std::promise<bool> result;
bool compaction;
void *extension;
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 823a66b..f1c7535 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -218,8 +218,8 @@ public:
* major concern outside of sampling; at least for now. So I'm not
* going to focus too much time on it at the moment.
*/
- std::vector<ReconstructionTask> get_compaction_tasks() {
- std::vector<ReconstructionTask> tasks;
+ ReconstructionVector get_compaction_tasks() {
+ ReconstructionVector tasks;
state_vector scratch_state = m_current_state;
/* if the tombstone/delete invariant is satisfied, no need for compactions */
@@ -244,8 +244,6 @@ public:
}
for (level_index i=base_level; i>0; i--) {
- ReconstructionTask task = {i-1, i};
-
/*
* The amount of storage required for the reconstruction accounts
* for the cost of storing the new records, along with the
@@ -262,9 +260,7 @@ public:
reccnt += m_levels[i]->get_record_count();
}
}
- //task.m_size = 2* reccnt * sizeof(R);
-
- tasks.push_back(task);
+ tasks.add_reconstruction(i-i, i, reccnt);
}
return tasks;
@@ -273,7 +269,7 @@ public:
/*
*
*/
- std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt,
+ ReconstructionVector get_reconstruction_tasks(size_t buffer_reccnt,
state_vector scratch_state={}) {
/*
* If no scratch state vector is provided, use a copy of the
@@ -289,7 +285,7 @@ public:
* The buffer flush is not included so if that can be done without any
* other change, just return an empty list.
*/
- std::vector<ReconstructionTask> reconstructions;
+ ReconstructionVector reconstructions;
if (can_reconstruct_with(0, buffer_reccnt, scratch_state)) {
return std::move(reconstructions);
}
@@ -302,8 +298,8 @@ public:
/*
*
*/
- std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) {
- std::vector<ReconstructionTask> reconstructions;
+ ReconstructionVector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) {
+ ReconstructionVector reconstructions;
level_index base_level = find_reconstruction_target(source_level, scratch_state);
if (base_level == -1) {
@@ -311,7 +307,6 @@ public:
}
for (level_index i=base_level; i>source_level; i--) {
- ReconstructionTask task = {i - 1, i};
/*
* The amount of storage required for the reconstruction accounts
* for the cost of storing the new records, along with the
@@ -328,9 +323,7 @@ public:
reccnt += m_levels[i]->get_record_count();
}
}
-// task.m_size = 2* reccnt * sizeof(R);
-
- reconstructions.push_back(task);
+ reconstructions.add_reconstruction(i-1, i, reccnt);
}
return std::move(reconstructions);
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 65ca181..55cc682 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -43,7 +43,4 @@ enum class DeletePolicy {
TAGGING
};
-typedef ssize_t level_index;
-typedef std::pair<level_index, level_index> ReconstructionTask;
-
}
diff --git a/include/util/types.h b/include/util/types.h
index a13bd95..e22fd18 100644
--- a/include/util/types.h
+++ b/include/util/types.h
@@ -19,6 +19,8 @@
#include <cstdint>
#include <cstdlib>
+#include <vector>
+#include <cassert>
namespace de {
@@ -69,4 +71,69 @@ struct ShardID {
/* A placeholder for an invalid shard--also used to indicate the mutable buffer */
const ShardID INVALID_SHID = {-1, -1};
+typedef ssize_t level_index;
+
+typedef struct {
+ level_index source;
+ level_index target;
+ size_t reccnt;
+} ReconstructionTask;
+
+class ReconstructionVector {
+public:
+ ReconstructionVector()
+ : total_reccnt(0) {}
+
+ ~ReconstructionVector() = default;
+
+ ReconstructionTask operator[](size_t idx) {
+ return m_tasks[idx];
+ }
+
+ void add_reconstruction(level_index source, level_index target, size_t reccnt) {
+ m_tasks.push_back({source, target, reccnt});
+ total_reccnt += reccnt;
+ }
+
+ ReconstructionTask remove_reconstruction(size_t idx) {
+ assert(idx < m_tasks.size());
+ auto task = m_tasks[idx];
+
+ m_tasks.erase(m_tasks.begin() + idx);
+ total_reccnt -= task.reccnt;
+
+ return task;
+ }
+
+ ReconstructionTask remove_smallest_reconstruction() {
+ size_t min_size = m_tasks[0].reccnt;
+ size_t idx = 0;
+ for (size_t i=1; i<m_tasks.size(); i++) {
+ if (m_tasks[i].reccnt < min_size) {
+ min_size = m_tasks[i].reccnt;
+ idx = i;
+ }
+ }
+
+ auto task = m_tasks[idx];
+ m_tasks.erase(m_tasks.begin() + idx);
+ total_reccnt -= task.reccnt;
+
+ return task;
+ }
+
+ size_t get_total_reccnt() {
+ return total_reccnt;
+ }
+
+ size_t size() {
+ return m_tasks.size();
+ }
+
+
+private:
+ std::vector<ReconstructionTask> m_tasks;
+ size_t total_reccnt;
+};
+
}