diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-01-24 17:45:45 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-01-24 17:45:45 -0500 |
| commit | f149a2459cfc2007f755d792b3c4e567d30c132f (patch) | |
| tree | 7534bdde8332d37dd2f291334b8c59f70790d98e | |
| parent | cbb19322557671a23b6643ce9079594c7eec716b (diff) | |
| download | dynamic-extension-f149a2459cfc2007f755d792b3c4e567d30c132f.tar.gz | |
some progress
| -rw-r--r-- | include/framework/DynamicExtension.h | 21 | ||||
| -rw-r--r-- | include/framework/scheduling/Version.h | 6 | ||||
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 80 |
3 files changed, 33 insertions, 74 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 62aaf88..0331353 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -413,8 +413,27 @@ private: static void reconstruction(void *arguments) { auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments; + auto extension = (DynamicExtension *) args->extension; + extension->SetThreadAffinity(); + + if (args->priority == ReconstructionPriority::FLUSH) { + /* we first construct a shard from the buffer */ + auto buffview = args->version->get_buffer(); + auto new_head = buffview.get_tail(); + auto new_shard = Shard(std::move(buffview)); + + /* copy the currently active version's structure */ + auto structure = extension->get_active_version()->get_structure()->clone(); + + + } + - ((DynamicExtension *)args->extension)->SetThreadAffinity(); + else { + + } + + Structure *vers = args->version->get_mutable_structure(); ReconstructionTask flush_task; diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h index e961777..2b2b5ba 100644 --- a/include/framework/scheduling/Version.h +++ b/include/framework/scheduling/Version.h @@ -56,9 +56,9 @@ public: size_t get_epoch_number() const { return m_epoch_number; } - const Structure *get_structure() const { return m_structure; } + const Structure *get_structure() const { return m_structure.get(); } - Structure *get_mutable_structure() { return m_structure; } + Structure *get_mutable_structure() { return m_structure.get(); } BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); } @@ -113,7 +113,7 @@ public: private: Buffer *m_buffer; - Structure *m_structure; + std::unique_ptr<Structure> m_structure; std::mutex m_buffer_lock; std::atomic<bool> m_active_merge; diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index da91509..c304f1c 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -174,7 +174,7 @@ public: } } else if (shid == buffer_shid) { assert(bv); - ShardType *buffer_shard = new ShardType(std::move(buffer)); + ShardType *buffer_shard = new ShardType(std::move(bv)); shards.push_back(buffer_shard); } else { shards.push_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx)); @@ -205,74 +205,6 @@ public: } } - inline void perform_flush(ReconstructionTask task, BuffView buffer) { - /* - * FIXME: this might be faster with a custom interface for merging - * the buffer and a vector of shards, but that would also complicate - * the shard interface a lot, so we'll leave it like this for now. It - * does mean that policies that merge the buffer into L0 double-process - * the buffer itself. Given that we're unlikely to actually use policies - * like that, we'll leave this as low priority. - */ - - // /* insert the first level, if needed */ - // if (m_levels.size() == 0) { - // m_levels.push_back( - // std::make_shared<InternalLevel<ShardType, QueryType>>(0)); - // } - - perform_reconstruction(task, &buffer); - - // ShardType *buffer_shard = new ShardType(std::move(buffer)); - // if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) { - // m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard)); - // } else if (task.type == ReconstructionType::Merge) { - // std::vector<const ShardType *> shards; - // for (size_t i=0; i<task.sources.size(); i++) { - // ShardID shid = task.sources[i]; - // if (shid != buffer_shid) { - // shards.emplace_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx)); - // } - // } - - // shards.emplace_back(buffer_shard); - // ShardType *new_shard = new ShardType(shards); - // m_levels[0]->append(std::shared_ptr<ShardType>(new_shard)); - // for (size_t i=0; i<task.sources.size(); i++) { - // ShardID shid = task.sources[i]; - // if (shid != buffer_shid) { - // m_levels[shid.level_idx]->delete_shard(shid.shard_idx); - // } - // } - // } else { - // std::vector<const ShardType *> shards; - // for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count(); - // i++) { - // if (m_levels[0]->get_shard(i)) { - // shards.push_back(m_levels[0]->get_shard(i)); - // } - - // shards.push_back(buffer_shard); - // ShardType *new_shard = new ShardType(shards); - // m_levels[0]->truncate(); - // m_levels[0]->append(std::shared_ptr<ShardType>(new_shard)); - // } - // } - } - - bool take_reference() { - m_refcnt.fetch_add(1); - return true; - } - - bool release_reference() { - assert(m_refcnt.load() > 0); - m_refcnt.fetch_add(-1); - return true; - } - - size_t get_reference_count() const { return m_refcnt.load(); } - std::vector<typename QueryType::LocalQuery *> get_local_queries(std::vector<std::pair<ShardID, ShardType *>> &shards, typename QueryType::Parameters *parms) const { @@ -286,6 +218,15 @@ public: return queries; } + size_t l0_size() const { + return m_levels[0]->get_shard_count(); + } + + void append_l0(std::shared_ptr<ShardType> shard) { + // FIXME: ensure that there's always a level 0 in the version + m_levels[0]->append(shard); + } + LevelVector const &get_level_vector() const { return m_levels; } @@ -315,7 +256,6 @@ public: } private: - std::atomic<size_t> m_refcnt; LevelVector m_levels; }; |