| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-12 14:10:11 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-12 14:10:11 -0500 |
| commit | aac0bb661af8fae38d3ce08d6078cb4d9dfcb575 (patch) | |
| tree | 347e0ce7f7e15f2610039f02b75d47cedf810cd6 /include/framework/DynamicExtension.h | |
| parent | c4514c2e62a711189cf3c914297885d97fb51a09 (diff) | |
| download | dynamic-extension-aac0bb661af8fae38d3ce08d6078cb4d9dfcb575.tar.gz | |
Initial integration of new buffering scheme into framework
It isn't working right now (lots of test failures), but we're on to the
debugging phase now.
Diffstat (limited to 'include/framework/DynamicExtension.h')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | include/framework/DynamicExtension.h | 252 |

1 file changed, 88 insertions, 164 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index c5c4a1a..c97b390 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -10,29 +10,23 @@
 #pragma once

 #include <atomic>
-#include <numeric>
 #include <cstdio>
 #include <vector>
 #include <set>
 #include <shared_mutex>
 #include <mutex>

+#include "framework/interface/Scheduler.h"
+#include "framework/scheduling/FIFOScheduler.h"
+#include "framework/scheduling/SerialScheduler.h"
+
 #include "framework/structure/MutableBuffer.h"
-#include "framework/structure/InternalLevel.h"
-#include "framework/interface/Shard.h"
-#include "framework/interface/Query.h"
 #include "framework/interface/Record.h"
-#include "framework/interface/Query.h"
-#include "framework/interface/Scheduler.h"
 #include "framework/structure/ExtensionStructure.h"
 #include "framework/util/Configuration.h"
-#include "framework/scheduling/FIFOScheduler.h"
-#include "framework/scheduling/SerialScheduler.h"
 #include "framework/scheduling/Epoch.h"

-#include "psu-util/timer.h"
-#include "psu-ds/Alias.h"

 namespace de {

@@ -43,22 +37,19 @@ class DynamicExtension {
     typedef MutableBuffer<R> Buffer;
     typedef ExtensionStructure<R, S, Q, L> Structure;
     typedef Epoch<R, S, Q, L> _Epoch;
-    typedef BufferView<R, Q> BufView;
+    typedef BufferView<R> BufView;

 public:
-    DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop, size_t memory_budget=0,
+    DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0,
                      size_t thread_cnt=16)
         : m_scale_factor(scale_factor)
-        , m_max_delete_prop(max_delete_prop)
+        , m_max_delete_prop(1)
        , m_sched(memory_budget, thread_cnt)
-        , m_buffer_capacity(buffer_cap)
-        , m_buffer_delete_capacity(max_delete_prop*buffer_cap)
+        , m_buffer(new Buffer(buffer_lwm, buffer_hwm))
    {
-        auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
-        auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop);
-        auto epoch = new _Epoch(0, vers, buf);
+        auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
+        auto epoch = new _Epoch(0, vers, m_buffer);

-        m_buffers.insert(buf);
        m_versions.insert(vers);
        m_epochs.insert({0, epoch});
    }
@@ -79,9 +70,7 @@ public:
            delete e.second;
        }

-        for (auto e : m_buffers) {
-            delete e;
-        }
+        delete m_buffer;

        for (auto e : m_versions) {
            delete e;
@@ -95,10 +84,15 @@ public:
    int erase(const R &rec) {
        // FIXME: delete tagging will require a lot of extra work to get
        //        operating "correctly" in a concurrent environment.
+
+        /*
+         * Get a view on the buffer *first*. This will ensure a stronger
+         * ordering than simply accessing the buffer directly, but is
+         * not *strictly* necessary.
+         */
+        auto view = m_buffer->get_buffer_view();
        if constexpr (D == DeletePolicy::TAGGING) {
            static_assert(std::same_as<SCHED, SerialScheduler>, "Tagging is only supported in single-threaded operation");
-            BufView buffers = get_active_epoch()->get_buffer_view();
-
            if (get_active_epoch()->get_structure()->tagged_delete(rec)) {
                return 1;
            }
@@ -108,7 +102,7 @@ public:
             * probably has the lowest probability of having the record,
             * so we'll check it last.
             */
-            return buffers.delete_record(rec);
+            return view.delete_record(rec);
        }

        /*
@@ -123,7 +117,7 @@ public:
    size_t get_record_count() {
        auto epoch = get_active_epoch_protected();
-        auto t = epoch->get_buffer_view().get_record_count() + epoch->get_structure()->get_record_count();
+        auto t = epoch->get_buffer().get_record_count() + epoch->get_structure()->get_record_count();
        epoch->end_job();

        return t;
@@ -131,7 +125,7 @@ public:

    size_t get_tombstone_count() {
        auto epoch = get_active_epoch_protected();
-        auto t = epoch->get_buffer_view().get_tombstone_count() + epoch->get_structure()->get_tombstone_count();
+        auto t = epoch->get_buffer().get_tombstone_count() + epoch->get_structure()->get_tombstone_count();
        epoch->end_job();

        return t;
@@ -147,7 +141,7 @@ public:

    size_t get_memory_usage() {
        auto epoch = get_active_epoch_protected();
-        auto t= epoch->get_buffer_view().get_memory_usage() + epoch->get_structure()->get_memory_usage();
+        auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage();
        epoch->end_job();

        return t;
@@ -155,14 +149,14 @@ public:

    size_t get_aux_memory_usage() {
        auto epoch = get_active_epoch_protected();
-        auto t = epoch->get_buffer_view().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage();
+        auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage();
        epoch->end_job();

        return t;
    }

    size_t get_buffer_capacity() {
-        return m_buffer_capacity;
+        return m_buffer->get_capacity();
    }

    Shard *create_static_structure(bool await_reconstruction_completion=false) {
@@ -171,11 +165,20 @@ public:
        }

        auto epoch = get_active_epoch_protected();
-        auto bv = epoch->get_buffer_view();
-
        auto vers = epoch->get_structure();
        std::vector<Shard *> shards;

+        /*
+         * construct a shard from the buffer view. We'll hold the view
+         * for as short a time as possible: once the records are exfiltrated
+         * from the buffer, there's no reason to retain a hold on the view's
+         * head pointer any longer
+         */
+        {
+            auto bv = epoch->get_buffer();
+            shards.emplace_back(new S(std::move(bv)));
+        }
+
        if (vers->get_levels().size() > 0) {
            for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
                if (vers->get_levels()[i]) {
@@ -184,12 +187,6 @@ public:
            }
        }

-        // FIXME: With an interface adjustment, this could be done in
-        //        one call, rather than a loop.
-        for (ssize_t i=bv.size() - 1; i>=0; i--) {
-            shards.emplace_back(new S(bv.get_buffers()[i]));
-        }
-
        Shard *shards_array[shards.size()];

        size_t j = 0;
@@ -237,10 +234,13 @@ public:

 private:
    SCHED m_sched;

+    Buffer *m_buffer;
+
    std::mutex m_struct_lock;
-    std::set<Buffer*> m_buffers;
    std::set<Structure *> m_versions;

+    alignas(64) std::atomic<bool> m_reconstruction_scheduled;
+
    std::atomic<size_t> m_current_epoch;
    std::atomic<size_t> m_newest_epoch;
    std::unordered_map<size_t, _Epoch *> m_epochs;
@@ -253,8 +253,6 @@ private:

    size_t m_scale_factor;
    double m_max_delete_prop;
-    size_t m_buffer_capacity;
-    size_t m_buffer_delete_capacity;

    void enforce_delete_invariant(_Epoch *epoch) {
        auto structure = epoch->get_structure();
@@ -321,6 +319,7 @@ private:
         */
        enforce_delete_invariant(new_epoch);

+        #if 0
        /*
         * Update the new Epoch to contain the buffers from the old one
         * that it doesn't currently have if using a multi-threaded
@@ -339,6 +338,7 @@ private:
                new_epoch->add_buffer(old_epoch->get_buffers()[i]);
            }
        }
+        #endif

        m_current_epoch.fetch_add(1);
        old_epoch->set_inactive();
@@ -373,57 +373,6 @@ private:
        return new_epoch;
    }
-    /*
-     * Add a new empty buffer. This is intended to be used
-     * when a reconstruction is triggered, to allow for inserts to be sustained in the new
-     * buffer while a new epoch is being created in the background. Returns a
-     * pointer to the newly created buffer.
-     */
-    Buffer *add_empty_buffer() {
-        /*
-         * if there's a current Epoch transition ongoing, a buffer installed
-         * into an older Epoch, but not the new one, may be lost. So fail to
-         * insert a buffer.
-         */
-        if (!m_epoch_transition_lk.try_lock()) {
-            return nullptr;
-        }
-
-        /*
-         * verify that the currently active buffer is still full, if
-         * not, there is no reason to add a new one. This code is
-         * protected by the epoch transition lock, so need need to
-         * take a protected reference to the epoch.
-         */
-        auto active_epoch = get_active_epoch();
-        if (!active_epoch->get_active_buffer()->is_full()) {
-            m_epoch_transition_lk.unlock();
-            return nullptr;
-        }
-
-        /*
-         * create a new buffer and install it in the active epoch.
-         */
-        auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
-
-        std::unique_lock<std::mutex> m_struct_lock;
-        auto new_buffer = active_epoch->add_buffer(temp_buffer);
-
-        /*
-         * if epoch->add_buffer doesn't add the new buffer, this insert
-         * won't update the buffer set (duplicate insert)
-         */
-        m_buffers.insert(new_buffer);
-        m_struct_lock.release();
-
-        if (new_buffer != temp_buffer) {
-            delete temp_buffer;
-        }
-        m_epoch_transition_lk.unlock();
-
-        return new_buffer;
-    }
-
    void retire_epoch(_Epoch *epoch) {
        /*
         * Epochs with currently active jobs cannot
@@ -452,21 +401,14 @@ private:
        delete epoch;
        m_epoch_retire_lk.unlock();

+        /* NOTE: the BufferView mechanism handles freeing unused buffer space */
+
        /*
         * Following the epoch's destruction, any buffers
         * or structures with no remaining references can
         * be safely freed.
         */
        std::unique_lock<std::mutex> lock(m_struct_lock);
-        for (auto itr = m_buffers.begin(); itr != m_buffers.end();) {
-            if ((*itr)->get_reference_count() == 0) {
-                auto tmp = *itr;
-                itr = m_buffers.erase(itr);
-                delete tmp;
-            } else {
-                itr++;
-            }
-        }

        for (auto itr = m_versions.begin(); itr != m_versions.end();) {
            if ((*itr)->get_reference_count() == 0) {
@@ -484,21 +426,31 @@ private:

        Structure *vers = args->epoch->get_structure();

-        // FIXME: with an improved shard interface, multiple full buffers
+        // FIXME: with an improved shard interface, multiple full buffer_viewers
        //        could be flushed at once here.
-        Buffer *buff = (Buffer *) args->epoch->get_buffers()[0];
+        auto buffer_view = args->epoch->get_buffer();
+        size_t new_head = buffer_view.get_tail();

        for (ssize_t i=0; i<args->merges.size(); i++) {
            vers->reconstruction(args->merges[i].second, args->merges[i].first);
        }

        /*
-         * if performing a compaction, don't push the buffer down,
-         * as there is no guarantee that any necessary reconstructions
+         * if performing a compaction, don't flush the buffer, as
+         * there is no guarantee that any necessary reconstructions
         * will free sufficient space in L0 to support a flush
         */
        if (!args->compaction) {
-            vers->flush_buffer(buff);
+            vers->flush_buffer(std::move(buffer_view));
+
+            // FIXME: this may currently fail because there isn't any
+            //        query preemption yet. At this point, we'd need to either
+            //        1) wait for all queries on the old_head to finish
+            //        2) kill all queries on the old_head
+            //        3) somehow migrate all queries on the old_head to the new
+            //           version
+            auto res = args->epoch->advance_buffer_head(new_head);
+            assert(res);
        }

        args->epoch->end_job();
@@ -519,27 +471,33 @@ private:
    static void async_query(void *arguments) {
        QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments;

-        auto buffers = args->epoch->get_buffer_view();
+        auto buffer = args->epoch->get_buffer();
        auto vers = args->epoch->get_structure();
        void *parms = args->query_parms;

        /* Get the buffer query states */
-        std::vector<void *> buffer_states = buffers.get_query_states(parms);
+        void *buffer_state = Q::get_buffer_query_state(std::move(buffer), parms);

        /* Get the shard query states */
        std::vector<std::pair<ShardID, Shard*>> shards;
        std::vector<void *> states = vers->get_query_states(shards, parms);

-        Q::process_query_states(parms, states, buffer_states);
+        Q::process_query_states(parms, states, buffer_state);

-        std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + buffer_states.size());
+        std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
        for (size_t i=0; i<query_results.size(); i++) {
-            std::vector<Wrapped<R>> local_results = (i < buffer_states.size())
-                        ? Q::buffer_query(buffers.get_buffers()[i], buffer_states[i], parms)
-                        : Q::query(shards[i - buffer_states.size()].second,
-                                   states[i - buffer_states.size()], parms);
-            ShardID shid = (i < buffer_states.size()) ? INVALID_SHID : shards[i - buffer_states.size()].first;
-            query_results[i] = std::move(filter_deletes(local_results, shid, buffers, vers));
+            std::vector<Wrapped<R>> local_results;
+            ShardID shid;
+
+            if (i == 0) { /* process the buffer first */
+                local_results = Q::buffer_query(buffer_state, parms);
+                shid = INVALID_SHID;
+            } else {
+                local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
+                shid = shards[i - 1].first;
+            }
+
+            query_results[i] = std::move(filter_deletes(local_results, shid, vers));

            if constexpr (Q::EARLY_ABORT) {
                if (query_results[i].size() > 0) break;
@@ -551,10 +509,7 @@ private:

        args->epoch->end_job();

-        for (size_t i=0; i<buffer_states.size(); i++) {
-            Q::delete_buffer_query_state(buffer_states[i]);
-        }
-
+        Q::delete_buffer_query_state(buffer_state);
        for (size_t i=0; i<states.size(); i++) {
            Q::delete_query_state(states[i]);
        }
@@ -575,7 +530,7 @@ private:
        // ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>();
        args->epoch = epoch;
-        args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffers()[0]->get_record_count());
+        args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffer().get_record_count());
        args->extension = this;
        args->compaction = false;
        /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */
@@ -597,49 +552,16 @@ private:
    }

    int internal_append(const R &rec, bool ts) {
-        Buffer *buffer = nullptr;
-        int res = 0;
-        do {
-            auto epoch = get_active_epoch_protected();
-            buffer = epoch->get_active_buffer();
-            assert(buffer);
-
-            /*
-             * If the buffer is full and there is no ongoing reconstruction,
-             * schedule a reconstruction and add a new empty buffer. If there
-             * is an ongoing reconstruction, then add a new empty buffer
-             * to the current epoch.
-             */
-            if (buffer->is_full()) {
-                if constexpr (std::same_as<SCHED, SerialScheduler>) {
-                    /* single threaded: run reconstruction and then empty buffer */
-                    epoch->end_job();
-                    schedule_reconstruction();
-                    buffer->truncate();
-                    continue;
-                } else if (epoch->prepare_reconstruction()) {
-                    /*
-                     * add an empty buffer to allow insert proceed and
-                     * schedule a reconstruction on a background thread
-                     */
-                    buffer = add_empty_buffer();
-                    schedule_reconstruction();
-                } else {
-                    /* background reconstruction is ongoing, so just add empty buffer */
-                    buffer = add_empty_buffer();
-                }
-            }
-
-            res = (buffer) ? buffer->append(rec, ts) : 0;
-            epoch->end_job();
-        } while(!res);
+        if (!m_reconstruction_scheduled.load() && m_buffer->is_at_low_watermark()) {
+            m_reconstruction_scheduled.store(true);
+            schedule_reconstruction();
+        }

-        /* internal append should always succeed, eventually */
-        return 1;
+        /* this will fail if the HWM is reached and return 0 */
+        return m_buffer->append(rec, ts);
    }

-    static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid,
-                                                  BufView &buffers, Structure *vers) {
+    static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers) {
        if constexpr (!Q::SKIP_DELETE_FILTER) {
            return records;
        }
@@ -672,9 +594,11 @@ private:
                continue;
            }

-            if (buffers.check_tombstone(rec.rec)) {
-                continue;
-            }
+            // FIXME: need to figure out how best to re-enable the buffer tombstone
+            //        check in the correct manner.
+            //if (buffview.check_tombstone(rec.rec)) {
+                //continue;
+            //}

            if (shid != INVALID_SHID) {
                for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
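For orientation, the core of the scheme introduced above is a single mutable buffer with a low water mark (at which a background reconstruction is scheduled) and a high water mark (at which appends fail), replacing the old chain of per-epoch buffers. The sketch below only illustrates that control flow; ToyBuffer, ToyExtension, the watermark values, and the method names are assumptions made for the example and are not the framework's actual API.

```cpp
// Illustrative sketch of the watermark-based append path; all names here
// (ToyBuffer, ToyExtension, the 512/1024 watermarks) are assumptions for
// the example, not the framework's real interface.
#include <atomic>
#include <cstddef>

struct ToyBuffer {
    size_t lwm, hwm;                 /* low / high water marks (record counts) */
    std::atomic<size_t> tail{0};     /* next insert position */

    ToyBuffer(size_t l, size_t h) : lwm(l), hwm(h) {}

    bool is_at_low_watermark() const { return tail.load() >= lwm; }

    /* returns 1 on success, 0 once the high water mark is reached */
    int append() {
        size_t pos = tail.fetch_add(1);
        if (pos >= hwm) { tail.fetch_sub(1); return 0; }
        return 1;
    }
};

struct ToyExtension {
    ToyBuffer buffer{512, 1024};
    std::atomic<bool> reconstruction_scheduled{false};

    void schedule_reconstruction() { /* hand the flush off to a background scheduler */ }

    /* mirrors the shape of the new internal_append(): trigger one
     * reconstruction at the LWM, and simply report failure (0) at the HWM */
    int internal_append() {
        if (!reconstruction_scheduled.load() && buffer.is_at_low_watermark()) {
            reconstruction_scheduled.store(true);
            schedule_reconstruction();
        }
        return buffer.append();
    }
};
```

In this reading, a 0 return from internal_append means "buffer at the high water mark"; presumably the calling insert path retries or blocks until the scheduled flush advances the buffer head, though that behavior is outside the scope of this diff.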