summaryrefslogtreecommitdiffstats
path: root/include/framework/structure
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/structure')
-rw-r--r--include/framework/structure/ExtensionStructure.h95
-rw-r--r--include/framework/structure/InternalLevel.h19
2 files changed, 85 insertions, 29 deletions
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 3bb8a0b..078c4a9 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -145,17 +145,32 @@ public:
return cnt;
}
- inline void perform_reconstruction(ReconstructionTask task) {
+ size_t get_shard_count() const {
+ size_t cnt = 0;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i]) {
+ cnt += m_levels[i]->get_nonempty_shard_count();
+ }
+ }
+
+ return cnt;
+ }
+
+ inline void perform_reconstruction(ReconstructionTask task,
+ BuffView *bv=nullptr) {
/* perform the reconstruction itself */
std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
assert(shid.level_idx < (level_index) m_levels.size());
assert(shid.shard_idx >= -1);
- /* if unspecified, push all shards into the vector */
- if (shid.shard_idx == all_shards_idx) {
- for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count();
- i++) {
+ if (shid == buffer_shid) {
+ assert(bv);
+ ShardType *buffer_shard = new ShardType(std::move(*bv));
+ shards.push_back(buffer_shard);
+ } else if (shid.shard_idx == all_shards_idx) {
+ /* if unspecified, push all shards into the vector */
+ for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) {
if (m_levels[shid.level_idx]->get_shard(i)) {
shards.push_back(m_levels[shid.level_idx]->get_shard(i));
}
@@ -171,7 +186,9 @@ public:
* Remove all of the shards processed by the operation
*/
for (ShardID shid : task.sources) {
- if (shid.shard_idx == all_shards_idx) {
+ if (shid == buffer_shid) {
+ continue;
+ } else if (shid.shard_idx == all_shards_idx) {
m_levels[shid.level_idx]->truncate();
} else {
m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
@@ -199,29 +216,49 @@ public:
* like that, we'll leave this as low priority.
*/
- /* insert the first level, if needed */
- if (m_levels.size() == 0) {
- m_levels.push_back(
- std::make_shared<InternalLevel<ShardType, QueryType>>(0));
- }
-
- ShardType *buffer_shard = new ShardType(std::move(buffer));
- if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) {
- m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard));
- } else {
- std::vector<const ShardType *> shards;
- for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count();
- i++) {
- if (m_levels[0]->get_shard(i)) {
- shards.push_back(m_levels[0]->get_shard(i));
- }
-
- shards.push_back(buffer_shard);
- ShardType *new_shard = new ShardType(shards);
- m_levels[0]->truncate();
- m_levels[0]->append(std::shared_ptr<ShardType>(new_shard));
- }
- }
+ // /* insert the first level, if needed */
+ // if (m_levels.size() == 0) {
+ // m_levels.push_back(
+ // std::make_shared<InternalLevel<ShardType, QueryType>>(0));
+ // }
+
+ perform_reconstruction(task, &buffer);
+
+ // ShardType *buffer_shard = new ShardType(std::move(buffer));
+ // if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) {
+ // m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard));
+ // } else if (task.type == ReconstructionType::Merge) {
+ // std::vector<const ShardType *> shards;
+ // for (size_t i=0; i<task.sources.size(); i++) {
+ // ShardID shid = task.sources[i];
+ // if (shid != buffer_shid) {
+ // shards.emplace_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx));
+ // }
+ // }
+
+ // shards.emplace_back(buffer_shard);
+ // ShardType *new_shard = new ShardType(shards);
+ // m_levels[0]->append(std::shared_ptr<ShardType>(new_shard));
+ // for (size_t i=0; i<task.sources.size(); i++) {
+ // ShardID shid = task.sources[i];
+ // if (shid != buffer_shid) {
+ // m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
+ // }
+ // }
+ // } else {
+ // std::vector<const ShardType *> shards;
+ // for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count();
+ // i++) {
+ // if (m_levels[0]->get_shard(i)) {
+ // shards.push_back(m_levels[0]->get_shard(i));
+ // }
+
+ // shards.push_back(buffer_shard);
+ // ShardType *new_shard = new ShardType(shards);
+ // m_levels[0]->truncate();
+ // m_levels[0]->append(std::shared_ptr<ShardType>(new_shard));
+ // }
+ // }
}
bool take_reference() {
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index c9d1749..5bc891b 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -166,6 +166,17 @@ public:
return (double)tscnt / (double)(tscnt + reccnt);
}
+ size_t get_nonempty_shard_count() const {
+ size_t cnt = 0;
+ for (size_t i = 0; i < m_shards.size(); i++) {
+ if (m_shards[i] && m_shards[i]->get_record_count() > 0) {
+ cnt += 1;
+ }
+ }
+
+ return cnt;
+ }
+
std::shared_ptr<InternalLevel> clone() const {
auto new_level = std::make_shared<InternalLevel>(m_level_no);
for (size_t i = 0; i < m_shards.size(); i++) {
@@ -185,6 +196,14 @@ public:
m_shards.emplace_back(shard);
}
+ const ShardType *get_shard(ShardID shid) const {
+ if (shid < m_shards.size()) {
+ return m_shards[shid].get();
+ }
+
+ return nullptr;
+ }
+
private:
ssize_t m_level_no;
std::vector<std::shared_ptr<ShardType>> m_shards;