diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-02-04 17:47:52 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-02-04 17:47:52 -0500 |
| commit | d054b21a66318e096a809c9f94bc8659acfacfd8 (patch) | |
| tree | 62bc7f9e41dae257a7613e8c7e9bb080d0815dd0 /include/framework | |
| parent | 7370b4d1412da47f70c54107adf498fa6be2cfc4 (diff) | |
| download | dynamic-extension-d054b21a66318e096a809c9f94bc8659acfacfd8.tar.gz | |
Fixed a bug with leveling layout policy
Diffstat (limited to 'include/framework')
| -rw-r--r-- | include/framework/DynamicExtension.h | 9 | ||||
| -rw-r--r-- | include/framework/reconstruction/LevelingPolicy.h | 2 | ||||
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 22 |
3 files changed, 31 insertions, 2 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 5a64243..1327559 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -296,6 +296,14 @@ public: return t; } + + + void print_structure() { + auto epoch = get_active_epoch(); + epoch->get_structure()->print_structure(); + end_job(epoch); + } + /** * Create a new single Shard object containing all of the records * within the framework (buffer and shards). @@ -707,6 +715,7 @@ private: return m_buffer->append(rec, ts); } + //#ifdef _GNU_SOURCE #if 0 void SetThreadAffinity() { diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h index add28ba..89aa94b 100644 --- a/include/framework/reconstruction/LevelingPolicy.h +++ b/include/framework/reconstruction/LevelingPolicy.h @@ -54,7 +54,7 @@ public: ReconstructionTask get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override { return ReconstructionTask{ - {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge}; + {{buffer_shid}, {0, all_shards_idx}}, 0, m_buffer_size, ReconstructionType::Merge}; } private: diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 078c4a9..29a45e5 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -161,13 +161,15 @@ public: /* perform the reconstruction itself */ std::vector<const ShardType *> shards; for (ShardID shid : task.sources) { - assert(shid.level_idx < (level_index) m_levels.size()); + assert(shid.level_idx <= (level_index) m_levels.size()); assert(shid.shard_idx >= -1); if (shid == buffer_shid) { assert(bv); ShardType *buffer_shard = new ShardType(std::move(*bv)); shards.push_back(buffer_shard); + } else if (shid.level_idx == (level_index) m_levels.size()) { + continue; } else if (shid.shard_idx == all_shards_idx) { /* if unspecified, push all shards into the vector */ for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) { @@ -188,6 +190,8 @@ public: for (ShardID shid : task.sources) { if (shid == buffer_shid) { continue; + } else if (shid.level_idx == (level_index) m_levels.size()) { + continue; } else if (shid.shard_idx == all_shards_idx) { m_levels[shid.level_idx]->truncate(); } else { @@ -315,6 +319,22 @@ public: return ts_prop <= (long double) max_delete_prop; } + void print_structure() const { + for (size_t i=0; i<m_levels.size(); i++) { + fprintf(stdout, "[%ld]:\t", i); + + if (m_levels[i]) { + for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) { + fprintf(stdout, "(%ld: %ld) ", j, m_levels[i]->get_shard(j)->get_record_count()); + } + } else { + fprintf(stdout, "[Empty]"); + } + + fprintf(stdout, "\n"); + } + } + private: std::atomic<size_t> m_refcnt; LevelVector m_levels; |