Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 1324 |
1 file changed, 670 insertions, 654 deletions
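At a glance, this commit replaces the record-centric template head (RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, SchedulerInterface SCHED = FIFOScheduler) with a shard-centric one (ShardInterface ShardType, QueryInterface<ShardType> QueryType, SchedulerInterface SchedType = SerialScheduler), and swaps the untyped void * query parameters for typed Parameters/ResultType definitions supplied by the query class. A minimal sketch of the resulting public API follows; MyShard and MyQuery are hypothetical placeholders for types satisfying the ShardInterface and QueryInterface concepts and are not defined in this patch.

#include "framework/DynamicExtension.h"

/* MyShard and MyQuery are assumed, illustrative implementations of the
 * ShardInterface and QueryInterface concepts -- they are not part of
 * this commit. The record type is deduced from ShardType::RECORD. */
using Index =
    de::DynamicExtension<MyShard, MyQuery, de::LayoutPolicy::TEIRING,
                         de::DeletePolicy::TAGGING, de::SerialScheduler>;

void example(const MyShard::RECORD &rec, MyQuery::Parameters parms) {
  /* buffer low watermark 8000, high watermark 12000, scale factor 8 */
  Index index(8000, 12000, 8);

  /* insert() returns 0 if the buffer has reached its high watermark;
   * the call is simply retried once the flush has made room */
  while (index.insert(rec) == 0)
    ;

  /* queries take typed parameters by rvalue reference and return a
   * future over std::vector<QueryType::ResultType> */
  auto result_future = index.query(std::move(parms));
  auto results = result_future.get();

  /* with DeletePolicy::TAGGING, erase() returns 0 if the record was
   * not found, and the call should not be retried */
  index.erase(rec);
}

Under the default SerialScheduler the high watermark has no effect, so the retry loop above only matters when a concurrent scheduler is substituted for the serial one.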
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index e2e2784..16cbb0e 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -1,8 +1,8 @@ /* * include/framework/DynamicExtension.h * - * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> - * Dong Xie <dongx@psu.edu> + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> * * Distributed under the Modified BSD License. * @@ -14,766 +14,782 @@ #include <vector> #include "framework/interface/Scheduler.h" -#include "framework/scheduling/FIFOScheduler.h" #include "framework/scheduling/SerialScheduler.h" -#include "framework/structure/MutableBuffer.h" -#include "framework/interface/Record.h" #include "framework/structure/ExtensionStructure.h" +#include "framework/structure/MutableBuffer.h" -#include "framework/util/Configuration.h" #include "framework/scheduling/Epoch.h" +#include "framework/util/Configuration.h" namespace de { -template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L=LayoutPolicy::TEIRING, - DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=FIFOScheduler> +template <ShardInterface ShardType, QueryInterface<ShardType> QueryType, + LayoutPolicy L = LayoutPolicy::TEIRING, + DeletePolicy D = DeletePolicy::TAGGING, + SchedulerInterface SchedType = SerialScheduler> class DynamicExtension { - typedef S Shard; - typedef MutableBuffer<R> Buffer; - typedef ExtensionStructure<R, S, Q, L> Structure; - typedef Epoch<R, S, Q, L> _Epoch; - typedef BufferView<R> BufView; - - static constexpr size_t QUERY = 1; - static constexpr size_t RECONSTRUCTION = 2; - - struct epoch_ptr { - _Epoch *epoch; - size_t refcnt; - }; - + /* for unit testing purposes */ public: - DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0, - size_t thread_cnt=16) - : m_scale_factor(scale_factor) - , m_max_delete_prop(1) - , m_sched(memory_budget, thread_cnt) - , m_buffer(new Buffer(buffer_lwm, buffer_hwm)) - , m_core_cnt(thread_cnt) - , m_next_core(0) - , m_epoch_cnt(0) - { - if constexpr (L == LayoutPolicy::BSM) { - assert(scale_factor == 2); - } - - auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); - m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0}); - m_previous_epoch.store({nullptr, 0}); - m_next_epoch.store({nullptr, 0}); - } - - ~DynamicExtension() { - - /* let any in-flight epoch transition finish */ - await_next_epoch(); - - /* shutdown the scheduler */ - m_sched.shutdown(); - - /* delete all held resources */ - delete m_next_epoch.load().epoch; - delete m_current_epoch.load().epoch; - delete m_previous_epoch.load().epoch; - - delete m_buffer; - } - - /* - * Insert the record `rec` into the index. If the buffer is full and - * the framework is blocking on an epoch transition, this call may fail - * and return 0. In this case, retry the call again later. If - * successful, 1 will be returned. The record will be immediately - * visible in the buffer upon the successful return of this function. - */ - int insert(const R &rec) { - return internal_append(rec, false); - } - - /* - * Erase the record `rec` from the index. It is assumed that `rec` - * currently exists--no special checks are made for correctness here. - * The behavior if this function will differ depending on if tombstone - * or tagged deletes are used. - * - * Tombstone deletes - inserts a tombstone record for `rec`. 
This *may* - * return 0 and fail if the buffer is full and the framework is - * blocking on an epoch transition. In this case, repeat the call - * later. 1 will be returned when the tombstone is successfully - * inserted. - * - * Tagging deletes - Does a point lookup for the record across the - * entire structure, and sets its delete bit when found. Returns 1 if - * the record is found and marked, and 0 if it was not (i.e., if it - * isn't present in the index). - */ - int erase(const R &rec) { - // FIXME: delete tagging will require a lot of extra work to get - // operating "correctly" in a concurrent environment. - - /* - * Get a view on the buffer *first*. This will ensure a stronger - * ordering than simply accessing the buffer directly, but is - * not *strictly* necessary. - */ - if constexpr (D == DeletePolicy::TAGGING) { - static_assert(std::same_as<SCHED, SerialScheduler>, "Tagging is only supported in single-threaded operation"); - - auto view = m_buffer->get_buffer_view(); - - auto epoch = get_active_epoch(); - if (epoch->get_structure()->tagged_delete(rec)) { - end_job(epoch); - return 1; - } - - end_job(epoch); - - /* - * the buffer will take the longest amount of time, and - * probably has the lowest probability of having the record, - * so we'll check it last. - */ - return view.delete_record(rec); - } + LayoutPolicy Layout = L; - /* - * If tagging isn't used, then delete using a tombstone - */ - return internal_append(rec, true); - } - - /* - * Execute the query with parameters `parms` and return a future. This - * future can be used to access a vector containing the results of the - * query. - * - * The behavior of this function is undefined if `parms` is not a - * pointer to a valid query parameter object for the query type used as - * a template parameter to construct the framework. - */ - std::future<std::vector<R>> query(void *parms) { - return schedule_query(parms); - } - - /* - * Returns the number of records (included tagged records and - * tombstones) currently within the framework. - */ - size_t get_record_count() { - auto epoch = get_active_epoch(); - auto t = epoch->get_buffer().get_record_count() + epoch->get_structure()->get_record_count(); - end_job(epoch); +private: + /* convenience typedefs for commonly used types within the class */ + typedef typename ShardType::RECORD RecordType; + typedef MutableBuffer<RecordType> Buffer; + typedef ExtensionStructure<ShardType, QueryType, L> Structure; + typedef Epoch<ShardType, QueryType, L> _Epoch; + typedef BufferView<RecordType> BufView; + + typedef typename QueryType::Parameters Parameters; + typedef typename QueryType::LocalQuery LocalQuery; + typedef typename QueryType::LocalQueryBuffer BufferQuery; + typedef typename QueryType::LocalResultType LocalResult; + typedef typename QueryType::ResultType QueryResult; + + + static constexpr size_t QUERY = 1; + static constexpr size_t RECONSTRUCTION = 2; + + struct epoch_ptr { + _Epoch *epoch; + size_t refcnt; + }; - return t; +public: + /** + * Create a new Dynamized version of a data structure, supporting + * inserts and, possibly, deletes. The following parameters are used + * for configuration of the structure, + * @param buffer_low_watermark The number of records that can be + * inserted before a buffer flush is initiated + * + * @param buffer_high_watermark The maximum buffer capacity, inserts + * will begin to fail once this number is reached, until the + * buffer flush has completed. 
Has no effect in single-threaded + * operation + * + * @param scale_factor The rate at which the capacity of levels + * grows; should be at least 2 for reasonable performance + * + * @param memory_budget Unused at this time + * + * @param thread_cnt The maximum number of threads available to the + * framework's scheduler for use in answering queries and + * performing compactions and flushes, etc. + */ + DynamicExtension(size_t buffer_low_watermark, size_t buffer_high_watermark, + size_t scale_factor, size_t memory_budget = 0, + size_t thread_cnt = 16) + : m_scale_factor(scale_factor), m_max_delete_prop(1), + m_sched(memory_budget, thread_cnt), + m_buffer(new Buffer(buffer_low_watermark, buffer_high_watermark)), + m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0) { + if constexpr (L == LayoutPolicy::BSM) { + assert(scale_factor == 2); } - /* - * Returns the number of tombstone records currently within the - * framework. This function can be called when tagged deletes are used, - * but will always return 0 in that case. - */ - size_t get_tombstone_count() { - auto epoch = get_active_epoch(); - auto t = epoch->get_buffer().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); - end_job(epoch); - - return t; - } + auto vers = + new Structure(buffer_high_watermark, m_scale_factor, m_max_delete_prop); + m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0}); + m_previous_epoch.store({nullptr, 0}); + m_next_epoch.store({nullptr, 0}); + } + + /** + * Destructor for DynamicExtension. Will block until the completion of + * any outstanding epoch transition, shut down the scheduler, and free + * all currently allocated shards, buffers, etc., by calling their + * destructors. + */ + ~DynamicExtension() { + + /* let any in-flight epoch transition finish */ + await_next_epoch(); + + /* shutdown the scheduler */ + m_sched.shutdown(); + + /* delete all held resources */ + delete m_next_epoch.load().epoch; + delete m_current_epoch.load().epoch; + delete m_previous_epoch.load().epoch; + + delete m_buffer; + } + + /** + * Inserts a record into the index. Returns 1 if the insert succeeds, + * and 0 if it fails. Inserts may fail if the DynamicExtension's buffer + * has reached the high water mark; in this case, the insert should be + * retried when the buffer has flushed. The record will be immediately + * visible inside the index upon the return of this function. + * + * @param rec The record to be inserted + * + * @return 1 on success, 0 on failure (in which case the insert should + * be retried) + */ + int insert(const RecordType &rec) { return internal_append(rec, false); } + + /** + * Erases a record from the index, according to the DeletePolicy + * template parameter. Returns 1 on success and 0 on failure. The + * equality comparison operator of RecordType is used to identify + * the record to be deleted. + * + * Deletes behave differently, depending on the DeletionPolicy. For + * Tombstone deletes, a tombstone record will be inserted into the + * index. The presence of the deleted record is not checked first, so + * deleting a record that does not exist will result in an unnecessary + * tombstone record being written. + * + * Deletes using Tagging will perform a point lookup for the record to + * be removed, and mark it as deleted in its header. + * + * @param rec The record to be deleted. The argument to this function + * should compare equal to the record to be deleted. + * + * @return 1 on success, and 0 on failure. 
For tombstone deletes, a
+   *         failure will occur if the insert fails due to the buffer
+   *         being full, and can be retried. For tagging deletes, a
+   *         failure means that the record to be deleted could not be
+   *         found in the index, and should *not* be retried.
+   */
+  int erase(const RecordType &rec) {
+    // FIXME: delete tagging will require a lot of extra work to get
+    //        operating "correctly" in a concurrent environment.

     /*
-     * Get the number of levels within the framework. This count will
-     * include any empty levels, but will not include the buffer. Note that
-     * this is *not* the same as the number of shards when tiering is used,
-     * as each level can contain multiple shards in that case.
+     * Get a view on the buffer *first*. This will ensure a stronger
+     * ordering than simply accessing the buffer directly, but is
+     * not *strictly* necessary.
      */
-    size_t get_height() {
-        auto epoch = get_active_epoch();
-        auto t = epoch->get_structure()->get_height();
-        end_job(epoch);
+    if constexpr (D == DeletePolicy::TAGGING) {
+      static_assert(std::same_as<SchedType, SerialScheduler>,
+                    "Tagging is only supported in single-threaded operation");

-        return t;
-    }
+      auto view = m_buffer->get_buffer_view();

-    /*
-     * Get the number of bytes of memory allocated across the framework for
-     * storing records and associated index information (i.e., internal
-     * ISAM tree nodes). This includes memory that is allocated but
-     * currently unused in the buffer, or in shards themselves
-     * (overallocation due to delete cancellation, etc.).
-     */
-    size_t get_memory_usage() {
-        auto epoch = get_active_epoch();
-        auto t = m_buffer->get_memory_usage() + epoch->get_structure()->get_memory_usage();
+      auto epoch = get_active_epoch();
+      if (epoch->get_structure()->tagged_delete(rec)) {
         end_job(epoch);
+        return 1;
+      }

-        return t;
-    }
-
-    /*
-     * Get the number of bytes of memory allocated across the framework for
-     * auxiliary structures. This can include bloom filters, aux
-     * hashtables, etc.
-     */
-    size_t get_aux_memory_usage() {
-        auto epoch = get_active_epoch();
-        auto t = epoch->get_structure()->get_aux_memory_usage();
-        end_job(epoch);
+      end_job(epoch);

-        return t;
+      /*
+       * the buffer will take the longest amount of time, and
+       * probably has the lowest probability of having the record,
+       * so we'll check it last.
+       */
+      return view.delete_record(rec);
     }

     /*
-     * Returns the maximum physical capacity of the buffer, measured in
-     * records.
+     * If tagging isn't used, then delete using a tombstone
      */
-    size_t get_buffer_capacity() {
-        return m_buffer->get_capacity();
+    return internal_append(rec, true);
+  }
+
+  /**
+   * Schedule the execution of a query with specified parameters and
+   * returns a future that can be used to access the results. The query
+   * is executed asynchronously.
+   * @param parms An rvalue reference to the query parameters.
+   *
+   * @return A future, from which the query results can be retrieved upon
+   *         query completion
+   */
+  std::future<std::vector<QueryResult>>
+  query(Parameters &&parms) {
+    return schedule_query(std::move(parms));
+  }
+
+  /**
+   * Determine the number of records (including tagged records and
+   * tombstones) currently within the framework. This number is used for
+   * determining when and how reconstructions occur.
+   *
+   * @return The number of records within the index
+   */
+  size_t get_record_count() {
+    auto epoch = get_active_epoch();
+    auto t = epoch->get_buffer().get_record_count() +
+             epoch->get_structure()->get_record_count();
+    end_job(epoch);
+
+    return t;
+  }
+
+  /**
+   * Returns the number of tombstone records currently within the
+   * index. This function can be called when tagged deletes are used,
+   * but will always return 0 in that case.
+   *
+   * @return The number of tombstone records within the index
+   */
+  size_t get_tombstone_count() {
+    auto epoch = get_active_epoch();
+    auto t = epoch->get_buffer().get_tombstone_count() +
+             epoch->get_structure()->get_tombstone_count();
+    end_job(epoch);
+
+    return t;
+  }
+
+  /**
+   * Get the number of levels within the framework. This count will
+   * include any empty levels, but will not include the buffer. Note that
+   * this is *not* the same as the number of shards when tiering is used,
+   * as each level can contain multiple shards in that case.
+   *
+   * @return The number of levels within the index
+   */
+  size_t get_height() {
+    auto epoch = get_active_epoch();
+    auto t = epoch->get_structure()->get_height();
+    end_job(epoch);
+
+    return t;
+  }
+
+  /**
+   * Get the number of bytes of memory allocated across the framework for
+   * storing records and associated index information (i.e., internal
+   * ISAM tree nodes). This includes memory that is allocated but
+   * currently unused in the buffer, or in shards themselves
+   * (overallocation due to delete cancellation, etc.).
+   *
+   * @return The number of bytes of memory used for shards (as reported by
+   *         ShardType::get_memory_usage) and the buffer by the index.
+   */
+  size_t get_memory_usage() {
+    auto epoch = get_active_epoch();
+    auto t = m_buffer->get_memory_usage() +
+             epoch->get_structure()->get_memory_usage();
+    end_job(epoch);
+
+    return t;
+  }
+
+  /**
+   * Get the number of bytes of memory allocated across the framework for
+   * auxiliary structures. This can include bloom filters, aux
+   * hashtables, etc.
+   *
+   * @return The number of bytes of memory used for auxiliary structures
+   *         (as reported by ShardType::get_aux_memory_usage) by the index.
+   */
+  size_t get_aux_memory_usage() {
+    auto epoch = get_active_epoch();
+    auto t = epoch->get_structure()->get_aux_memory_usage();
+    end_job(epoch);
+
+    return t;
+  }
+
+  /**
+   * Create a new single Shard object containing all of the records
+   * within the framework (buffer and shards).
+   *
+   * @param await_reconstruction_completion Specifies whether the currently
+   *        active state of the index should be used to create the shard
+   *        (false), or if shard construction should wait for any active
+   *        reconstructions to finish first (true). Default value of false.
+   *
+   * @return A new shard object, containing a copy of all records within
+   *         the index. Ownership of this object is transferred to the
+   *         caller.
+   */
+  ShardType *
+  create_static_structure(bool await_reconstruction_completion = false) {
+    if (await_reconstruction_completion) {
+      await_next_epoch();
     }
-
-    /*
-     * Create a new single Shard object containing all of the records
-     * within the framework (buffer and shards). The optional parameter can
-     * be used to specify whether the Shard should be constructed with the
-     * currently active state of the framework (false), or if shard
-     * construction should wait until any ongoing reconstructions have
-     * finished and use that new version (true).
- */ - Shard *create_static_structure(bool await_reconstruction_completion=false) { - if (await_reconstruction_completion) { - await_next_epoch(); - } - auto epoch = get_active_epoch(); - auto vers = epoch->get_structure(); - std::vector<Shard *> shards; + auto epoch = get_active_epoch(); + auto vers = epoch->get_structure(); + std::vector<ShardType *> shards; - - if (vers->get_levels().size() > 0) { - for (int i=vers->get_levels().size() - 1; i>= 0; i--) { - if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) { - shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); - } - } - } - - /* - * construct a shard from the buffer view. We'll hold the view - * for as short a time as possible: once the records are exfiltrated - * from the buffer, there's no reason to retain a hold on the view's - * head pointer any longer - */ - { - auto bv = epoch->get_buffer(); - if (bv.get_record_count() > 0) { - shards.emplace_back(new S(std::move(bv))); - } - } - - Shard *flattened = new S(shards); - - for (auto shard : shards) { - delete shard; + if (vers->get_levels().size() > 0) { + for (int i = vers->get_levels().size() - 1; i >= 0; i--) { + if (vers->get_levels()[i] && + vers->get_levels()[i]->get_record_count() > 0) { + shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); } - - end_job(epoch); - return flattened; + } } /* - * If the current epoch is *not* the newest one, then wait for - * the newest one to become available. Otherwise, returns immediately. + * construct a shard from the buffer view. We'll hold the view + * for as short a time as possible: once the records are exfiltrated + * from the buffer, there's no reason to retain a hold on the view's + * head pointer any longer */ - void await_next_epoch() { - while (m_next_epoch.load().epoch != nullptr) { - std::unique_lock<std::mutex> lk(m_epoch_cv_lk); - m_epoch_cv.wait(lk); - } + { + auto bv = epoch->get_buffer(); + if (bv.get_record_count() > 0) { + shards.emplace_back(new ShardType(std::move(bv))); + } } - /* - * Mostly exposed for unit-testing purposes. Verifies that the current - * active version of the ExtensionStructure doesn't violate the maximum - * tombstone proportion invariant. - */ - bool validate_tombstone_proportion() { - auto epoch = get_active_epoch(); - auto t = epoch->get_structure()->validate_tombstone_proportion(); - end_job(epoch); - return t; - } + ShardType *flattened = new ShardType(shards); + for (auto shard : shards) { + delete shard; + } - void print_scheduler_statistics() { - m_sched.print_statistics(); + end_job(epoch); + return flattened; + } + + /* + * If the current epoch is *not* the newest one, then wait for + * the newest one to become available. Otherwise, returns immediately. + */ + void await_next_epoch() { + while (m_next_epoch.load().epoch != nullptr) { + std::unique_lock<std::mutex> lk(m_epoch_cv_lk); + m_epoch_cv.wait(lk); } + } + + /** + * Verify that the currently active version of the index does not + * violate tombstone proportion invariants. Exposed for unit-testing + * purposes. + * + * @return Returns true if the tombstone proportion invariant is + * satisfied, and false if it is not. + */ + bool validate_tombstone_proportion() { + auto epoch = get_active_epoch(); + auto t = epoch->get_structure()->validate_tombstone_proportion(); + end_job(epoch); + return t; + } + + /** + * Calls SchedType::print_statistics, which should write a report of + * scheduler performance statistics to stdout. 
+ */ + void print_scheduler_statistics() const { m_sched.print_statistics(); } private: - SCHED m_sched; - - Buffer *m_buffer; + size_t m_scale_factor; + double m_max_delete_prop; - //std::mutex m_struct_lock; - //std::set<Structure *> m_versions; + SchedType m_sched; + Buffer *m_buffer; - alignas(64) std::atomic<bool> m_reconstruction_scheduled; + size_t m_core_cnt; + std::atomic<int> m_next_core; + std::atomic<size_t> m_epoch_cnt; + + alignas(64) std::atomic<bool> m_reconstruction_scheduled; - std::atomic<epoch_ptr> m_next_epoch; - std::atomic<epoch_ptr> m_current_epoch; - std::atomic<epoch_ptr> m_previous_epoch; + std::atomic<epoch_ptr> m_next_epoch; + std::atomic<epoch_ptr> m_current_epoch; + std::atomic<epoch_ptr> m_previous_epoch; - std::condition_variable m_epoch_cv; - std::mutex m_epoch_cv_lk; + std::condition_variable m_epoch_cv; + std::mutex m_epoch_cv_lk; - std::atomic<size_t> m_epoch_cnt; - size_t m_scale_factor; - double m_max_delete_prop; - std::atomic<int> m_next_core; - size_t m_core_cnt; - void enforce_delete_invariant(_Epoch *epoch) { - auto structure = epoch->get_structure(); - auto compactions = structure->get_compaction_tasks(); + void enforce_delete_invariant(_Epoch *epoch) { + auto structure = epoch->get_structure(); + auto compactions = structure->get_compaction_tasks(); - while (compactions.size() > 0) { + while (compactions.size() > 0) { - /* schedule a compaction */ - ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>(); - args->epoch = epoch; - args->merges = compactions; - args->extension = this; - args->compaction = true; - /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ + /* schedule a compaction */ + ReconstructionArgs<ShardType, QueryType, L> *args = + new ReconstructionArgs<ShardType, QueryType, L>(); + args->epoch = epoch; + args->merges = compactions; + args->extension = this; + args->compaction = true; + /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed + * here */ - auto wait = args->result.get_future(); + auto wait = args->result.get_future(); - m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); + m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); - /* wait for compaction completion */ - wait.get(); - - /* get a new batch of compactions to perform, if needed */ - compactions = structure->get_compaction_tasks(); - } - } + /* wait for compaction completion */ + wait.get(); - _Epoch *get_active_epoch() { - epoch_ptr old, new_ptr; - - do { - /* - * during an epoch transition, a nullptr will installed in the - * current_epoch. At this moment, the "new" current epoch will - * soon be installed, but the "current" current epoch has been - * moved back to m_previous_epoch. - */ - if (m_current_epoch.load().epoch == nullptr) { - old = m_previous_epoch; - new_ptr = {old.epoch, old.refcnt+1}; - if (old.epoch != nullptr && m_previous_epoch.compare_exchange_strong(old, new_ptr)) { - break; - } - } else { - old = m_current_epoch; - new_ptr = {old.epoch, old.refcnt+1}; - if (old.epoch != nullptr && m_current_epoch.compare_exchange_strong(old, new_ptr)) { - break; - } - } - } while (true); - - assert(new_ptr.refcnt > 0); - - return new_ptr.epoch; + /* get a new batch of compactions to perform, if needed */ + compactions = structure->get_compaction_tasks(); } + } + + _Epoch *get_active_epoch() { + epoch_ptr old, new_ptr; + + do { + /* + * during an epoch transition, a nullptr will installed in the + * current_epoch. 
At this moment, the "new" current epoch will + * soon be installed, but the "current" current epoch has been + * moved back to m_previous_epoch. + */ + if (m_current_epoch.load().epoch == nullptr) { + old = m_previous_epoch; + new_ptr = {old.epoch, old.refcnt + 1}; + if (old.epoch != nullptr && + m_previous_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } else { + old = m_current_epoch; + new_ptr = {old.epoch, old.refcnt + 1}; + if (old.epoch != nullptr && + m_current_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } + } while (true); - void advance_epoch(size_t buffer_head) { + assert(new_ptr.refcnt > 0); - retire_epoch(m_previous_epoch.load().epoch); + return new_ptr.epoch; + } - epoch_ptr tmp = {nullptr, 0}; - epoch_ptr cur; - do { - cur = m_current_epoch; - } while(!m_current_epoch.compare_exchange_strong(cur, tmp)); + void advance_epoch(size_t buffer_head) { - m_previous_epoch.store(cur); + retire_epoch(m_previous_epoch.load().epoch); - // FIXME: this may currently block because there isn't any - // query preemption yet. At this point, we'd need to either - // 1) wait for all queries on the old_head to finish - // 2) kill all queries on the old_head - // 3) somehow migrate all queries on the old_head to the new - // version - while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) { - _mm_pause(); - } + epoch_ptr tmp = {nullptr, 0}; + epoch_ptr cur; + do { + cur = m_current_epoch; + } while (!m_current_epoch.compare_exchange_strong(cur, tmp)); + m_previous_epoch.store(cur); - m_current_epoch.store(m_next_epoch); - m_next_epoch.store({nullptr, 0}); + // FIXME: this may currently block because there isn't any + // query preemption yet. At this point, we'd need to either + // 1) wait for all queries on the old_head to finish + // 2) kill all queries on the old_head + // 3) somehow migrate all queries on the old_head to the new + // version + while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) { + _mm_pause(); + } + m_current_epoch.store(m_next_epoch); + m_next_epoch.store({nullptr, 0}); - /* notify any blocking threads that the new epoch is available */ - m_epoch_cv_lk.lock(); - m_epoch_cv.notify_all(); - m_epoch_cv_lk.unlock(); - } + /* notify any blocking threads that the new epoch is available */ + m_epoch_cv_lk.lock(); + m_epoch_cv.notify_all(); + m_epoch_cv_lk.unlock(); + } + /* + * Creates a new epoch by copying the currently active one. The new epoch's + * structure will be a shallow copy of the old one's. + */ + _Epoch *create_new_epoch() { /* - * Creates a new epoch by copying the currently active one. The new epoch's - * structure will be a shallow copy of the old one's. + * This epoch access is _not_ protected under the assumption that + * only one reconstruction will be able to trigger at a time. If that + * condition is violated, it is possible that this code will clone a retired + * epoch. */ - _Epoch *create_new_epoch() { - /* - * This epoch access is _not_ protected under the assumption that - * only one reconstruction will be able to trigger at a time. If that condition - * is violated, it is possible that this code will clone a retired - * epoch. 
- */ - assert(m_next_epoch.load().epoch == nullptr); - auto current_epoch = get_active_epoch(); + assert(m_next_epoch.load().epoch == nullptr); + auto current_epoch = get_active_epoch(); - m_epoch_cnt.fetch_add(1); - m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0}); + m_epoch_cnt.fetch_add(1); + m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0}); - end_job(current_epoch); + end_job(current_epoch); - return m_next_epoch.load().epoch; - } + return m_next_epoch.load().epoch; + } - void retire_epoch(_Epoch *epoch) { - /* - * Epochs with currently active jobs cannot - * be retired. By the time retire_epoch is called, - * it is assumed that a new epoch is active, meaning - * that the epoch to be retired should no longer - * accumulate new active jobs. Eventually, this - * number will hit zero and the function will - * proceed. - */ - - if (epoch == nullptr) { - return; - } + void retire_epoch(_Epoch *epoch) { + /* + * Epochs with currently active jobs cannot + * be retired. By the time retire_epoch is called, + * it is assumed that a new epoch is active, meaning + * that the epoch to be retired should no longer + * accumulate new active jobs. Eventually, this + * number will hit zero and the function will + * proceed. + */ - epoch_ptr old, new_ptr; - new_ptr = {nullptr, 0}; - do { - old = m_previous_epoch.load(); - - /* - * If running in single threaded mode, the failure to retire - * an Epoch will result in the thread of execution blocking - * indefinitely. - */ - if constexpr (std::same_as<SCHED, SerialScheduler>) { - if (old.epoch == epoch) assert(old.refcnt == 0); - } - - if (old.epoch == epoch && old.refcnt == 0 && - m_previous_epoch.compare_exchange_strong(old, new_ptr)) { - break; - } - usleep(1); - - } while(true); - - delete epoch; + if (epoch == nullptr) { + return; } - static void reconstruction(void *arguments) { - auto args = (ReconstructionArgs<R, S, Q, L> *) arguments; + epoch_ptr old, new_ptr; + new_ptr = {nullptr, 0}; + do { + old = m_previous_epoch.load(); + + /* + * If running in single threaded mode, the failure to retire + * an Epoch will result in the thread of execution blocking + * indefinitely. + */ + if constexpr (std::same_as<SchedType, SerialScheduler>) { + if (old.epoch == epoch) + assert(old.refcnt == 0); + } + + if (old.epoch == epoch && old.refcnt == 0 && + m_previous_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + usleep(1); + + } while (true); + + delete epoch; + } + + static void reconstruction(void *arguments) { + auto args = (ReconstructionArgs<ShardType, QueryType, L> *)arguments; + + ((DynamicExtension *)args->extension)->SetThreadAffinity(); + Structure *vers = args->epoch->get_structure(); + + if constexpr (L == LayoutPolicy::BSM) { + if (args->merges.size() > 0) { + vers->reconstruction(args->merges[0]); + } + } else { + for (ssize_t i = 0; i < args->merges.size(); i++) { + vers->reconstruction(args->merges[i].target, + args->merges[i].sources[0]); + } + } - ((DynamicExtension *) args->extension)->SetThreadAffinity(); - Structure *vers = args->epoch->get_structure(); + /* + * we'll grab the buffer AFTER doing the internal reconstruction, so we + * can flush as many records as possible in one go. The reconstruction + * was done so as to make room for the full buffer anyway, so there's + * no real benefit to doing this first. 
+ */ + auto buffer_view = args->epoch->get_buffer(); + size_t new_head = buffer_view.get_tail(); - if constexpr (L == LayoutPolicy::BSM) { - if (args->merges.size() > 0) { - vers->reconstruction(args->merges[0]); - } - } else { - for (ssize_t i=0; i<args->merges.size(); i++) { - vers->reconstruction(args->merges[i].target, args->merges[i].sources[0]); - } - } + /* + * if performing a compaction, don't flush the buffer, as + * there is no guarantee that any necessary reconstructions + * will free sufficient space in L0 to support a flush + */ + if (!args->compaction) { + vers->flush_buffer(std::move(buffer_view)); + } + args->result.set_value(true); - /* - * we'll grab the buffer AFTER doing the internal reconstruction, so we - * can flush as many records as possible in one go. The reconstruction - * was done so as to make room for the full buffer anyway, so there's - * no real benefit to doing this first. - */ - auto buffer_view = args->epoch->get_buffer(); - size_t new_head = buffer_view.get_tail(); + /* + * Compactions occur on an epoch _before_ it becomes active, + * and as a result the active epoch should _not_ be advanced as + * part of a compaction + */ + if (!args->compaction) { + ((DynamicExtension *)args->extension)->advance_epoch(new_head); + } - /* - * if performing a compaction, don't flush the buffer, as - * there is no guarantee that any necessary reconstructions - * will free sufficient space in L0 to support a flush - */ - if (!args->compaction) { - vers->flush_buffer(std::move(buffer_view)); + ((DynamicExtension *)args->extension) + ->m_reconstruction_scheduled.store(false); + + delete args; + } + + static void async_query(void *arguments) { + auto *args = + (QueryArgs<ShardType, QueryType, DynamicExtension> *) arguments; + + auto epoch = args->extension->get_active_epoch(); + + auto buffer = epoch->get_buffer(); + auto vers = epoch->get_structure(); + auto *parms = &(args->query_parms); + + /* create initial buffer query */ + auto buffer_query = QueryType::local_preproc_buffer(&buffer, parms); + + /* create initial local queries */ + std::vector<std::pair<ShardID, ShardType *>> shards; + std::vector<LocalQuery *> local_queries = + vers->get_local_queries(shards, parms); + + /* process local/buffer queries to create the final version */ + QueryType::distribute_query(parms, local_queries, buffer_query); + + /* execute the local/buffer queries and combine the results into output */ + std::vector<QueryResult> output; + do { + std::vector<std::vector<LocalResult>> + query_results(shards.size() + 1); + for (size_t i = 0; i < query_results.size(); i++) { + std::vector<LocalResult> local_results; + ShardID shid; + + if (i == 0) { /* execute buffer query */ + local_results = QueryType::local_query_buffer(buffer_query); + shid = INVALID_SHID; + } else { /*execute local queries */ + local_results = QueryType::local_query(shards[i - 1].second, + local_queries[i - 1]); + shid = shards[i - 1].first; } - args->result.set_value(true); + /* framework-level, automatic delete filtering */ + query_results[i] = std::move(local_results); - /* - * Compactions occur on an epoch _before_ it becomes active, - * and as a result the active epoch should _not_ be advanced as - * part of a compaction - */ - if (!args->compaction) { - ((DynamicExtension *) args->extension)->advance_epoch(new_head); + /* end query early if EARLY_ABORT is set and a result exists */ + if constexpr (QueryType::EARLY_ABORT) { + if (query_results[i].size() > 0) + break; } + } - ((DynamicExtension *) 
args->extension)->m_reconstruction_scheduled.store(false); - - delete args; - } - - static void async_query(void *arguments) { - QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments; - - auto epoch = ((DynamicExtension *) args->extension)->get_active_epoch(); - - auto ptr1 = ((DynamicExtension *) args->extension)->m_previous_epoch.load().epoch; - auto ptr2 = ((DynamicExtension *) args->extension)->m_current_epoch.load().epoch; - auto ptr3 = ((DynamicExtension *) args->extension)->m_next_epoch.load().epoch; - - - auto buffer = epoch->get_buffer(); - auto vers = epoch->get_structure(); - void *parms = args->query_parms; - - /* Get the buffer query states */ - void *buffer_state = Q::get_buffer_query_state(&buffer, parms); - - /* Get the shard query states */ - std::vector<std::pair<ShardID, Shard*>> shards; - std::vector<void *> states = vers->get_query_states(shards, parms); + /* + * combine the results of the local queries, also translating + * from LocalResultType to ResultType + */ + QueryType::combine(query_results, parms, output); - std::vector<R> results; - Q::process_query_states(parms, states, buffer_state); + /* optionally repeat the local queries if necessary */ + } while (QueryType::repeat(parms, output, local_queries, buffer_query)); - do { - std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1); - for (size_t i=0; i<query_results.size(); i++) { - std::vector<Wrapped<R>> local_results; - ShardID shid; + /* return the output vector to caller via the future */ + args->result_set.set_value(std::move(output)); - if (i == 0) { /* process the buffer first */ - local_results = Q::buffer_query(buffer_state, parms); - shid = INVALID_SHID; - } else { - local_results = Q::query(shards[i - 1].second, states[i - 1], parms); - shid = shards[i - 1].first; - } + /* officially end the query job, releasing the pin on the epoch */ + args->extension->end_job(epoch); - query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer)); - - if constexpr (Q::EARLY_ABORT) { - if (query_results[i].size() > 0) break; - } - } - Q::merge(query_results, parms, results); - - } while (Q::repeat(parms, results, states, buffer_state)); - - args->result_set.set_value(std::move(results)); - - ((DynamicExtension *) args->extension)->end_job(epoch); - - Q::delete_buffer_query_state(buffer_state); - for (size_t i=0; i<states.size(); i++) { - Q::delete_query_state(states[i]); - } - - delete args; + /* clean up memory allocated for temporary query objects */ + delete buffer_query; + for (size_t i = 0; i < local_queries.size(); i++) { + delete local_queries[i]; } - void schedule_reconstruction() { - auto epoch = create_new_epoch(); - /* - * the reconstruction process calls end_job(), - * so we must start one before calling it - */ - - ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>(); - args->epoch = epoch; - args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_high_watermark()); - args->extension = this; - args->compaction = false; - /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ - - m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); + delete args; + } + + void schedule_reconstruction() { + auto epoch = create_new_epoch(); + + ReconstructionArgs<ShardType, QueryType, L> *args = + new ReconstructionArgs<ShardType, QueryType, L>(); + args->epoch = epoch; + args->merges = epoch->get_structure()->get_reconstruction_tasks( + m_buffer->get_high_watermark()); + args->extension = this; + 
args->compaction = false; + /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed + * here */ + + m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); + } + + std::future<std::vector<QueryResult>> + schedule_query(Parameters &&query_parms) { + auto args = + new QueryArgs<ShardType, QueryType, DynamicExtension>(); + args->extension = this; + args->query_parms = std::move(query_parms); + auto result = args->result_set.get_future(); + + m_sched.schedule_job(async_query, 0, (void *)args, QUERY); + + return result; + } + + int internal_append(const RecordType &rec, bool ts) { + if (m_buffer->is_at_low_watermark()) { + auto old = false; + + if (m_reconstruction_scheduled.compare_exchange_strong(old, true)) { + schedule_reconstruction(); + } } - std::future<std::vector<R>> schedule_query(void *query_parms) { - QueryArgs<R, S, Q, L> *args = new QueryArgs<R, S, Q, L>(); - args->extension = this; - args->query_parms = query_parms; - auto result = args->result_set.get_future(); + /* this will fail if the HWM is reached and return 0 */ + return m_buffer->append(rec, ts); + } - m_sched.schedule_job(async_query, 0, args, QUERY); - - return result; +#ifdef _GNU_SOURCE + void SetThreadAffinity() { + if constexpr (std::same_as<SchedType, SerialScheduler>) { + return; } - int internal_append(const R &rec, bool ts) { - if (m_buffer->is_at_low_watermark()) { - auto old = false; - - if (m_reconstruction_scheduled.compare_exchange_strong(old, true)) { - schedule_reconstruction(); - } - } - - /* this will fail if the HWM is reached and return 0 */ - return m_buffer->append(rec, ts); + int core = m_next_core.fetch_add(1) % m_core_cnt; + cpu_set_t mask; + CPU_ZERO(&mask); + + switch (core % 2) { + case 0: + // 0 |-> 0 + // 2 |-> 2 + // 4 |-> 4 + core = core + 0; + break; + case 1: + // 1 |-> 28 + // 3 |-> 30 + // 5 |-> 32 + core = (core - 1) + m_core_cnt; + break; } + CPU_SET(core, &mask); + ::sched_setaffinity(0, sizeof(mask), &mask); + } +#else + void SetThreadAffinity() {} +#endif - static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) { - if constexpr (Q::SKIP_DELETE_FILTER) { - return std::move(records); - } - - std::vector<Wrapped<R>> processed_records; - processed_records.reserve(records.size()); + void end_job(_Epoch *epoch) { + epoch_ptr old, new_ptr; - /* - * For delete tagging, we just need to check the delete bit - * on each record. + do { + if (m_previous_epoch.load().epoch == epoch) { + old = m_previous_epoch; + /* + * This could happen if we get into the system during a + * transition. In this case, we can just back out and retry */ - if constexpr (D == DeletePolicy::TAGGING) { - for (auto &rec : records) { - if (rec.is_deleted()) { - continue; - } + if (old.epoch == nullptr) { + continue; + } - processed_records.emplace_back(rec); - } + assert(old.refcnt > 0); - return processed_records; + new_ptr = {old.epoch, old.refcnt - 1}; + if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) { + break; } - + } else { + old = m_current_epoch; /* - * For tombstone deletes, we need to search for the corresponding - * tombstone for each record. + * This could happen if we get into the system during a + * transition. In this case, we can just back out and retry */ - for (auto &rec : records) { - if (rec.is_tombstone()) { - continue; - } - - // FIXME: need to figure out how best to re-enable the buffer tombstone - // check in the correct manner. 
- //if (buffview.check_tombstone(rec.rec)) { - //continue; - //} - - for (size_t i=0; i<bview->get_record_count(); i++) { - if (bview->get(i)->is_tombstone() && bview->get(i)->rec == rec.rec) { - continue; - } - } - - if (shid != INVALID_SHID) { - for (size_t lvl=0; lvl<=shid.level_idx; lvl++) { - if (vers->get_levels()[lvl]->check_tombstone(0, rec.rec)) { - continue; - } - } - - if (vers->get_levels()[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) { - continue; - } - } - - processed_records.emplace_back(rec); - } - - return processed_records; - } - -#ifdef _GNU_SOURCE - void SetThreadAffinity() { - if constexpr (std::same_as<SCHED, SerialScheduler>) { - return; + if (old.epoch == nullptr) { + continue; } - int core = m_next_core.fetch_add(1) % m_core_cnt; - cpu_set_t mask; - CPU_ZERO(&mask); + assert(old.refcnt > 0); - switch (core % 2) { - case 0: - // 0 |-> 0 - // 2 |-> 2 - // 4 |-> 4 - core = core; - break; - case 1: - // 1 |-> 28 - // 3 |-> 30 - // 5 |-> 32 - core = (core - 1) + m_core_cnt; + new_ptr = {old.epoch, old.refcnt - 1}; + if (m_current_epoch.compare_exchange_strong(old, new_ptr)) { break; } - CPU_SET(core, &mask); - ::sched_setaffinity(0, sizeof(mask), &mask); - } -#else - void SetThreadAffinity() { - - } -#endif - - - void end_job(_Epoch *epoch) { - epoch_ptr old, new_ptr; - - do { - if (m_previous_epoch.load().epoch == epoch) { - old = m_previous_epoch; - /* - * This could happen if we get into the system during a - * transition. In this case, we can just back out and retry - */ - if (old.epoch == nullptr) { - continue; - } - - assert(old.refcnt > 0); - - new_ptr = {old.epoch, old.refcnt - 1}; - if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) { - break; - } - } else { - old = m_current_epoch; - /* - * This could happen if we get into the system during a - * transition. In this case, we can just back out and retry - */ - if (old.epoch == nullptr) { - continue; - } - - assert(old.refcnt > 0); - - new_ptr = {old.epoch, old.refcnt - 1}; - if (m_current_epoch.compare_exchange_strong(old, new_ptr)) { - break; - } - } - } while (true); - } - + } + } while (true); + } }; -} - +} // namespace de |
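The largest behavioral change in this diff is in async_query(): the old static query interface (get_buffer_query_state / process_query_states / buffer_query / query / merge, plus the framework-side filter_deletes() helper, which is removed outright) is replaced by a query-defined local/distribute/combine pipeline. Read off the call sites in the new async_query(), a QueryType now has roughly the shape sketched below; the member names appear in the diff, but the signatures and type aliases here are inferred approximations, not the authoritative QueryInterface definition.

/* Inferred skeleton of a QueryType accepted by the new framework;
 * signatures are approximations based on the call sites in
 * async_query(). The RECORD-based type aliases are assumptions. */
template <typename ShardType>
struct SketchQuery {
  static constexpr bool EARLY_ABORT = false;

  struct Parameters {};       /* caller-supplied query arguments */
  struct LocalQuery {};       /* per-shard execution state */
  struct LocalQueryBuffer {}; /* buffer-specific execution state */

  using LocalResultType = typename ShardType::RECORD; /* assumption */
  using ResultType = typename ShardType::RECORD;      /* assumption */

  /* build the buffer-local query from a view on the mutable buffer;
   * the returned object is freed by the framework with delete */
  static LocalQueryBuffer *
  local_preproc_buffer(de::BufferView<typename ShardType::RECORD> *buffer,
                       Parameters *parms);

  /* reconcile the local queries against one another before execution */
  static void distribute_query(Parameters *parms,
                               std::vector<LocalQuery *> &local_queries,
                               LocalQueryBuffer *buffer_query);

  /* execute against the buffer, and against a single shard */
  static std::vector<LocalResultType>
  local_query_buffer(LocalQueryBuffer *buffer_query);
  static std::vector<LocalResultType> local_query(ShardType *shard,
                                                  LocalQuery *query);

  /* merge the local results into the output vector, translating
   * LocalResultType into ResultType */
  static void combine(std::vector<std::vector<LocalResultType>> &results,
                      Parameters *parms, std::vector<ResultType> &output);

  /* return true if the local queries should be executed again */
  static bool repeat(Parameters *parms, std::vector<ResultType> &output,
                     std::vector<LocalQuery *> &local_queries,
                     LocalQueryBuffer *buffer_query);
};

Note that per-shard local queries are produced by the structure itself (vers->get_local_queries(shards, parms)) rather than by a static QueryType member visible in this diff, so any per-shard preprocessing hook is not shown here.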