From 30da48151f58803968ca3ef5d42e66a9223d80a4 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 27 Jan 2025 18:17:21 -0500 Subject: progress --- include/framework/DynamicExtension.h | 229 +++++++++++---------- include/framework/reconstruction/BSMPolicy.h | 14 +- .../reconstruction/FixedShardCountPolicy.h | 2 +- include/framework/reconstruction/FloodL0Policy.h | 9 +- include/framework/reconstruction/LevelingPolicy.h | 21 +- .../reconstruction/ReconstructionPolicy.h | 3 +- include/framework/reconstruction/TieringPolicy.h | 17 +- include/framework/scheduling/Task.h | 2 +- include/framework/scheduling/Version.h | 102 ++++----- include/framework/structure/ExtensionStructure.h | 8 +- include/framework/util/Configuration.h | 4 +- 11 files changed, 196 insertions(+), 215 deletions(-) (limited to 'include') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 0331353..da2945a 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -36,10 +36,10 @@ class DynamicExtension { private: /* convenience typedefs for commonly used types within the class */ typedef typename ShardType::RECORD RecordType; - typedef MutableBuffer Buffer; - typedef ExtensionStructure Structure; - typedef Version Version; - typedef BufferView BufView; + typedef MutableBuffer BufferType; + typedef ExtensionStructure StructureType; + typedef Version VersionType; + typedef BufferView BufferViewType; typedef ReconstructionPolicy ReconPolicyType; typedef DEConfiguration ConfType; @@ -52,6 +52,11 @@ private: static constexpr size_t QUERY = 1; static constexpr size_t RECONSTRUCTION = 2; + typedef std::shared_ptr version_ptr; + typedef size_t version_id; + static constexpr size_t INVALID_VERSION = 0; + static constexpr size_t INITIAL_VERSION = 1; + public: /** * Create a new Dynamized version of a data structure, supporting @@ -74,12 +79,14 @@ public: * framework's scheduler for use in answering queries and * performing compactions and flushes, etc. */ - DynamicExtension(ConfType &&config) : m_config(std::move(config)) { - m_buffer = std::make_unique(m_config.buffer_flush_trigger, - m_config.buffer_size); - - m_sched = SchedType(m_config.maximum_memory_usage, m_config.maximum_threads); - m_active_version.load(std::make_shared(INITIAL_VERSION, new Structure(), m_buffer.get(), 0)); + DynamicExtension(ConfType &&config) : m_config(std::move(config)) { + m_buffer = + std::make_unique(m_config.buffer_flush_trigger, m_config.buffer_size); + + m_sched = std::make_unique(m_config.maximum_memory_usage, + m_config.maximum_threads); + m_active_version.store( + std::make_shared(INITIAL_VERSION, std::make_unique(), m_buffer.get(), 0)); } /** @@ -90,10 +97,10 @@ public: */ ~DynamicExtension() { /* let any in-flight version transitions finish */ - await_newest_version(); + await_version(); /* shutdown the scheduler */ - m_sched.shutdown(); + m_sched->shutdown(); } /** @@ -274,10 +281,11 @@ public: * the index. Ownership of this object is transfered to the * caller. */ + // FIXME: switch this over to std::unique_ptr ShardType * create_static_structure(bool await_reconstruction_completion = false) { if (await_reconstruction_completion) { - await_newest_version(); + await_version(); } auto version = get_active_version(); @@ -317,19 +325,27 @@ public: } /* - * Determines the newest pending version at the time of call, and - * blocks until that version becomes active. + * Blocks until the specified version id becomes active. If no version + * id is provided, wait for the newest pending version to be installed. + * + * NOTE: this method will return once the specified version has been + * installed, but does not guarantee that the specified version + * is the currently active one when it returns. It is possible + * that the active version upon the return of this method is newer + * than the one requested. */ - void await_newest_version() { - /* + void await_version(version_id vid = INVALID_VERSION) { + /* * versions are assigned by fetch and add on the counter, so the * newest assigned version number will be one less than the value * of the counter */ - auto newest_pending_version = m_version_counter.load() - 1; + if (vid == INVALID_VERSION) { + vid = m_version_counter.load() - 1; + } /* versions signal on m_version_advance_cv when they activate */ - while (m_active_version.load() < newest_pending_version) { + while (m_active_version.load()->get_id() < vid) { std::unique_lock lk(m_version_advance_mtx); m_version_advance_cv.wait(lk); } @@ -354,14 +370,13 @@ public: * Calls SchedType::print_statistics, which should write a report of * scheduler performance statistics to stdout. */ - void print_scheduler_statistics() const { m_sched.print_statistics(); } + void print_scheduler_statistics() const { m_sched->print_statistics(); } private: ConfType m_config; - - SchedType m_sched; - std::unique_ptr m_buffer; + std::unique_ptr m_sched; + std::unique_ptr m_buffer; size_t m_core_cnt; std::atomic m_next_core; @@ -370,20 +385,14 @@ private: /* versioning + concurrency variables */ std::atomic m_version_counter; - typedef std::atomic> version_ptr; - - version_ptr m_active_version; - - typedef size_t version_id; - const size_t INVALID_VERSION = 0; - const size_t INITIAL_VERSION = 1; + std::atomic> m_active_version; std::condition_variable m_version_advance_cv; std::mutex m_version_advance_mtx; alignas(64) std::atomic m_scheduling_reconstruction; - void enforce_delete_invariant(Version *version) { + void enforce_delete_invariant(VersionType *version) { auto structure = version->get_structure(); auto compactions = structure->get_compaction_tasks(); @@ -401,7 +410,7 @@ private: auto wait = args->result.get_future(); - m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); + m_sched->schedule_job(reconstruction, 0, args, RECONSTRUCTION); /* wait for compaction completion */ wait.get(); @@ -413,60 +422,61 @@ private: static void reconstruction(void *arguments) { auto args = (ReconstructionArgs *)arguments; - auto extension = (DynamicExtension *) args->extension; + auto extension = (DynamicExtension *)args->extension; extension->SetThreadAffinity(); + /* + * For "normal" flushes, the task vector should be empty, so this is + * all that will happen. Piggybacking internal reconstructions off + * the flush WILL bottleneck the system, but is left in place to + * allow the traditional policies (LEVELING, TIERING, etc.) to be + * emulated within the new system. + * + * Background reconstructions will not have a priority level of + * FLUSH, and will already have a structure present. As a result, + * this code will be bypassed in that case. + */ if (args->priority == ReconstructionPriority::FLUSH) { /* we first construct a shard from the buffer */ auto buffview = args->version->get_buffer(); - auto new_head = buffview.get_tail(); - auto new_shard = Shard(std::move(buffview)); + args->version->set_next_buffer_head(buffview.get_tail()); + auto new_shard = std::make_shared(std::move(buffview)); - /* copy the currently active version's structure */ - auto structure = extension->get_active_version()->get_structure()->clone(); - - - } + /* + * Flushes already know their version id. To avoid needing to + * do any update reconciliation between structures, they wait + * until the version directly preceeding them has been installed, + * and only then take a copy of the structure. + */ + extension->await_version(args->version->get_id() - 1); + StructureType *structure = + extension->get_active_version()->get_structure()->copy(); + /* add the newly created shard to the structure copy */ + structure->append_l0(new_shard); - else { - + /* set this version's structure to the newly created one */ + args->version->set_structure(std::unique_ptr(structure)); } - - Structure *vers = args->version->get_mutable_structure(); - - ReconstructionTask flush_task; - flush_task.type = ReconstructionType::Invalid; - + /* perform all of the reconstructions */ + StructureType *structure = args->version->get_mutable_structure(); for (size_t i = 0; i < args->tasks.size(); i++) { - if (args->tasks[i].sources.size() > 0 && - args->tasks[i].sources[0] == buffer_shid) { - flush_task = args->tasks[i]; - continue; - } - - vers->perform_reconstruction(args->tasks[i]); + structure->perform_reconstruction(args->tasks[i]); } - if (flush_task.type != ReconstructionType::Invalid) { - /* - * we'll grab the buffer AFTER doing the internal reconstruction, so we - * can flush as many records as possible in one go. The reconstruction - * was done so as to make room for the full buffer anyway, so there's - * no real benefit to doing this first. - */ - auto buffer_view = args->version->get_buffer(); - size_t new_head = buffer_view.get_tail(); - - vers->perform_flush(flush_task, std::move(buffer_view)); - args->result.set_value(true); - ((DynamicExtension *)args->extension)->advance_version(new_head); - } else { - args->result.set_value(true); + /* + * if there isn't already a version id on the new version (i.e., the + * reconstruction isn't a flush), generate one. + */ + if (args->version->get_id() == INVALID_VERSION) { + args->version->set_id(extension->m_version_counter.fetch_add(1)); } + /* advance the index to the newly finished version */ + extension->install_new_version(args->version); + /* manually delete the argument object */ delete args; } @@ -532,7 +542,7 @@ private: version_ptr get_active_version() { return m_active_version.load(); } - /* + /* * Create a new version with an assigned version number, but without * an assigned copy of the structure. Intended for use in flushing, * where the structure will be copied from the currently active version @@ -542,7 +552,8 @@ private: version_ptr create_version() { size_t version_id = m_version_counter.fetch_add(1); auto active_version = get_active_version(); - version_ptr new_version = std::make_shared(version_id, nullptr, &m_buffer, 0); + std::shared_ptr new_version = + std::make_shared(version_id, nullptr, m_buffer.get(), active_version->get_buffer().get_head()); return new_version; } @@ -551,51 +562,57 @@ private: * Create a new version without an assigned version number, but with * a copy of the extension structure. This is for use with background * reconstructions, where the underlying structure is manipulated, but - * no version number is claimed until the version is activated, to + * no version number is claimed until the version is activated, to * prevent blocking buffer flushes. */ - version_ptr create_version(Structure *structure) { - version_ptr new_version = std::make_shared(INVALID_VERSION, structure, &m_buffer, 0); + version_ptr create_version(std::unique_ptr structure) { + auto active_version = get_active_version(); + version_ptr new_version = + std::make_shared(INVALID_VERSION, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head()); return new_version; } void install_new_version(version_ptr new_version) { assert(new_version->get_structure()); - assert(new_version->get_version_number() != INVALID_VERSION); - - auto old = get_active_version(); - assert(new_version->get_version_number() > old->get_version_number()); + assert(new_version->get_id() != INVALID_VERSION); /* wait until our turn to install the new version */ - auto lk = std::unique_lock(m_version_advance_mtx); - while (new_version.load()->get_verison_number() != old.load()->get_version_number() + 1) { - m_version_advance_cv.wait(lk); - old = get_active_version(); - } + await_version(new_version->get_id() - 1); + + auto old = get_active_version(); // FIXME: implement this interface // new_version->merge_changes_from(old.load().get()); - /* + /* * Only one version can have a given number, so we are safe to * directly assign here--nobody else is going to change it out from - * under us. We're also protected under the mutex. + * under us. + */ + m_active_version.store(new_version); + + /* + * My understanding is that you don't *really* need this mutex for + * safety in modern C++ when sending the signal. But I'll grab it + * anyway out of an abundance of caution. I doubt this will be a + * major bottleneck. */ - m_active_version.store(new_version); - m_version_advance_cv.notify_all(); + auto lk = std::unique_lock(m_version_advance_mtx); + m_version_advance_cv.notify_all(); } - Structure *create_scratch_structure() { + StructureType *create_scratch_structure() { return get_active_version()->get_structure()->copy(); } - void begin_reconstruction_scheduling() { bool cur_val; do { cur_val = m_scheduling_reconstruction.load(); - } while(cur_val == true && !m_scheduling_reconstruction.compare_exchange_strong(cur_val, true)); + } while ( + cur_val == true && + !m_scheduling_reconstruction.compare_exchange_strong(cur_val, true)); } void end_reconstruction_scheduling() { @@ -608,16 +625,17 @@ private: auto new_version = create_version(); auto *args = new ReconstructionArgs(); - args->version.load(new_version); + args->version = new_version; args->tasks = m_recon_policy->get_flush_tasks(args->version.get()); args->extension = this; args->priority = ReconstructionPriority::FLUSH; - /* - * NOTE: args is deleted by the reconstruction job, so shouldn't be - * freed here + /* + * NOTE: args is deleted by the reconstruction job, so shouldn't be + * freed here */ - m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION); + m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, + RECONSTRUCTION); if (m_config.recon_enable_maint_on_flush) { schedule_maint_reconstruction(false); @@ -626,28 +644,27 @@ private: end_reconstruction_scheduling(); } - - void schedule_maint_reconstruction(bool take_reconstruction_lock=true) { + void schedule_maint_reconstruction(bool take_reconstruction_lock = true) { if (take_reconstruction_lock) { begin_reconstruction_scheduling(); } // FIXME: memory management issue here? - auto new_version = create_version(m_active_version.load()->get_structure()); + auto new_version = create_version(std::unique_ptr(m_active_version.load()->get_structure()->copy())); auto *args = new ReconstructionArgs(); - args->version.load(new_version); + args->version = new_version; args->tasks = m_recon_policy->get_reconstruction_tasks(args->version.get()); args->extension = this; args->priority = ReconstructionPriority::MAINT; - /* - * NOTE: args is deleted by the reconstruction job, so shouldn't be - * freed here + /* + * NOTE: args is deleted by the reconstruction job, so shouldn't be + * freed here */ - m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION); - + m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, + RECONSTRUCTION); if (take_reconstruction_lock) { end_reconstruction_scheduling(); @@ -662,7 +679,7 @@ private: args->query_parms = std::move(query_parms); auto result = args->result_set.get_future(); - m_sched.schedule_job(async_query, 0, (void *)args, QUERY); + m_sched->schedule_job(async_query, 0, (void *)args, QUERY); return result; } diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h index c42b928..eaa374a 100644 --- a/include/framework/reconstruction/BSMPolicy.h +++ b/include/framework/reconstruction/BSMPolicy.h @@ -11,7 +11,7 @@ #include #include "framework/reconstruction/ReconstructionPolicy.h" -#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/Version.h" #include "util/types.h" namespace de { @@ -28,6 +28,12 @@ public: get_reconstruction_tasks(const Version *version, size_t incoming_reccnt) const override { ReconstructionVector reconstructions; + return reconstructions; + } + + ReconstructionVector + get_flush_tasks(const Version *version) const override { + ReconstructionVector reconstructions; auto levels = version->get_structure()->get_level_vector(); level_index target_level = find_reconstruction_target(levels); @@ -53,12 +59,6 @@ public: return reconstructions; } - ReconstructionVector - get_flush_tasks(const Version *version) const override { - ReconstructionVector v; - v.add_reconstruction(ReconstructionTask {{buffer_shid}, 0, m_buffer_size, ReconstructionType::Merge}); - } - private: level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = invalid_level_idx; diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h index 2a3c977..0768daa 100644 --- a/include/framework/reconstruction/FixedShardCountPolicy.h +++ b/include/framework/reconstruction/FixedShardCountPolicy.h @@ -11,7 +11,7 @@ #include #include "framework/reconstruction/ReconstructionPolicy.h" -#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/Version.h" #include "util/types.h" namespace de { diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h index 8304d8a..94bed70 100644 --- a/include/framework/reconstruction/FloodL0Policy.h +++ b/include/framework/reconstruction/FloodL0Policy.h @@ -11,7 +11,7 @@ #include #include "framework/reconstruction/ReconstructionPolicy.h" -#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/Version.h" #include "util/types.h" namespace de { @@ -26,17 +26,14 @@ public: ReconstructionVector get_reconstruction_tasks(const Version *version, size_t incoming_reccnt) const override { - ReconstructionVector reconstructions; return reconstructions; - } ReconstructionVector get_flush_tasks(const Version *version) const override { - ReconstructionVector v; - v.add_reconstruction(ReconstructionTask{ - {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}); + ReconstructionVector reconstructions; + return reconstructions; } private: diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h index 176492e..1523e74 100644 --- a/include/framework/reconstruction/LevelingPolicy.h +++ b/include/framework/reconstruction/LevelingPolicy.h @@ -11,7 +11,7 @@ #include #include "framework/reconstruction/ReconstructionPolicy.h" -#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/Version.h" #include "util/types.h" namespace de { @@ -25,14 +25,19 @@ public: : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {} ReconstructionVector - get_reconstruction_tasks(const Version *version, - size_t incoming_reccnt) const override { + get_reconstruction_tasks(const Version *version) const override { + ReconstructionVector reconstructions; + return reconstructions; + } + + ReconstructionVector + get_flush_tasks(const Version *version) const override { ReconstructionVector reconstructions; auto levels = version->get_structure()->get_level_vector(); level_index target_level = find_reconstruction_target(levels); assert(target_level != -1); - level_index source_level = 0; + level_index source_level = 1; if (target_level == invalid_level_idx) { /* grow */ @@ -51,14 +56,6 @@ public: return reconstructions; } - ReconstructionVector - get_flush_tasks(const Version *version) const override { - ReconstructionVector v; - v.add_reconstruction(ReconstructionTask{ - {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge}); - return v; - } - private: level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = invalid_level_idx; diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h index 48bddcf..6f99b32 100644 --- a/include/framework/reconstruction/ReconstructionPolicy.h +++ b/include/framework/reconstruction/ReconstructionPolicy.h @@ -23,8 +23,7 @@ class ReconstructionPolicy { public: ReconstructionPolicy() {} - virtual ReconstructionVector get_reconstruction_tasks(const Version *version, - size_t incoming_reccnt) const = 0; + virtual ReconstructionVector get_reconstruction_tasks(const Version *version) const = 0; virtual ReconstructionVector get_flush_tasks(const Version *version) const = 0; }; } diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h index 63be5fe..dce5c3c 100644 --- a/include/framework/reconstruction/TieringPolicy.h +++ b/include/framework/reconstruction/TieringPolicy.h @@ -11,7 +11,7 @@ #include #include "framework/reconstruction/ReconstructionPolicy.h" -#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/Version.h" #include "util/types.h" namespace de { @@ -26,6 +26,12 @@ public: get_reconstruction_tasks(const Version *version, size_t incoming_reccnt) const override { ReconstructionVector reconstructions; + return reconstructions; + } + + ReconstructionVector + get_flush_tasks(const Version *version) const override { + ReconstructionVector reconstructions; auto levels = version->get_structure()->get_level_vector(); level_index target_level = find_reconstruction_target(levels); @@ -49,15 +55,6 @@ public: return reconstructions; } - ReconstructionVector - get_flush_tasks(const Version *version) const override { - ReconstructionVector v; - - v.add_reconstruction(ReconstructionTask{ - {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}); - return v; - } - private: level_index find_reconstruction_target(LevelVector &levels) const { level_index target_level = invalid_level_idx; diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 197e8bf..1591909 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -32,7 +32,7 @@ enum class ReconstructionPriority { template QueryType> struct ReconstructionArgs { typedef typename ShardType::RECORD RecordType; - std::atomic>> version; + std::shared_ptr> version; ReconstructionVector tasks; void *extension; ReconstructionPriority priority; diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h index 2b2b5ba..8d3d038 100644 --- a/include/framework/scheduling/Version.h +++ b/include/framework/scheduling/Version.h @@ -20,30 +20,21 @@ template QueryType> class Version { private: typedef typename ShardType::RECORD RecordType; - typedef MutableBuffer Buffer; - typedef ExtensionStructure Structure; - typedef BufferView BufView; + typedef MutableBuffer BufferType; + typedef ExtensionStructure StructureType; + typedef BufferView BufferViewType; public: - Version(size_t number = 0) - : m_buffer(nullptr), m_structure(nullptr), m_active_merge(false), - m_epoch_number(number), m_buffer_head(0) {} - - Version(size_t number, Structure *structure, Buffer *buff, size_t head) - : m_buffer(buff), m_structure(structure), m_active_merge(false), - m_epoch_number(number), m_buffer_head(head) { - structure->take_reference(); - } + Version(size_t vid = 0) + : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0), + m_pending_buffer_head(-1) {} - ~Version() { - if (m_structure) { - m_structure->release_reference(); - } + Version(size_t number, std::unique_ptr structure, BufferType *buff, + size_t head) + : m_buffer(buff), m_structure(std::move(structure)), m_id(number), + m_buffer_head(head), m_pending_buffer_head(-1) {} - if (m_structure->get_reference_count() == 0) { - delete m_structure; - } - } + ~Version() = default; /* * Versions are *not* copyable or movable. Only one can exist, and all users @@ -54,13 +45,26 @@ public: Version &operator=(const Version &) = delete; Version &operator=(Version &&) = delete; - size_t get_epoch_number() const { return m_epoch_number; } + size_t get_id() const { return m_id; } + + void set_id(size_t id) { m_id = id;} - const Structure *get_structure() const { return m_structure.get(); } + const StructureType *get_structure() const { return m_structure.get(); } - Structure *get_mutable_structure() { return m_structure.get(); } + StructureType *get_mutable_structure() { return m_structure.get(); } - BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); } + bool set_structure(std::unique_ptr new_struct) { + if (m_structure) { + return false; + } + + m_structure = std::move(new_struct); + return true; + } + + BufferViewType get_buffer() const { + return m_buffer->get_buffer_view(m_buffer_head); + } /* * Returns a new Version object that is a copy of this one. The new object @@ -69,54 +73,29 @@ public: * provided argument. */ Version *clone(size_t number) { - std::unique_lock m_buffer_lock; - auto epoch = new Version(number); - epoch->m_buffer = m_buffer; - epoch->m_buffer_head = m_buffer_head; + auto version = new Version(number); + version->m_buffer = m_buffer; + version->m_buffer_head = m_buffer_head; if (m_structure) { - epoch->m_structure = m_structure->copy(); - /* the copy routine returns a structure with 0 references */ - epoch->m_structure->take_reference(); + version->m_structure = std::unique_ptr(m_structure->copy()); } - return epoch; + return version; } - /* - * Check if a merge can be started from this Version. At present, without - * concurrent merging, this simply checks if there is currently a scheduled - * merge based on this Version. If there is, returns false. If there isn't, - * return true and set a flag indicating that there is an active merge. - */ - bool prepare_reconstruction() { - auto old = m_active_merge.load(); - if (old) { - return false; - } - - // FIXME: this needs cleaned up - while (!m_active_merge.compare_exchange_strong(old, true)) { - old = m_active_merge.load(); - if (old) { - return false; - } - } - - return true; + void set_next_buffer_head(size_t new_head) { + m_pending_buffer_head = new_head; } - bool advance_buffer_head(size_t head) { - m_buffer_head = head; + bool advance_buffer_head() { + m_buffer_head = m_pending_buffer_head; return m_buffer->advance_head(m_buffer_head); } private: - Buffer *m_buffer; - std::unique_ptr m_structure; - - std::mutex m_buffer_lock; - std::atomic m_active_merge; + BufferType *m_buffer; + std::unique_ptr m_structure; /* * The number of currently active jobs @@ -124,7 +103,8 @@ private: * epoch. An epoch can only be retired * when this number is 0. */ - size_t m_epoch_number; + size_t m_id; size_t m_buffer_head; + ssize_t m_pending_buffer_head; }; } // namespace de diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index c304f1c..62c27f5 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -51,7 +51,6 @@ public: new_struct->m_levels.push_back(m_levels[i]->clone()); } - new_struct->m_refcnt = 0; return new_struct; } @@ -156,8 +155,7 @@ public: return cnt; } - inline void perform_reconstruction(ReconstructionTask task, - BuffView *bv=nullptr) { + inline void perform_reconstruction(ReconstructionTask task) { /* perform the reconstruction itself */ std::vector shards; for (ShardID shid : task.sources) { @@ -172,10 +170,6 @@ public: shards.push_back(m_levels[shid.level_idx]->get_shard(i)); } } - } else if (shid == buffer_shid) { - assert(bv); - ShardType *buffer_shard = new ShardType(std::move(bv)); - shards.push_back(buffer_shard); } else { shards.push_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx)); } diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index 9c1e624..a751a29 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -18,10 +18,10 @@ namespace de { template QueryType, DeletePolicy D, SchedulerInterface SchedType> class DEConfiguration { + public: DEConfiguration(std::unique_ptr> recon_policy) - : m_recon_policy(recon_policy) {} + : m_recon_policy(std::move(recon_policy)) {} - public: std::unique_ptr> m_recon_policy; /* buffer parameters */ -- cgit v1.2.3