From fd0e99e618319974320ed3fb49535aec501be1fb Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 6 Feb 2025 15:56:33 -0500 Subject: Background compaction stuff --- include/framework/scheduling/Task.h | 1 + include/framework/scheduling/Version.h | 40 +++++++++++++++++++++++++++++----- 2 files changed, 35 insertions(+), 6 deletions(-) (limited to 'include/framework/scheduling') diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 1591909..ed40d3d 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -36,6 +36,7 @@ struct ReconstructionArgs { ReconstructionVector tasks; void *extension; ReconstructionPriority priority; + size_t initial_version; }; template Q, typename DE> struct QueryArgs { diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h index 3e93202..fa677f2 100644 --- a/include/framework/scheduling/Version.h +++ b/include/framework/scheduling/Version.h @@ -89,16 +89,44 @@ public: return m_buffer->advance_head(new_head); } + void merge_changes_from(Version *old, size_t version_id) { + /* + * for a maint reconstruction, the buffer head may have advanced + * during the reconstruction; we don't need to adjust the buffer + * for maintenance reconstructions, so we can simply "catch" the + * internal head index up to the current version. + */ + if (old->m_buffer_head > m_buffer_head) { + m_buffer_head = old->m_buffer_head; + } + + // FIXME: we should also ensure that we don't clobber anything + // in the event that multiple concurrent reconstructions affect + // the same levels. As it stands, two reconstructions *could* share + // source shards, resulting in some records being lost or duplicated. + // + // For the moment, I'm being careful to avoid this within the + // scheduling policy itself, and only forwarding changes to this + // version. + + /* using INVALID_VERSION disables shard reconcilliation */ + if (version_id == 0) { + return; + } + + /* add any shards newer than version_id to this version */ + auto old_structure = old->get_structure(); + m_structure->merge_structure(old_structure, version_id); + } + + void update_shard_version(size_t version) { + m_structure->update_shard_version(version); + } + private: BufferType *m_buffer; std::unique_ptr m_structure; - /* - * The number of currently active jobs - * (queries/merges) operating on this - * epoch. An epoch can only be retired - * when this number is 0. - */ size_t m_id; size_t m_buffer_head; ssize_t m_pending_buffer_head; -- cgit v1.2.3