summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-01-31 16:16:28 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2025-01-31 16:16:28 -0500
commit3061bfbf1c4b5092fa4234de3105a615fcef18ea (patch)
tree4c41cdf3b8e3af6e38ba1c8f283d2e35086cc2af /include/framework/DynamicExtension.h
parent30da48151f58803968ca3ef5d42e66a9223d80a4 (diff)
downloaddynamic-extension-3061bfbf1c4b5092fa4234de3105a615fcef18ea.tar.gz
More updates
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h80
1 files changed, 63 insertions, 17 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index da2945a..91bed98 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -87,6 +87,8 @@ public:
m_config.maximum_threads);
m_active_version.store(
std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0));
+
+ m_version_counter = INITIAL_VERSION;
}
/**
@@ -226,7 +228,7 @@ public:
* @return The number of levels within the index
*/
size_t get_height() {
- return get_active_version()->get_structure()->get_height();
+ return get_active_version()->get_structure()->get_height() - 1;
}
/**
@@ -381,8 +383,6 @@ private:
size_t m_core_cnt;
std::atomic<int> m_next_core;
- ReconPolicyType const *m_recon_policy;
-
/* versioning + concurrency variables */
std::atomic<size_t> m_version_counter;
std::atomic<std::shared_ptr<VersionType>> m_active_version;
@@ -390,6 +390,8 @@ private:
std::condition_variable m_version_advance_cv;
std::mutex m_version_advance_mtx;
+ std::atomic<bool> m_flush_in_progress = false;
+
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
void enforce_delete_invariant(VersionType *version) {
@@ -420,6 +422,8 @@ private:
}
}
+ size_t m_flush_cnt = 0;
+
static void reconstruction(void *arguments) {
auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
auto extension = (DynamicExtension *)args->extension;
@@ -437,11 +441,14 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
+ assert(extension->m_flush_in_progress.load());
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
- args->version->set_next_buffer_head(buffview.get_tail());
+ assert(buffview.get_tail() != buffview.get_head());
+
auto new_shard = std::make_shared<ShardType>(std::move(buffview));
+
/*
* Flushes already know their version id. To avoid needing to
* do any update reconciliation between structures, they wait
@@ -449,14 +456,22 @@ private:
* and only then take a copy of the structure.
*/
extension->await_version(args->version->get_id() - 1);
- StructureType *structure =
- extension->get_active_version()->get_structure()->copy();
- /* add the newly created shard to the structure copy */
- structure->append_l0(new_shard);
+ if (extension->m_config.recon_maint_disabled) {
+ assert(args->version->get_mutable_structure());
+ args->version->get_mutable_structure()->append_l0(std::move(new_shard));
+ } else {
+ assert(!args->version->get_mutable_structure());
+ auto structure = std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy());
- /* set this version's structure to the newly created one */
- args->version->set_structure(std::unique_ptr<StructureType>(structure));
+ /* add the newly created shard to the structure copy */
+ structure->append_l0(std::move(new_shard));
+
+ /* set this version's structure to the newly created one */
+ args->version->set_structure(std::move(structure));
+ }
+
+ args->version->advance_buffer_head();
}
/* perform all of the reconstructions */
@@ -476,6 +491,10 @@ private:
/* advance the index to the newly finished version */
extension->install_new_version(args->version);
+ if (args->priority == ReconstructionPriority::FLUSH) {
+ extension->m_flush_in_progress.store(false);
+ }
+
/* manually delete the argument object */
delete args;
}
@@ -549,11 +568,11 @@ private:
* at the time it is activated, but the version number must be claimed
* early to minimize activation blocking.
*/
- version_ptr create_version() {
+ version_ptr create_version_flush(std::unique_ptr<StructureType> structure) {
size_t version_id = m_version_counter.fetch_add(1);
auto active_version = get_active_version();
std::shared_ptr<VersionType> new_version =
- std::make_shared<VersionType>(version_id, nullptr, m_buffer.get(), active_version->get_buffer().get_head());
+ std::make_shared<VersionType>(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
return new_version;
}
@@ -565,7 +584,7 @@ private:
* no version number is claimed until the version is activated, to
* prevent blocking buffer flushes.
*/
- version_ptr create_version(std::unique_ptr<StructureType> structure) {
+ version_ptr create_version_maint(std::unique_ptr<StructureType> structure) {
auto active_version = get_active_version();
version_ptr new_version =
std::make_shared<VersionType>(INVALID_VERSION, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
@@ -622,11 +641,34 @@ private:
void schedule_flush() {
begin_reconstruction_scheduling();
- auto new_version = create_version();
+
+ bool old = m_flush_in_progress.load();
+ if (old || !m_flush_in_progress.compare_exchange_strong(old, true)) {
+ end_reconstruction_scheduling();
+ return;
+ }
+
+ /*
+ * for "legacy" policies, without background reconstruction, we need
+ * a valid structure object as part of the version prior to determining
+ * the flush operations. Otherwise, the flush operation should only ever
+ * do the flush itself, and so no structure is needed at this point
+ *
+ * delaying obtaining the structure until later, when maintenance
+ * reconstructions are enabled, removes the need for flushes to reconcile
+ * differences in structure between when they are scheduled and when they
+ * finish
+ */
+ std::unique_ptr<StructureType> structure = nullptr;
+ if (m_config.recon_maint_disabled) {
+ structure = std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy());
+ }
+
+ auto new_version = create_version_flush(std::move(structure));
auto *args = new ReconstructionArgs<ShardType, QueryType>();
args->version = new_version;
- args->tasks = m_recon_policy->get_flush_tasks(args->version.get());
+ args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get());
args->extension = this;
args->priority = ReconstructionPriority::FLUSH;
@@ -646,16 +688,20 @@ private:
void schedule_maint_reconstruction(bool take_reconstruction_lock = true) {
+ if (m_config.recon_maint_disabled) {
+ return;
+ }
+
if (take_reconstruction_lock) {
begin_reconstruction_scheduling();
}
// FIXME: memory management issue here?
- auto new_version = create_version(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy()));
+ auto new_version = create_version_maint(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy()));
auto *args = new ReconstructionArgs<ShardType, QueryType>();
args->version = new_version;
- args->tasks = m_recon_policy->get_reconstruction_tasks(args->version.get());
+ args->tasks = m_config.recon_policy->get_reconstruction_tasks(args->version.get());
args->extension = this;
args->priority = ReconstructionPriority::MAINT;