summaryrefslogtreecommitdiffstats
path: root/include/framework/structure/ExtensionStructure.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-02-13 18:13:33 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2025-02-13 18:13:33 -0500
commitd28f2cfcd4249fc7d984762a326e3f2d6dcba7dc (patch)
treeea6fc7e80da2fec969c423ab50b42301397c9188 /include/framework/structure/ExtensionStructure.h
parent125e243cad99aa29444759e15053fd148ff0e3ba (diff)
downloaddynamic-extension-d28f2cfcd4249fc7d984762a326e3f2d6dcba7dc.tar.gz
progress towards resolving asynch structure merges
Diffstat (limited to 'include/framework/structure/ExtensionStructure.h')
-rw-r--r--include/framework/structure/ExtensionStructure.h132
1 files changed, 51 insertions, 81 deletions
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index fa713af..521e68b 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -55,6 +55,8 @@ public:
new_struct->m_levels.push_back(m_levels[i]->clone());
}
+ new_struct->m_deleted_shards = m_deleted_shards;
+
return new_struct;
}
@@ -159,65 +161,29 @@ public:
return cnt;
}
+
/*
* Perform the reconstruction described by task. If the resulting
* reconstruction grows the structure (i.e., adds a level), returns
* true. Otherwise, returns false.
*/
- inline bool perform_reconstruction(ReconstructionTask task, size_t version=0) {
- /* perform the reconstruction itself */
+ inline reconstruction_results<ShardType> perform_reconstruction(ReconstructionTask task) const {
+ reconstruction_results<ShardType> result;
+ result.target_level = task.target;
+
std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
- assert(shid.level_idx <= (level_index) m_levels.size());
+ assert(shid.level_idx < (level_index) m_levels.size());
assert(shid.shard_idx >= -1);
- if (shid.level_idx == (level_index) m_levels.size()) {
- continue;
- }
-
- /* if unspecified, push all shards into the vector */
- if (shid.shard_idx == all_shards_idx) {
- for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count();
- i++) {
- if (m_levels[shid.level_idx]->get_shard(i)) {
- shards.push_back(m_levels[shid.level_idx]->get_shard(i));
- }
- }
- } else {
- shards.push_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx));
- }
+ auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx);
+ shards.push_back(raw_shard_ptr);
+ result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr);
}
- auto new_shard = new ShardType(shards);
+ result.new_shard = std::make_shared<ShardType>(shards);
- /*
- * Remove all of the shards processed by the operation
- */
- for (ShardID shid : task.sources) {
- if (shid.level_idx == (level_index) m_levels.size()) {
- continue;
- } else if (shid.shard_idx == all_shards_idx) {
- m_levels[shid.level_idx]->truncate();
- } else if (shid != buffer_shid) {
- m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
- }
- }
-
- // fprintf(stderr, "Target: %ld\tLevels:%ld\n", task.target, m_levels.size());
-
- /*
- * Append the new shard to the target level
- */
- if (task.target < (level_index)m_levels.size()) {
- m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard), version);
- return false;
- // fprintf(stderr, "append (no growth)\n");
- } else { /* grow the structure if needed */
- m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(task.target));
- m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard), version);
- return true;
- // fprintf(stderr, "grow and append\n");
- }
+ return result;
}
std::vector<typename QueryType::LocalQuery *>
@@ -237,8 +203,36 @@ public:
return m_levels[0]->get_shard_count();
}
- void append_l0(std::shared_ptr<ShardType> shard, size_t version) {
- m_levels[0]->append(shard, version);
+ bool append_shard(std::shared_ptr<ShardType> shard, size_t version, size_t level) {
+ assert(level <= m_levels.size());
+ auto rc = false;
+
+ if (level == m_levels.size()) {
+ /* grow the structure */
+ m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(level));
+ rc = true;
+ }
+
+ m_levels[level]->append(shard, version);
+
+ return rc;
+ }
+
+ void delete_shards(std::vector<std::pair<level_index, const ShardType*>> shards) {
+ for (size_t i=0; i<shards.size(); i++) {
+ assert(shards[i].first < (level_index) m_levels.size());
+ ssize_t shard_idx = -1;
+ for (size_t j=0; j<m_levels[shards[i].first]->get_shard_count(); j++) {
+ if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() == shards[i].second) {
+ shard_idx = j;
+ break;
+ }
+ }
+
+ if (shard_idx != -1) {
+ m_levels[shards[i].first]->delete_shard(shard_idx);
+ }
+ }
}
LevelVector const &get_level_vector() const { return m_levels; }
@@ -269,13 +263,17 @@ public:
return ts_prop <= (long double) max_delete_prop;
}
- void print_structure() const {
+ void print_structure(bool debug=false) const {
for (size_t i=0; i<m_levels.size(); i++) {
- fprintf(stdout, "[%ld]:\t", i);
+ if (debug) {
+ fprintf(stdout, "[D] [%ld]:\t", i);
+ } else {
+ fprintf(stdout, "[%ld]:\t", i);
+ }
if (m_levels[i]) {
for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
- fprintf(stdout, "(%ld: %ld) ", j, m_levels[i]->get_shard(j)->get_record_count());
+ fprintf(stdout, "(%ld, %ld, %p: %ld) ", j, m_levels[i]->get_shard_ptr(j).second, m_levels[i]->get_shard_ptr(j).first.get(), m_levels[i]->get_shard(j)->get_record_count());
}
} else {
fprintf(stdout, "[Empty]");
@@ -285,37 +283,9 @@ public:
}
}
-
- void merge_structure(const ExtensionStructure* old_structure, size_t version_id = 0) {
- assert(version_id > 0);
-
- for (size_t i=0; i<old_structure->m_levels.size(); i++) {
- if (m_levels.size() <= i) {
- m_levels.push_back(old_structure->m_levels[i]);
- } else {
- for (size_t j=0; j<old_structure->m_levels[i]->get_shard_count(); j++) {
- if (old_structure->m_levels[i]->get_shard_version(j) > version_id) {
- m_levels[i]->append(old_structure->m_levels[i]->get_shard_ptr(j));
- }
- }
- }
- }
- }
-
- void update_shard_version(size_t version) {
- assert(version != 0);
-
- for (size_t i=0; i<m_levels.size(); i++) {
- for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
- if (m_levels[i]->get_shard_version(j) == 0) {
- m_levels[i]->set_shard_version(j, version);
- }
- }
- }
- }
-
private:
LevelVector m_levels;
+ std::vector<ShardType *> m_deleted_shards;
};
} // namespace de