diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-31 18:41:17 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-31 18:41:17 -0500 |
| commit | f3b7428cfa7f9364c5a8bc85107db3a7cccd53bc (patch) | |
| tree | c6025935175e8c63e5dc1a116f7e605ca74885ce /include/framework | |
| parent | 8fbcfda7270ef266f29f36b8eadcffaec2123612 (diff) | |
| download | dynamic-extension-f3b7428cfa7f9364c5a8bc85107db3a7cccd53bc.tar.gz | |
Adjusted epoch transition methodology
Diffstat (limited to 'include/framework')
| -rw-r--r-- | include/framework/DynamicExtension.h | 215 | ||||
| -rw-r--r-- | include/framework/scheduling/Epoch.h | 55 | ||||
| -rw-r--r-- | include/framework/scheduling/Task.h | 2 |
3 files changed, 120 insertions, 152 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index cc226d2..0992e14 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -43,6 +43,10 @@ class DynamicExtension { static constexpr size_t QUERY = 1; static constexpr size_t RECONSTRUCTION = 2; + struct epoch_ptr { + _Epoch *epoch; + size_t refcnt; + }; public: DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0, @@ -53,12 +57,14 @@ public: , m_buffer(new Buffer(buffer_lwm, buffer_hwm)) , m_core_cnt(thread_cnt) , m_next_core(0) + , m_epoch_cnt(0) { auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); - auto epoch = new _Epoch(0, vers, m_buffer, 0); + m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0}); + m_previous_epoch.store({nullptr, 0}); + m_next_epoch.store({nullptr, 0}); m_versions.insert(vers); - m_epochs.insert({0, epoch}); } ~DynamicExtension() { @@ -66,16 +72,13 @@ public: /* let any in-flight epoch transition finish */ await_next_epoch(); - /* deactivate the active epoch */ - get_active_epoch()->set_inactive(); - /* shutdown the scheduler */ m_sched.shutdown(); /* delete all held resources */ - for (auto e : m_epochs) { - delete e.second; - } + delete m_next_epoch.load().epoch; + delete m_current_epoch.load().epoch; + delete m_previous_epoch.load().epoch; delete m_buffer; @@ -123,41 +126,41 @@ public: } size_t get_record_count() { - auto epoch = get_active_epoch_protected(); + auto epoch = get_active_epoch(); auto t = epoch->get_buffer().get_record_count() + epoch->get_structure()->get_record_count(); - epoch->end_job(); + end_job(epoch); return t; } size_t get_tombstone_count() { - auto epoch = get_active_epoch_protected(); + auto epoch = get_active_epoch(); auto t = epoch->get_buffer().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); - epoch->end_job(); + end_job(epoch); return t; } size_t get_height() { - auto epoch = get_active_epoch_protected(); + auto epoch = get_active_epoch(); auto t = epoch->get_structure()->get_height(); - epoch->end_job(); + end_job(epoch); return t; } size_t get_memory_usage() { - auto epoch = get_active_epoch_protected(); + auto epoch = get_active_epoch(); auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage(); - epoch->end_job(); + end_job(epoch); return t; } size_t get_aux_memory_usage() { - auto epoch = get_active_epoch_protected(); + auto epoch = get_active_epoch(); auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); - epoch->end_job(); + end_job(epoch); return t; } @@ -171,7 +174,7 @@ public: await_next_epoch(); } - auto epoch = get_active_epoch_protected(); + auto epoch = get_active_epoch(); auto vers = epoch->get_structure(); std::vector<Shard *> shards; @@ -203,7 +206,7 @@ public: delete shard; } - epoch->end_job(); + end_job(epoch); return flattened; } @@ -212,12 +215,10 @@ public: * the newest one to become available. Otherwise, returns immediately. */ void await_next_epoch() { - while (m_current_epoch.load() != m_newest_epoch.load()) { + while (m_next_epoch.load().epoch != nullptr) { std::unique_lock<std::mutex> lk(m_epoch_cv_lk); m_epoch_cv.wait(lk); } - - return; } /* @@ -226,9 +227,9 @@ public: * tombstone proportion invariant. */ bool validate_tombstone_proportion() { - auto epoch = get_active_epoch_protected(); + auto epoch = get_active_epoch(); auto t = epoch->get_structure()->validate_tombstone_proportion(); - epoch->end_job(); + end_job(epoch); return t; } @@ -247,15 +248,14 @@ private: alignas(64) std::atomic<bool> m_reconstruction_scheduled; - std::atomic<size_t> m_current_epoch; - std::atomic<size_t> m_newest_epoch; - std::unordered_map<size_t, _Epoch *> m_epochs; + std::atomic<epoch_ptr> m_next_epoch; + std::atomic<epoch_ptr> m_current_epoch; + std::atomic<epoch_ptr> m_previous_epoch; std::condition_variable m_epoch_cv; std::mutex m_epoch_cv_lk; - std::mutex m_epoch_transition_lk; - std::shared_mutex m_epoch_retire_lk; + std::atomic<size_t> m_epoch_cnt; size_t m_scale_factor; double m_max_delete_prop; @@ -279,12 +279,6 @@ private: auto wait = args->result.get_future(); - /* - * the reconstruction process calls end_job(), - * so we must start one before calling it - */ - epoch->start_job(); - m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); /* wait for compaction completion */ @@ -296,39 +290,38 @@ private: } _Epoch *get_active_epoch() { - return m_epochs[m_current_epoch.load()]; - } + epoch_ptr old, new_ptr; - _Epoch *get_active_epoch_protected() { - m_epoch_retire_lk.lock_shared(); - m_struct_lock.lock(); - auto cur_epoch = m_current_epoch.load(); - m_epochs[cur_epoch]->start_job(); - m_struct_lock.unlock(); - m_epoch_retire_lk.unlock_shared(); + do { + if (m_current_epoch.load().epoch == nullptr) { + old = m_previous_epoch; + new_ptr = {old.epoch, old.refcnt+1}; + if (old.epoch != nullptr && m_previous_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } else { + old = m_current_epoch; + new_ptr = {old.epoch, old.refcnt+1}; + if (old.epoch != nullptr && m_current_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } + } while (true); - return m_epochs[cur_epoch]; + return new_ptr.epoch; } void advance_epoch(size_t buffer_head) { - m_epoch_transition_lk.lock(); - - size_t new_epoch_num = m_newest_epoch.load(); - size_t old_epoch_num = m_current_epoch.load(); - assert(new_epoch_num != old_epoch_num); + retire_epoch(m_previous_epoch.load().epoch); - _Epoch *new_epoch = m_epochs[new_epoch_num]; - _Epoch *old_epoch = m_epochs[old_epoch_num]; + epoch_ptr tmp = {nullptr, 0}; + epoch_ptr cur; + do { + cur = m_current_epoch; + } while(!m_current_epoch.compare_exchange_strong(cur, tmp)); - /* - * Verify the tombstone invariant within the epoch's structure, this - * may require scheduling additional reconstructions. - * - * FIXME: having this inside the lock is going to TANK - * insertion performance. - */ - enforce_delete_invariant(new_epoch); + m_previous_epoch.store(cur); // FIXME: this may currently block because there isn't any // query preemption yet. At this point, we'd need to either @@ -336,20 +329,19 @@ private: // 2) kill all queries on the old_head // 3) somehow migrate all queries on the old_head to the new // version - while (!new_epoch->advance_buffer_head(buffer_head)) { + while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) { _mm_pause(); } - m_current_epoch.fetch_add(1); - old_epoch->set_inactive(); - m_epoch_transition_lk.unlock(); + + m_current_epoch.store(m_next_epoch); + m_next_epoch.store({nullptr, 0}); + /* notify any blocking threads that the new epoch is available */ m_epoch_cv_lk.lock(); m_epoch_cv.notify_all(); m_epoch_cv_lk.unlock(); - - retire_epoch(old_epoch); } /* @@ -363,14 +355,20 @@ private: * is violated, it is possible that this code will clone a retired * epoch. */ - m_newest_epoch.fetch_add(1); - auto new_epoch = get_active_epoch()->clone(m_newest_epoch.load()); + assert(m_next_epoch.load().epoch == nullptr); + auto current_epoch = get_active_epoch(); + + m_epoch_cnt.fetch_add(1); + m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0}); + + end_job(current_epoch); + std::unique_lock<std::mutex> m_struct_lock; - m_versions.insert(new_epoch->get_structure()); - m_epochs.insert({m_newest_epoch.load(), new_epoch}); + m_versions.insert(m_next_epoch.load().epoch->get_structure()); m_struct_lock.release(); - return new_epoch; + + return m_next_epoch.load().epoch; } void retire_epoch(_Epoch *epoch) { @@ -384,28 +382,25 @@ private: * proceed. */ + if (epoch == nullptr) { + return; + } + + epoch_ptr old, new_ptr; + new_ptr = {nullptr, 0}; do { - if (epoch->retirable()) { - m_epoch_retire_lk.lock(); - if (!epoch->retirable()) { - m_epoch_retire_lk.unlock(); - continue; - } + old = m_previous_epoch.load(); + + if (old.epoch == epoch && old.refcnt == 0 && + m_previous_epoch.compare_exchange_strong(old, new_ptr)) { break; } - } while (true); + usleep(1); + } while(true); - /* remove epoch from the framework's map */ - m_epochs.erase(epoch->get_epoch_number()); + //fprintf(stderr, "Epoch %ld retired [%p]\n", epoch->get_epoch_number(), epoch); - /* - * The epoch's destructor will handle releasing - * all the references it holds - */ delete epoch; - m_epoch_retire_lk.unlock(); - - /* NOTE: the BufferView mechanism handles freeing unused buffer space */ /* * Following the epoch's destruction, any buffers @@ -453,7 +448,6 @@ private: vers->flush_buffer(std::move(buffer_view)); } - args->epoch->end_job(); args->result.set_value(true); /* @@ -473,8 +467,17 @@ private: static void async_query(void *arguments) { QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments; - auto buffer = args->epoch->get_buffer(); - auto vers = args->epoch->get_structure(); + auto epoch = ((DynamicExtension *) args->extension)->get_active_epoch(); + + auto ptr1 = ((DynamicExtension *) args->extension)->m_previous_epoch.load().epoch; + auto ptr2 = ((DynamicExtension *) args->extension)->m_current_epoch.load().epoch; + auto ptr3 = ((DynamicExtension *) args->extension)->m_next_epoch.load().epoch; + + //fprintf(stderr, "(%ld, %p)\t%p\t%p\t%p\n", epoch->get_epoch_number(), epoch, ptr1, ptr2, ptr3); + + + auto buffer = epoch->get_buffer(); + auto vers = epoch->get_structure(); void *parms = args->query_parms; /* Get the buffer query states */ @@ -509,7 +512,7 @@ private: auto result = Q::merge(query_results, parms); args->result_set.set_value(std::move(result)); - args->epoch->end_job(); + ((DynamicExtension *) args->extension)->end_job(epoch); Q::delete_buffer_query_state(buffer_state); for (size_t i=0; i<states.size(); i++) { @@ -525,7 +528,6 @@ private: * the reconstruction process calls end_job(), * so we must start one before calling it */ - epoch->start_job(); ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>(); args->epoch = epoch; @@ -538,10 +540,8 @@ private: } std::future<std::vector<R>> schedule_query(void *query_parms) { - auto epoch = get_active_epoch_protected(); - QueryArgs<R, S, Q, L> *args = new QueryArgs<R, S, Q, L>(); - args->epoch = epoch; + args->extension = this; args->query_parms = query_parms; auto result = args->result_set.get_future(); @@ -643,6 +643,29 @@ private: ::sched_setaffinity(0, sizeof(mask), &mask); } + + void end_job(_Epoch *epoch) { + epoch_ptr old, new_ptr; + + do { + if (m_previous_epoch.load().epoch == epoch) { + old = m_previous_epoch; + assert(old.refcnt > 0); + new_ptr = {old.epoch, old.refcnt - 1}; + if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } else { + old = m_current_epoch; + assert(old.refcnt > 0); + new_ptr = {old.epoch, old.refcnt - 1}; + if (m_current_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } + } while (true); + } + }; } diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 45ee17d..7b533b6 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -29,8 +29,6 @@ public: : m_buffer(nullptr) , m_structure(nullptr) , m_active_merge(false) - , m_active_jobs(0) - , m_active(true) , m_epoch_number(number) , m_buffer_head(0) {} @@ -38,9 +36,7 @@ public: Epoch(size_t number, Structure *structure, Buffer *buff, size_t head) : m_buffer(buff) , m_structure(structure) - , m_active_jobs(0) , m_active_merge(false) - , m_active(true) , m_epoch_number(number) , m_buffer_head(head) { @@ -48,8 +44,6 @@ public: } ~Epoch() { - assert(m_active_jobs.load() == 0); - /* FIXME: this is needed to keep the destructor from sometimes locking * up here. But there *shouldn't* be any threads waiting on this signal * at object destruction, so something else is going on here that needs @@ -71,24 +65,6 @@ public: Epoch &operator=(const Epoch&) = delete; Epoch &operator=(Epoch&&) = delete; - void start_job() { - m_active_jobs.fetch_add(1); - } - - void end_job() { - assert(m_active_jobs.load() > 0); - m_active_jobs.fetch_add(-1); - - if (m_active_jobs.load() == 0) { - std::unique_lock<std::mutex> lk(m_cv_lock); - m_active_cv.notify_all(); - } - } - - size_t get_active_job_num() { - return m_active_jobs.load(); - } - size_t get_epoch_number() { return m_epoch_number; } @@ -145,32 +121,6 @@ public: return true; } - void set_inactive() { - m_active = false; - } - - /* - * - */ - bool retirable() { - /* if epoch is currently active, then it cannot be retired */ - if (m_active) { - return false; - } - - /* - * if the epoch has active jobs but is not itself active, - * wait for them to finish and return true. If there are - * not active jobs, return true immediately - */ - std::unique_lock<std::mutex> lk(m_cv_lock); - while (m_active_jobs.load() > 0) { - m_active_cv.wait(lk); - } - - return true; - } - bool advance_buffer_head(size_t head) { m_buffer_head = head; return m_buffer->advance_head(m_buffer_head); @@ -180,9 +130,6 @@ private: Structure *m_structure; Buffer *m_buffer; - std::condition_variable m_active_cv; - std::mutex m_cv_lock; - std::mutex m_buffer_lock; std::atomic<bool> m_active_merge; @@ -192,8 +139,6 @@ private: * epoch. An epoch can only be retired * when this number is 0. */ - std::atomic<size_t> m_active_jobs; - bool m_active; size_t m_epoch_number; size_t m_buffer_head; }; diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index b14b229..6f6b913 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -29,9 +29,9 @@ struct ReconstructionArgs { template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L> struct QueryArgs { - Epoch<R, S, Q, L> *epoch; std::promise<std::vector<R>> result_set; void *query_parms; + void *extension; }; typedef std::function<void(void*)> Job; |