| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-03-03 13:41:19 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-03-03 13:41:19 -0500 |
| commit | 2ded45f5a20f38fdfd9f348c446c38dc713a5591 (patch) | |
| tree | 746fb09b49ee4c00fc3e4760d899d60d8d8dcce0 /include/framework | |
| parent | d116b94389538aa8e0e7354fae77693b980de4f0 (diff) | |
| download | dynamic-extension-2ded45f5a20f38fdfd9f348c446c38dc713a5591.tar.gz | |
Fixed a few concurrency bugs

- LockManager: level locks are now version-stamped. release_lock() records the releasing version, and take_lock() backs out when a newer version has already released a lock, so reconstructions planned against a stale version cannot acquire a level.
- Buffer head reference counting moved out of BufferView's release callback and into Version, which takes a head reference on construction and releases it on destruction. advance_head() now installs the new head with a refcount of 1.
- Query preemption is now checked before the local queries execute rather than after, and a preempted query cleans up its temporary query objects before returning.
- FIFOScheduler: dropped the periodic-wakeup thread; completing tasks notify the scheduler's condition variable directly.
- Fixed off-by-one reads of the version counter in await_version() and version-id assignment, and flushes are no longer scheduled when the buffer lock is already held or the buffer has already drained below the low watermark.
- Cache-line aligned (alignas(64)) the hot synchronization members of DynamicExtension to reduce false sharing, and create_static_structure() now returns std::unique_ptr<ShardType>.
- Configuration: buffer_flush_trigger raised from buffer_size / 4 to buffer_size / 2; buffer_flush_query_preemption_trigger now defaults to UINT64_MAX (preemption effectively disabled).
Diffstat (limited to 'include/framework')
| -rw-r--r-- | include/framework/DynamicExtension.h | 204 |
| -rw-r--r-- | include/framework/reconstruction/BackgroundTieringPolicy.h | 2 |
| -rw-r--r-- | include/framework/scheduling/FIFOScheduler.h | 21 |
| -rw-r--r-- | include/framework/scheduling/LockManager.h | 35 |
| -rw-r--r-- | include/framework/scheduling/Task.h | 12 |
| -rw-r--r-- | include/framework/scheduling/Version.h | 44 |
| -rw-r--r-- | include/framework/structure/BufferView.h | 17 |
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 160 |
| -rw-r--r-- | include/framework/util/Configuration.h | 4 |
9 files changed, 303 insertions, 196 deletions
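The central fix in this commit is making the level locks version-aware: a maintenance reconstruction takes its locks while planning against a particular version, and if a newer version is installed and releases a lock first, the stale plan must not be allowed to proceed. Below is a minimal sketch of the pattern from LockManager.h, reduced to a single level; the class name VersionedLock and the main() driver are illustrative additions, not part of the patch, and the real class keeps a deque of per-level flags plus a separate buffer lock.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdio>

/*
 * Single-level sketch of the versioned lock in LockManager.h: each lock
 * acquisition is tagged with the version it was planned against, and is
 * refused if any lock has already been released by a newer version.
 */
class VersionedLock {
public:
  bool take_lock(std::size_t version) {
    bool old = m_lk.load();
    if (!old && m_lk.compare_exchange_strong(old, true)) {
      /* re-check after acquiring: a newer version may have released a
       * lock between our load and the CAS, making `version` stale */
      if (m_last_unlocked_version.load() > version) {
        m_lk.store(false);
        return false;
      }
      return true;
    }
    return false;
  }

  void release_lock(std::size_t version) {
    m_lk.store(false);
    /* monotonically advance the newest-releasing-version watermark */
    std::size_t tmp = m_last_unlocked_version.load();
    while (tmp < version &&
           !m_last_unlocked_version.compare_exchange_weak(tmp, version)) {
    }
  }

private:
  std::atomic<bool> m_lk{false};
  std::atomic<std::size_t> m_last_unlocked_version{0};
};

int main() {
  VersionedLock lk;
  if (lk.take_lock(1)) {
    lk.release_lock(2); /* the work was installed as version 2 */
  }
  /* a plan still based on version 1 is now stale and must be refused */
  std::printf("stale take_lock(1) -> %d\n", (int)lk.take_lock(1)); /* 0 */
  return 0;
}
```

In the patch itself, BackgroundTieringPolicy passes version->get_id() into take_lock(), and the reconstruction job releases its level locks with the id of the version it installed.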
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 8cef4a1..762029e 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -1,7 +1,7 @@ /* * include/framework/DynamicExtension.h * - * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu> * Dong Xie <dongx@psu.edu> * * Distributed under the Modified BSD License. @@ -161,7 +161,6 @@ public: static_assert(std::same_as<SchedType, SerialScheduler>, "Tagging is only supported in single-threaded operation"); - auto view = m_buffer->get_buffer_view(); auto version = get_active_version(); if (version->get_mutable_structure()->tagged_delete(rec)) { return 1; @@ -172,7 +171,7 @@ public: * probably has the lowest probability of having the record, * so we'll check it last. */ - return view.delete_record(rec); + return version->get_buffer().delete_record(rec); } /* @@ -256,9 +255,8 @@ public: * ShardType::get_memory_usage) and the buffer by the index. */ size_t get_memory_usage() { - auto version = get_active_version(); auto t = m_buffer->get_memory_usage() + - version->get_structure()->get_memory_usage(); + get_active_version()->get_structure()->get_memory_usage(); return t; } @@ -288,8 +286,7 @@ public: * the index. Ownership of this object is transfered to the * caller. */ - // FIXME: switch this over to std::unique_ptr - ShardType * + std::unique_ptr<ShardType> create_static_structure(bool await_reconstruction_completion = false) { if (await_reconstruction_completion) { await_version(); @@ -322,7 +319,7 @@ public: } } - ShardType *flattened = new ShardType(shards); + auto flattened = std::make_unique<ShardType>(shards); for (auto shard : shards) { delete shard; @@ -348,7 +345,7 @@ public: * of the counter */ if (vid == INVALID_VERSION) { - vid = m_version_counter.load() - 1; + vid = m_version_counter.load(); } /* versions signal on m_version_advance_cv when they activate */ @@ -384,7 +381,11 @@ public: * stdout. Each level is on its own line, and each shard is represented. */ void print_structure() { - get_active_version()->get_structure()->print_structure(); + auto ver = get_active_version(); + auto bv = ver->get_buffer(); + + fprintf(stdout, "[B]:\t(%ld)\n", bv.get_record_count()); + ver->get_structure()->print_structure(); } private: @@ -397,15 +398,15 @@ private: std::atomic<int> m_next_core; /* versioning + concurrency variables */ - std::atomic<size_t> m_version_counter; - std::atomic<std::shared_ptr<VersionType>> m_active_version; + alignas(64) std::atomic<size_t> m_version_counter; + alignas(64) std::atomic<std::shared_ptr<VersionType>> m_active_version; - std::condition_variable m_version_advance_cv; - std::mutex m_version_advance_mtx; + alignas(64) std::condition_variable m_version_advance_cv; + alignas(64) std::mutex m_version_advance_mtx; - LockManager m_lock_mngr; + alignas(64) LockManager m_lock_mngr; - std::atomic<size_t> m_preempt_version; + alignas(64) std::atomic<size_t> m_preempt_version; alignas(64) std::atomic<bool> m_scheduling_reconstruction; @@ -460,7 +461,9 @@ private: auto extension = (DynamicExtension *)args->extension; extension->SetThreadAffinity(); - + // static std::atomic<size_t> cnt = 0; + // size_t recon_id = cnt.fetch_add(1); + size_t new_head = 0; std::vector<reconstruction_results<ShardType>> reconstructions; @@ -476,21 +479,24 @@ private: * this code will be bypassed in that case. 
*/ if (args->priority == ReconstructionPriority::FLUSH) { - // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id()); + // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id); + // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n", + // args->version->get_id(), recon_id); /* we first construct a shard from the buffer */ auto buffview = args->version->get_buffer(); assert(buffview.get_tail() != buffview.get_head()); new_head = buffview.get_tail(); + // fprintf(stderr, "\t[I] Current Buffer Head:\t%ld (%ld)\n", + // buffview.get_head(), recon_id); + reconstruction_results<ShardType> flush_recon; flush_recon.target_level = 0; flush_recon.new_shard = std::make_shared<ShardType>(std::move(buffview)); reconstructions.push_back(flush_recon); - // fprintf(stderr, "[I] Buffer Head:\t%ld\n", new_head); - /* advance the buffer head for a flush */ bool success = false; size_t failure_cnt = 0; @@ -499,24 +505,39 @@ private: if (!success) { failure_cnt++; usleep(1); - // fprintf(stderr, "[I] Buffer head advance blocked on %ld\n", args->version->get_id()); + // fprintf(stderr, "\t[I] Buffer head advance blocked on %ld (%ld)\n", + // args->version->get_id(), recon_id); - if (failure_cnt >= extension->m_config.buffer_flush_query_preemption_trigger) { + if (failure_cnt >= + extension->m_config.buffer_flush_query_preemption_trigger) { extension->preempt_queries(); + + if (failure_cnt > 500000) { + // fprintf(stderr, + // "[C] Critical failure. Hung on version: %ld (%ld)\n", + // extension->m_buffer->debug_get_old_head(), recon_id); + } } } } + // fprintf(stderr, "\t[I] Buffer head advanced to:\t%ld (%ld)\n", + // new_head, recon_id); } else { - // fprintf(stderr, "[I] Running background reconstruction\n"); + // fprintf(stderr, "[I] Running background reconstruction (%ld)\n", + // recon_id); } /* perform all of the reconstructions */ auto structure = args->version->get_structure(); assert(structure); - + + // fprintf(stderr, "\t[I] Pre-reconstruction L0 Size\t%ld (%ld)\n", + // structure->get_level_vector()[0]->get_shard_count(), recon_id); + for (size_t i = 0; i < args->tasks.size(); i++) { - reconstructions.emplace_back(structure->perform_reconstruction(args->tasks[i])); + reconstructions.emplace_back( + structure->perform_reconstruction(args->tasks[i])); } /* @@ -524,51 +545,90 @@ private: * reconstruction isn't a flush), generate one. 
*/ if (args->version->get_id() == INVALID_VERSION) { - args->version->set_id(extension->m_version_counter.fetch_add(1)); - // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id()); + assert(args->priority == ReconstructionPriority::MAINT); + args->version->set_id(extension->m_version_counter.fetch_add(1) + 1); + // fprintf(stderr, "\t[I] Reconstruction version assigned %ld (%ld)\n", + // args->version->get_id(), recon_id); + } else { + assert(args->priority == ReconstructionPriority::FLUSH); } - + /* wait for our opportunity to install the updates */ extension->await_version(args->version->get_id() - 1); - /* get a fresh copy of the structure and apply our updates */ - args->version->set_structure(std::move(std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy()))); + // size_t old_reccnt = args->version->get_structure()->get_record_count(); + + /* + * this version *should* have an ID one less than the version we are + * currently constructing, and should be fully stable (i.e., only the + * buffer tail can change) + */ + auto active_version = extension->get_active_version(); + assert(active_version->get_id() == args->version->get_id() - 1); + + /* get a fresh copy of the structure from the current version */ + args->version->set_structure(std::move(std::unique_ptr<StructureType>( + active_version->get_structure()->copy()))); + + // size_t cur_reccnt = args->version->get_structure()->get_record_count(); + /* apply our updates to the copied structure (adding/removing shards) */ for (auto recon : reconstructions) { - auto grow = args->version->get_mutable_structure()->append_shard(recon.new_shard, args->version->get_id(), recon.target_level); + auto grow = args->version->get_mutable_structure()->append_shard( + recon.new_shard, args->version->get_id(), recon.target_level); args->version->get_mutable_structure()->delete_shards(recon.source_shards); if (grow) { extension->m_lock_mngr.add_lock(); } } - /* grab the latest buffer head */ + // size_t new_reccnt = args->version->get_structure()->get_record_count(); + + // fprintf(stderr, "\t[I] Post-reconstruction L0 Size\t%ld (%ld)\n", + // args->version->get_structure()->get_level_vector()[0]->get_shard_count(), + // recon_id); + + /* for maintenance reconstructions, advance the buffer head to match the + * currently active version */ if (args->priority == ReconstructionPriority::MAINT) { - args->version->set_head(extension->get_active_version()->get_head()); + args->version->set_buffer(extension->m_buffer.get(), + active_version->get_head()); + // fprintf(stderr, "\t[I] Buffer head set to %ld (%ld)\n", + // active_version->get_head(), recon_id); + // if (new_reccnt != cur_reccnt) { + // fprintf(stderr, "ERROR: invalid reccnt (%ld)\n", recon_id); + // } } + // fprintf(stderr, "\t[I] Record Counts: %ld %ld %ld (%ld)\n", old_reccnt, + // cur_reccnt, new_reccnt, recon_id); + /* advance the index to the newly finished version */ extension->install_new_version(args->version, args->initial_version); /* maint reconstructions can now safely release their locks */ if (args->priority == ReconstructionPriority::MAINT) { std::set<size_t> locked_levels; - for (size_t i=0; i<args->tasks.size(); i++) { + for (size_t i = 0; i < args->tasks.size(); i++) { for (auto source : args->tasks[i].sources) { locked_levels.insert(source.level_idx); } } for (auto level : locked_levels) { - // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, args->version->get_id()); - 
extension->m_lock_mngr.release_lock(level); + // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, recon_id); + extension->m_lock_mngr.release_lock(level, args->version->get_id()); } } - + if (args->priority == ReconstructionPriority::FLUSH) { + // fprintf(stderr, "\t[I] releasing lock on buffer (%ld)\n", recon_id); extension->m_lock_mngr.release_buffer_lock(); } + // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n", + // args->version->get_id(), recon_id); + /* manually delete the argument object */ delete args; } @@ -596,6 +656,19 @@ private: /* execute the local/buffer queries and combine the results into output */ QueryResult output; do { + /* + * for query preemption--check if the query should be restarted + * to prevent blocking buffer flushes for too long + */ + if (args->extension->restart_query(args, version->get_id())) { + /* clean up memory allocated for temporary query objects */ + delete buffer_query; + for (size_t i = 0; i < local_queries.size(); i++) { + delete local_queries[i]; + } + return; + } + std::vector<LocalResult> query_results(shards.size() + 1); for (size_t i = 0; i < query_results.size(); i++) { if (i == 0) { /* execute buffer query */ @@ -610,19 +683,6 @@ private: if (query_results[i].size() > 0) break; } - - /* - * for query preemption--check if the query should be restarted - * to prevent blocking buffer flushes for too long - */ - if (args->extension->restart_query(args, version->get_id())) { - /* clean up memory allocated for temporary query objects */ - delete buffer_query; - for (size_t i = 0; i < local_queries.size(); i++) { - delete local_queries[i]; - } - return; - } } /* @@ -656,7 +716,7 @@ private: * early to minimize activation blocking. */ version_ptr create_version_flush(std::unique_ptr<StructureType> structure) { - size_t version_id = m_version_counter.fetch_add(1); + size_t version_id = m_version_counter.fetch_add(1) + 1; // fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id); auto active_version = get_active_version(); std::shared_ptr<VersionType> new_version = @@ -674,17 +734,16 @@ private: */ version_ptr create_version_maint(std::unique_ptr<StructureType> structure) { auto active_version = get_active_version(); - version_ptr new_version = - std::make_shared<VersionType>(INVALID_VERSION, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head()); + version_ptr new_version = std::make_shared<VersionType>( + INVALID_VERSION, std::move(structure), nullptr, 0); return new_version; } void install_new_version(version_ptr new_version, size_t old_active_version_id) { - assert(new_version->get_structure()); - assert(new_version->get_id() != INVALID_VERSION); + assert(new_version->valid()); - // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id()); + // fprintf(stderr, "\t[I] Installing version %ld\n", new_version->get_id()); /* wait until our turn to install the new version */ await_version(new_version->get_id() - 1); @@ -698,11 +757,7 @@ private: m_active_version.store(new_version); m_version_advance_cv.notify_all(); - // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id()); - } - - StructureType *create_scratch_structure() { - return get_active_version()->get_structure()->copy(); + // fprintf(stderr, "\t[I] Installed version %ld\n", new_version->get_id()); } void begin_reconstruction_scheduling() { @@ -727,6 +782,20 @@ private: return; } + /* + * Double check that we actually need to flush. 
I was seeing some + * flushes running on empty buffers--somehow it seems like a new + * flush was getting scheduled immediately after another one finished, + * when it wasn't necessary. This should prevent that from happening. + * + * A bit of a kludge, but it *should* work + */ + if (!m_buffer->is_at_low_watermark()) { + m_lock_mngr.release_buffer_lock(); + end_reconstruction_scheduling(); + return; + } + auto active_version = m_active_version.load(); auto *args = new ReconstructionArgs<ShardType, QueryType>(); @@ -768,10 +837,6 @@ private: auto active_version = m_active_version.load(); auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr); - if (reconstructions.size() == 0) { - // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n"); - } - for (auto &recon : reconstructions) { /* * NOTE: args is deleted by the reconstruction job, so shouldn't be @@ -783,7 +848,7 @@ private: args->extension = this; args->priority = ReconstructionPriority::MAINT; args->initial_version = active_version->get_id(); - m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, + m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(), args, RECONSTRUCTION); } @@ -806,11 +871,8 @@ private: } int internal_append(const RecordType &rec, bool ts) { - if (m_buffer->is_at_low_watermark()) { - auto old = false; - if (m_scheduling_reconstruction.compare_exchange_strong(old, true)) { - schedule_flush(); - } + if (m_buffer->is_at_low_watermark() && !m_lock_mngr.is_buffer_locked()) { + schedule_flush(); } /* this will fail if the HWM is reached and return 0 */ diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h index 9a9ebaa..ab19e24 100644 --- a/include/framework/reconstruction/BackgroundTieringPolicy.h +++ b/include/framework/reconstruction/BackgroundTieringPolicy.h @@ -44,7 +44,7 @@ public: } for (level_index i = target_level; i > source_level; i--) { - if (lock_mngr.take_lock(i-1)) { + if (lock_mngr.take_lock(i-1, version->get_id())) { ReconstructionVector recon; size_t total_reccnt = levels[i - 1]->get_record_count(); std::vector<ShardID> shards; diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 8a4cd8d..16fe111 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -1,7 +1,7 @@ /* * include/framework/scheduling/FIFOScheduler.h * - * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu> * * Distributed under the Modified BSD License. * @@ -9,11 +9,6 @@ * determine which jobs to run next. If more jobs are scheduled than there * are available threads, the excess will stall until a thread becomes * available and then run in the order they were received by the scheduler. - * - * TODO: We need to set up a custom threadpool based on jthreads to support - * thread preemption for a later phase of this project. That will allow us - * to avoid blocking epoch transitions on long-running queries, or to pause - * reconstructions on demand. */ #pragma once @@ -40,7 +35,6 @@ public: m_thrd_cnt((thread_cnt) ? 
thread_cnt : DEFAULT_MAX_THREADS), m_used_memory(0), m_used_thrds(0), m_shutdown(false) { m_sched_thrd = std::thread(&FIFOScheduler::run, this); - m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this); m_thrd_pool.resize(m_thrd_cnt); } @@ -50,7 +44,6 @@ public: } m_sched_thrd.join(); - m_sched_wakeup_thrd.join(); m_flush_thread.join(); } @@ -60,13 +53,13 @@ public: size_t ts = m_counter.fetch_add(1); if (type == 3) { - do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock)); + do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock, &m_cv)); return; } std::unique_lock<std::mutex> lk(m_cv_lock); m_stats.job_queued(ts, type, size); - m_task_queue.push(Task(size, ts, job, args, type, &m_stats)); + m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv)); m_cv.notify_all(); } @@ -95,7 +88,6 @@ private: std::thread m_flush_thread; std::thread m_sched_thrd; - std::thread m_sched_wakeup_thrd; ctpl::thread_pool m_thrd_pool; std::atomic<size_t> m_used_memory; @@ -105,13 +97,6 @@ private: SchedulerStatistics m_stats; - void periodic_wakeup() { - do { - std::this_thread::sleep_for(10us); - m_cv.notify_all(); - } while (!m_shutdown.load()); - } - void schedule_next() { auto lk = std::unique_lock<std::mutex>(m_queue_lock); assert(m_task_queue.size() > 0); diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h index fcc79d1..275c5ae 100644 --- a/include/framework/scheduling/LockManager.h +++ b/include/framework/scheduling/LockManager.h @@ -5,6 +5,7 @@ #pragma once #include <deque> #include <atomic> +#include <cassert> namespace de { class LockManager { @@ -13,6 +14,8 @@ public: for (size_t i=0; i < levels; i++) { m_lks.emplace_back(false); } + + m_last_unlocked_version = 0; } ~LockManager() = default; @@ -21,25 +24,38 @@ public: m_lks.emplace_back(false); } - void release_lock(size_t idx) { + void release_lock(size_t idx, size_t version) { if (idx < m_lks.size()) { - m_lks[idx].store(false); + assert(m_lks.at(idx).load() == true); + m_lks.at(idx).store(false); + + while (m_last_unlocked_version.load() < version) { + auto tmp = m_last_unlocked_version.load(); + m_last_unlocked_version.compare_exchange_strong(tmp, version); + } } } - bool is_locked(size_t idx) { + bool is_locked(size_t idx, size_t version) { if (idx < m_lks.size()) { - return m_lks[idx].load(); + return m_lks.at(idx).load() && m_last_unlocked_version <= version; } return false; } - bool take_lock(size_t idx) { + bool take_lock(size_t idx, size_t version) { if (idx < m_lks.size()) { - bool old = m_lks[idx].load(); + bool old = m_lks.at(idx).load(); if (!old) { - return m_lks[idx].compare_exchange_strong(old, true); + auto result = m_lks.at(idx).compare_exchange_strong(old, true); + + if (m_last_unlocked_version.load() > version) { + m_lks.at(idx).store(false); + return false; + } + + return result; } } @@ -55,6 +71,10 @@ public: return false; } + bool is_buffer_locked() { + return m_buffer_lk.load(); + } + void release_buffer_lock() { m_buffer_lk.store(false); } @@ -62,5 +82,6 @@ public: private: std::deque<std::atomic<bool>> m_lks; std::atomic<bool> m_buffer_lk; + std::atomic<size_t> m_last_unlocked_version; }; } diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 2d68f56..3dbc9f4 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -1,7 +1,7 @@ /* * include/framework/scheduling/Task.h * - * Copyright (C) 2023-2024 Douglas B. 
Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu> * * Distributed under the Modified BSD License. * @@ -16,6 +16,7 @@ #include <chrono> #include <functional> #include <future> +#include <condition_variable> #include "framework/scheduling/Version.h" #include "framework/scheduling/statistics.h" @@ -49,9 +50,9 @@ typedef std::function<void(void *)> Job; struct Task { Task(size_t size, size_t ts, Job job, void *args, size_t type = 0, - SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr) + SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr, std::condition_variable *cv=nullptr) : m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type), - m_stats(stats), m_lk(lk) {} + m_stats(stats), m_lk(lk), m_cv(cv) {} Job m_job; size_t m_size; @@ -60,6 +61,7 @@ struct Task { size_t m_type; SchedulerStatistics *m_stats; std::mutex *m_lk; + std::condition_variable *m_cv; friend bool operator<(const Task &self, const Task &other) { return self.m_timestamp < other.m_timestamp; @@ -92,6 +94,10 @@ struct Task { if (m_lk) { m_lk->unlock(); } + + if (m_cv) { + m_cv->notify_all(); + } } }; diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h index 4cd73ba..be54c84 100644 --- a/include/framework/scheduling/Version.h +++ b/include/framework/scheduling/Version.h @@ -25,16 +25,24 @@ private: typedef BufferView<RecordType> BufferViewType; public: + Version(size_t vid = 0) - : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0), - m_pending_buffer_head(-1) {} + : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0) {} Version(size_t number, std::unique_ptr<StructureType> structure, BufferType *buff, size_t head) : m_buffer(buff), m_structure(std::move(structure)), m_id(number), - m_buffer_head(head), m_pending_buffer_head(-1) {} - - ~Version() = default; + m_buffer_head(head) { + if (m_buffer) { + m_buffer->take_head_reference(m_buffer_head); + } + } + + ~Version() { + if (m_buffer) { + m_buffer->release_head_reference(m_buffer_head); + } + } /* * Versions are *not* copyable or movable. 
Only one can exist, and all users @@ -72,6 +80,9 @@ public: auto version = new Version(number); version->m_buffer = m_buffer; version->m_buffer_head = m_buffer_head; + if (version->m_buffer) { + version->m_buffer->take_head_reference(m_buffer_head); + } if (m_structure) { version->m_structure = std::unique_ptr(m_structure->copy()); @@ -81,8 +92,15 @@ public: } bool advance_buffer_head(size_t new_head) { - m_buffer_head = new_head; - return m_buffer->advance_head(new_head); + m_buffer->release_head_reference(m_buffer_head); + if (m_buffer->advance_head(new_head)) { + m_buffer_head = new_head; + return true; + } + + /* if we failed to advance, reclaim our reference */ + m_buffer->take_head_reference(m_buffer_head); + return false; } void update_shard_version(size_t version) { @@ -94,9 +112,16 @@ public: } - void set_head(size_t head) { - // fprintf(stderr, "[I] Updating buffer head of %ld to %ld\n", get_id(), head); + void set_buffer(BufferType *buffer, size_t head) { + assert(m_buffer == nullptr); + + m_buffer = buffer; m_buffer_head = head; + m_buffer->take_head_reference(head); + } + + bool valid() { + return (m_buffer) && (m_buffer_head) && (m_structure) && (m_id); } private: @@ -105,6 +130,5 @@ private: size_t m_id; size_t m_buffer_head; - ssize_t m_pending_buffer_head; }; } // namespace de diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index acf1201..a9fb12d 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -1,7 +1,7 @@ /* * include/framework/structure/BufferView.h * - * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu> * * Distributed under the Modified BSD License. * @@ -20,8 +20,6 @@ namespace de { -typedef std::function<void(void)> ReleaseFunction; - template <RecordInterface R> class BufferView { public: BufferView() = default; @@ -35,7 +33,6 @@ public: BufferView(BufferView &&other) : m_data(std::exchange(other.m_data, nullptr)), - m_release(std::move(other.m_release)), m_head(std::exchange(other.m_head, 0)), m_tail(std::exchange(other.m_tail, 0)), m_start(std::exchange(other.m_start, 0)), @@ -48,18 +45,13 @@ public: BufferView &operator=(BufferView &&other) = delete; BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail, - size_t tombstone_cnt, psudb::BloomFilter<R> *filter, - ReleaseFunction release) - : m_data(buffer), m_release(release), m_head(head), m_tail(tail), + size_t tombstone_cnt, psudb::BloomFilter<R> *filter) + : m_data(buffer), m_head(head), m_tail(tail), m_start(m_head % cap), m_stop(m_tail % cap), m_cap(cap), m_approx_ts_cnt(tombstone_cnt), m_tombstone_filter(filter), m_active(true) {} - ~BufferView() { - if (m_active) { - m_release(); - } - } + ~BufferView() = default; bool check_tombstone(const R &rec) { if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) @@ -138,7 +130,6 @@ public: private: Wrapped<R> *m_data; - ReleaseFunction m_release; size_t m_head; size_t m_tail; size_t m_start; diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index e62a495..7357915 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -1,7 +1,7 @@ /* * include/framework/structure/MutableBuffer.h * - * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023-2025 Douglas B. 
Rumbaugh <drumbaugh@psu.edu> * Dong Xie <dongx@psu.edu> * * Distributed under the Modified BSD License. @@ -109,11 +109,11 @@ public: size_t get_tombstone_count() const { return m_tscnt.load(); } bool delete_record(const R &rec) { - return get_buffer_view().delete_record(rec); + return get_buffer_view(m_head.load().head_idx).delete_record(rec); } bool check_tombstone(const R &rec) { - return get_buffer_view().check_tombstone(rec); + return get_buffer_view(m_head.load().head_idx).check_tombstone(rec); } size_t get_memory_usage() const { return m_cap * sizeof(Wrapped<R>); } @@ -122,20 +122,9 @@ public: return m_tombstone_filter->get_memory_usage(); } - BufferView<R> get_buffer_view(size_t target_head) { - size_t head = get_head(target_head); - auto f = std::bind(release_head_reference, (void *)this, head); - + BufferView<R> get_buffer_view(size_t head) { return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), - m_tombstone_filter, f); - } - - BufferView<R> get_buffer_view() { - size_t head = get_head(m_head.load().head_idx); - auto f = std::bind(release_head_reference, (void *)this, head); - - return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), - m_tombstone_filter, f); + m_tombstone_filter); } /* @@ -159,7 +148,7 @@ public: // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, m_head.load().refcnt); - buffer_head new_hd = {new_head, 0}; + buffer_head new_hd = {new_head, 1}; buffer_head cur_hd; /* replace current head with new head */ @@ -174,32 +163,6 @@ public: return true; } - /* - * FIXME: If target_head does not match *either* the old_head or the - * current_head, this routine will loop infinitely. - */ - size_t get_head(size_t target_head) { - buffer_head cur_hd, new_hd; - bool head_acquired = false; - - - //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx); - do { - if (m_old_head.load().head_idx == target_head) { - cur_hd = m_old_head.load(); - cur_hd.head_idx = target_head; - new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; - head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); - } else if (m_head.load().head_idx == target_head) { - cur_hd = m_head.load(); - cur_hd.head_idx = target_head; - new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; - head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); - } - } while (!head_acquired); - - return new_hd.head_idx; - } void set_low_watermark(size_t lwm) { assert(lwm < m_hwm); @@ -234,8 +197,90 @@ public: return m_cap - (m_tail.load() - m_old_head.load().head_idx); } + size_t debug_get_old_head() const { + return m_old_head.load().head_idx; + } + + size_t debug_get_head() const { + return m_head.load().head_idx; + } + + bool take_head_reference(size_t target_head) { + buffer_head cur_hd, new_hd; + bool head_acquired = false; + + do { + if (m_old_head.load().head_idx == target_head) { + cur_hd = m_old_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else if (m_head.load().head_idx == target_head) { + cur_hd = m_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } while (!head_acquired); + + return head_acquired; + } + + bool release_head_reference(size_t head) { + buffer_head cur_hd, new_hd; + bool head_released = false; + do { + if (m_old_head.load().head_idx == head) { + 
cur_hd = m_old_head; + + assert(cur_hd.refcnt > 0); + new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; + head_released = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else { + cur_hd = m_head; + + /* it's possible the head got pushed from current to old */ + if (cur_hd.head_idx == head) { + assert(cur_hd.refcnt > 0); + new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; + head_released = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } + _mm_pause(); + } while (!head_released); + + return head_released; + } + private: - int64_t try_advance_tail() { + /* + * FIXME: If target_head does not match *either* the old_head or the + * current_head, this routine will loop infinitely. + */ + size_t get_head(size_t target_head) { + buffer_head cur_hd, new_hd; + bool head_acquired = false; + + + //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx); + do { + if (m_old_head.load().head_idx == target_head) { + cur_hd = m_old_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else if (m_head.load().head_idx == target_head) { + cur_hd = m_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } while (!head_acquired); + + return new_hd.head_idx; + } + + ssize_t try_advance_tail() { size_t old_value = m_tail.load(); /* if full, fail to advance the tail */ @@ -257,33 +302,6 @@ private: size_t to_idx(size_t i, size_t head) { return (head + i) % m_cap; } - static void release_head_reference(void *buff, size_t head) { - MutableBuffer<R> *buffer = (MutableBuffer<R> *)buff; - - buffer_head cur_hd, new_hd; - do { - if (buffer->m_old_head.load().head_idx == head) { - cur_hd = buffer->m_old_head; - if (cur_hd.refcnt == 0) - continue; - new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; - if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { - break; - } - } else { - cur_hd = buffer->m_head; - if (cur_hd.refcnt == 0) - continue; - new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; - - if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { - break; - } - } - _mm_pause(); - } while (true); - } - size_t m_lwm; size_t m_hwm; size_t m_cap; diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index fecb2bf..2b8a7fc 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -27,7 +27,7 @@ class DEConfiguration { /* buffer parameters */ size_t buffer_count = 1; size_t buffer_size = 8000; - size_t buffer_flush_trigger = buffer_size / 4; + size_t buffer_flush_trigger = buffer_size / 2; /* reconstruction triggers */ bool recon_enable_seek_trigger = false; @@ -44,7 +44,7 @@ class DEConfiguration { size_t minimum_query_threads = 4; size_t maximum_memory_usage = 0; /* o for unbounded */ - size_t buffer_flush_query_preemption_trigger = 10; + size_t buffer_flush_query_preemption_trigger = UINT64_MAX; }; |
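The other recurring theme above is the buffer-head lifecycle. BufferView no longer owns a release callback; instead, Version takes a reference on its buffer head at construction and releases it at destruction, and advance_head() installs the new head with a refcount of 1 rather than 0. Below is a minimal sketch of the packed {head_idx, refcnt} CAS scheme behind take_head_reference()/release_head_reference(), reduced to a single head slot (the real MutableBuffer juggles a current and an old slot); the HeadRef wrapper name is illustrative, and 16-byte atomics may need -latomic when building with GCC.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdio>

/* head index and reference count packed into one atomically-updated word,
 * mirroring MutableBuffer's buffer_head */
struct buffer_head {
  std::size_t head_idx;
  std::size_t refcnt;
};

/* RAII head reference in the spirit of Version's constructor/destructor:
 * pin the head for exactly the lifetime of this object */
class HeadRef {
public:
  explicit HeadRef(std::atomic<buffer_head> &slot) : m_slot(slot) {
    buffer_head cur, nxt;
    do { /* take a reference: bump refcnt via CAS */
      cur = m_slot.load();
      nxt = {cur.head_idx, cur.refcnt + 1};
    } while (!m_slot.compare_exchange_strong(cur, nxt));
    m_head = nxt.head_idx;
  }

  ~HeadRef() { /* release the reference on destruction */
    buffer_head cur, nxt;
    do {
      cur = m_slot.load();
      assert(cur.refcnt > 0);
      nxt = {cur.head_idx, cur.refcnt - 1};
    } while (!m_slot.compare_exchange_strong(cur, nxt));
  }

  std::size_t head() const { return m_head; }

private:
  std::atomic<buffer_head> &m_slot;
  std::size_t m_head;
};

int main() {
  std::atomic<buffer_head> slot{buffer_head{0, 0}};
  {
    HeadRef r(slot); /* a reader (or a Version) pins head 0 */
    std::printf("pinned head %zu, refcnt %zu\n", r.head(),
                slot.load().refcnt); /* refcnt 1 */
  } /* RAII release */
  std::printf("refcnt after release: %zu\n", slot.load().refcnt); /* 0 */
  return 0;
}
```

A flush can only retire a head once its reference count drains to zero, which is why the flush path above spins (and eventually preempts queries) while advance_head() keeps failing.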