author     Douglas Rumbaugh <dbr4@psu.edu>  2025-01-24 17:45:45 -0500
committer  Douglas Rumbaugh <dbr4@psu.edu>  2025-01-24 17:45:45 -0500
commit     f149a2459cfc2007f755d792b3c4e567d30c132f (patch)
tree       7534bdde8332d37dd2f291334b8c59f70790d98e
parent     cbb19322557671a23b6643ce9079594c7eec716b (diff)
download   dynamic-extension-f149a2459cfc2007f755d792b3c4e567d30c132f.tar.gz
some progress
-rw-r--r--  include/framework/DynamicExtension.h              21
-rw-r--r--  include/framework/scheduling/Version.h              6
-rw-r--r--  include/framework/structure/ExtensionStructure.h   80
3 files changed, 33 insertions, 74 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 62aaf88..0331353 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -413,8 +413,27 @@ private:
static void reconstruction(void *arguments) {
auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
+ auto extension = (DynamicExtension *) args->extension;
+ extension->SetThreadAffinity();
+
+ if (args->priority == ReconstructionPriority::FLUSH) {
+ /* we first construct a shard from the buffer */
+ auto buffview = args->version->get_buffer();
+ auto new_head = buffview.get_tail();
+ auto new_shard = Shard(std::move(buffview));
+
+ /* copy the currently active version's structure */
+ auto structure = extension->get_active_version()->get_structure()->clone();
+
+
+ }
+
- ((DynamicExtension *)args->extension)->SetThreadAffinity();
+ else {
+
+ }
+
+
Structure *vers = args->version->get_mutable_structure();
ReconstructionTask flush_task;
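
The FLUSH branch above stops after cloning the active version's structure. A rough sketch of where it appears to be headed, picking up right after the clone and built on the append_l0() helper introduced in ExtensionStructure.h below; set_structure() and advance_buffer_head() are hypothetical stand-ins for however the new version actually takes ownership of the copy and the new buffer head:

/* sketch only -- a possible continuation of the FLUSH branch, after the clone */
/* wrap the freshly built buffer shard and push it onto level 0 of the copy */
structure->append_l0(std::make_shared<Shard>(std::move(new_shard)));

/* hypothetical hand-off: give the new version the updated structure and the
   advanced buffer head (the exact interface is not shown in this commit) */
args->version->set_structure(std::move(structure));
args->version->advance_buffer_head(new_head);
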
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index e961777..2b2b5ba 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -56,9 +56,9 @@ public:
size_t get_epoch_number() const { return m_epoch_number; }
- const Structure *get_structure() const { return m_structure; }
+ const Structure *get_structure() const { return m_structure.get(); }
- Structure *get_mutable_structure() { return m_structure; }
+ Structure *get_mutable_structure() { return m_structure.get(); }
BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); }
@@ -113,7 +113,7 @@ public:
private:
Buffer *m_buffer;
- Structure *m_structure;
+ std::unique_ptr<Structure> m_structure;
std::mutex m_buffer_lock;
std::atomic<bool> m_active_merge;
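
With m_structure now a std::unique_ptr, a Version owns its structure outright and the two accessors hand out non-owning raw pointers via get(). A minimal usage sketch, assuming clone() returns an owning raw pointer and that Version exposes a constructor (hypothetical here) that takes ownership of it:

/* the version takes ownership of a cloned structure; callers only borrow */
std::unique_ptr<Structure> copy(active_version->get_structure()->clone());
auto next = std::make_unique<Version>(buffer, std::move(copy));   /* hypothetical ctor */

Structure *borrowed = next->get_mutable_structure();  /* non-owning, valid only while `next` lives */
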
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index da91509..c304f1c 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -174,7 +174,7 @@ public:
}
} else if (shid == buffer_shid) {
assert(bv);
- ShardType *buffer_shard = new ShardType(std::move(buffer));
+ ShardType *buffer_shard = new ShardType(std::move(bv));
shards.push_back(buffer_shard);
} else {
shards.push_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx));
@@ -205,74 +205,6 @@ public:
}
}
- inline void perform_flush(ReconstructionTask task, BuffView buffer) {
- /*
- * FIXME: this might be faster with a custom interface for merging
- * the buffer and a vector of shards, but that would also complicate
- * the shard interface a lot, so we'll leave it like this for now. It
- * does mean that policies that merge the buffer into L0 double-process
- * the buffer itself. Given that we're unlikely to actually use policies
- * like that, we'll leave this as low priority.
- */
-
- // /* insert the first level, if needed */
- // if (m_levels.size() == 0) {
- // m_levels.push_back(
- // std::make_shared<InternalLevel<ShardType, QueryType>>(0));
- // }
-
- perform_reconstruction(task, &buffer);
-
- // ShardType *buffer_shard = new ShardType(std::move(buffer));
- // if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) {
- // m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard));
- // } else if (task.type == ReconstructionType::Merge) {
- // std::vector<const ShardType *> shards;
- // for (size_t i=0; i<task.sources.size(); i++) {
- // ShardID shid = task.sources[i];
- // if (shid != buffer_shid) {
- // shards.emplace_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx));
- // }
- // }
-
- // shards.emplace_back(buffer_shard);
- // ShardType *new_shard = new ShardType(shards);
- // m_levels[0]->append(std::shared_ptr<ShardType>(new_shard));
- // for (size_t i=0; i<task.sources.size(); i++) {
- // ShardID shid = task.sources[i];
- // if (shid != buffer_shid) {
- // m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
- // }
- // }
- // } else {
- // std::vector<const ShardType *> shards;
- // for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count();
- // i++) {
- // if (m_levels[0]->get_shard(i)) {
- // shards.push_back(m_levels[0]->get_shard(i));
- // }
-
- // shards.push_back(buffer_shard);
- // ShardType *new_shard = new ShardType(shards);
- // m_levels[0]->truncate();
- // m_levels[0]->append(std::shared_ptr<ShardType>(new_shard));
- // }
- // }
- }
-
- bool take_reference() {
- m_refcnt.fetch_add(1);
- return true;
- }
-
- bool release_reference() {
- assert(m_refcnt.load() > 0);
- m_refcnt.fetch_add(-1);
- return true;
- }
-
- size_t get_reference_count() const { return m_refcnt.load(); }
-
std::vector<typename QueryType::LocalQuery *>
get_local_queries(std::vector<std::pair<ShardID, ShardType *>> &shards,
typename QueryType::Parameters *parms) const {
@@ -286,6 +218,15 @@ public:
return queries;
}
+ size_t l0_size() const {
+ return m_levels[0]->get_shard_count();
+ }
+
+ void append_l0(std::shared_ptr<ShardType> shard) {
+ // FIXME: ensure that there's always a level 0 in the version
+ m_levels[0]->append(shard);
+ }
+
LevelVector const &get_level_vector() const { return m_levels; }
@@ -315,7 +256,6 @@ public:
}
private:
- std::atomic<size_t> m_refcnt;
LevelVector m_levels;
};
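
With the structure now owned by its Version through a unique_ptr, the manual take_reference()/release_reference() counting and the perform_flush() path are dropped; a flush instead builds the buffer shard at the call site and uses the new append_l0()/l0_size() helpers. A small sketch of that call-site pattern, with the trigger threshold and scheduling hook named here only for illustration:

/* build a shard directly from the buffer view and append it to level 0 */
auto buffview = version->get_buffer();
auto buffer_shard = std::make_shared<ShardType>(std::move(buffview));

Structure *structure = version->get_mutable_structure();
structure->append_l0(buffer_shard);

/* l0_size() exposes the shard count so the caller can decide whether a
   follow-up reconstruction is needed (names below are illustrative only) */
if (structure->l0_size() >= l0_reconstruction_trigger) {
  schedule_reconstruction(structure);
}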