From 9fe305c7d28e993e55c55427f377ae7e3251ea4f Mon Sep 17 00:00:00 2001
From: "Douglas B. Rumbaugh"
Date: Fri, 6 Dec 2024 13:13:51 -0500
Subject: Interface update (#5)

* Query Interface Adjustments/Refactoring

Began the process of adjusting the query interface (and also the shard
interface, to a lesser degree) to better accommodate the user. In
particular, the following changes have been made:

1. The number of necessary template arguments for the query type has
   been drastically reduced, while also removing the void pointers and
   manual delete functions from the interface. This was accomplished by
   requiring many of the sub-types associated with a query (parameters,
   etc.) to be nested inside the main query class, and by forcing the
   SHARD type to expose its associated record type.

2. User-defined query return types are now supported. Queries are no
   longer strictly required to return sets of records. Instead, the
   query now has LocalResultType and ResultType template parameters
   (which can be defaulted using a typedef in the Query type itself),
   allowing much more flexibility. Note that, at least for the short
   term, the LocalResultType must still expose the same
   is_deleted/is_tombstone interface as a Wrapped record used to, as
   this is currently needed for delete filtering. A better approach to
   this is, hopefully, forthcoming. (A minimal sketch of the resulting
   query-type shape is included below, after this message.)

3. Updated the ISAMTree.h shard and rangequery.h query to use the new
   interfaces, and adjusted the associated unit tests as well.

4. Dropped the unnecessary "get_data()" function from the
   ShardInterface concept.

5. Dropped the need to specify a record type in the ShardInterface
   concept. This is now handled using a required Shard::RECORD member
   of the Shard class itself, which should expose the name of the
   record type.

* Updates to framework to support new Query/Shard interfaces

Pretty extensive adjustments to the framework, particularly to the
templates themselves, along with some type-renaming work, to support
the new query and shard interfaces. Adjusted the external query
interface to take an rvalue reference, rather than a pointer, to the
query parameters.

* Removed framework-level delete filtering

This was causing some issues with the new query interface, and should
probably be reworked anyway, so I'm temporarily (TM) removing the
feature.

* Updated benchmarks + remaining code for new interface
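Illustrative only -- not part of the patch itself. The following is a
minimal sketch of what a query type looks like under the new interface.
The nested type names (Parameters, LocalQuery, LocalQueryBuffer,
LocalResultType, ResultType), the static member functions, and the
EARLY_ABORT flag mirror the requirements in
include/framework/interface/Query.h in the diff below; the names
ExampleCountQuery, ShardType, and BufferViewType are hypothetical
placeholders, and delete filtering is ignored for brevity.

#include <cstddef>
#include <vector>

template <typename ShardType, typename BufferViewType>
class ExampleCountQuery {
public:
  /* global arguments of the query; empty for a whole-structure count */
  struct Parameters {};

  /* per-shard and per-buffer local query state */
  struct LocalQuery {
    Parameters global;
  };
  struct LocalQueryBuffer {
    BufferViewType *buffer; /* the buffer view must be stored here */
    Parameters global;
  };

  /* local results must (for now) expose is_deleted/is_tombstone */
  struct LocalResultType {
    size_t count = 0;
    bool is_deleted() const { return false; }
    bool is_tombstone() const { return false; }
  };
  typedef size_t ResultType;

  constexpr static bool EARLY_ABORT = false;

  static LocalQuery *local_preproc(ShardType *shard, Parameters *parms) {
    (void)shard;
    return new LocalQuery{*parms};
  }

  static LocalQueryBuffer *local_preproc_buffer(BufferViewType *buffer,
                                                Parameters *parms) {
    return new LocalQueryBuffer{buffer, *parms};
  }

  /* no global information needs to be shared between local queries */
  static void distribute_query(Parameters *, std::vector<LocalQuery *> &,
                               LocalQueryBuffer *) {}

  static std::vector<LocalResultType> local_query(ShardType *shard,
                                                  LocalQuery *) {
    return {LocalResultType{shard->get_record_count()}};
  }

  static std::vector<LocalResultType>
  local_query_buffer(LocalQueryBuffer *query) {
    return {LocalResultType{query->buffer->get_record_count()}};
  }

  /* sum the per-shard counts into the single final result */
  static void combine(std::vector<std::vector<LocalResultType>> &local,
                      Parameters *, std::vector<ResultType> &result) {
    size_t total = 0;
    for (auto &v : local)
      for (auto &r : v)
        total += r.count;
    result.push_back(total);
  }

  /* a single pass is always sufficient for this query */
  static bool repeat(Parameters *, std::vector<ResultType> &,
                     std::vector<LocalQuery *> &, LocalQueryBuffer *) {
    return false;
  }
};

/*
 * Hypothetical usage against the adjusted external interface, which now
 * takes the parameters by rvalue reference and returns a future:
 *
 *   auto fut = extension.query(typename QueryType::Parameters{});
 *   auto results = fut.get(); // std::vector<QueryType::ResultType>
 */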
---
 include/framework/DynamicExtension.h             | 1324 +++++++++++-----------
 include/framework/interface/Query.h              |  136 ++-
 include/framework/interface/Record.h             |  335 +++---
 include/framework/interface/Scheduler.h          |   15 +-
 include/framework/interface/Shard.h              |   66 +-
 include/framework/scheduling/Epoch.h             |  209 ++--
 include/framework/scheduling/FIFOScheduler.h     |  143 ++-
 include/framework/scheduling/SerialScheduler.h   |   64 +-
 include/framework/scheduling/Task.h              |  101 +-
 include/framework/scheduling/statistics.h        |  140 ++-
 include/framework/structure/BufferView.h         |  258 ++---
 include/framework/structure/ExtensionStructure.h | 1080 +++++++++---------
 include/framework/structure/InternalLevel.h      |  439 +++----
 include/framework/structure/MutableBuffer.h      |  464 ++++----
 include/framework/util/Configuration.h           |   35 +-
 include/query/irs.h                              |  360 +++---
 include/query/knn.h                              |  224 ++--
 include/query/pointlookup.h                      |  170 ++-
 include/query/rangecount.h                       |  259 +++--
 include/query/rangequery.h                       |  283 ++---
 include/query/wirs.h                             |  251 ----
 include/query/wss.h                              |  282 +++--
 include/shard/Alias.h                            |   10 +-
 include/shard/AugBTree.h                         |  311 -----
 include/shard/FSTrie.h                           |    4 +-
 include/shard/ISAMTree.h                         |  402 ++++---
 include/shard/PGM.h                              |    4 +-
 include/shard/TrieSpline.h                       |    4 +-
 include/shard/VPTree.h                           |   11 +-
 include/util/Cursor.h                            |   89 +-
 include/util/SortedMerge.h                       |  274 ++---
 include/util/bf_config.h                         |   16 +-
 include/util/types.h                             |  119 +-
 33 files changed, 3676 insertions(+), 4206 deletions(-)
 delete mode 100644 include/query/wirs.h
 delete mode 100644 include/shard/AugBTree.h
(limited to 'include')

diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index e2e2784..16cbb0e 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -1,8 +1,8 @@
 /*
  * include/framework/DynamicExtension.h
  *
- * Copyright (C) 2023 Douglas B. Rumbaugh
- *                    Dong Xie
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh
+ *                         Dong Xie
  *
  * Distributed under the Modified BSD License.
* @@ -14,766 +14,782 @@ #include #include "framework/interface/Scheduler.h" -#include "framework/scheduling/FIFOScheduler.h" #include "framework/scheduling/SerialScheduler.h" -#include "framework/structure/MutableBuffer.h" -#include "framework/interface/Record.h" #include "framework/structure/ExtensionStructure.h" +#include "framework/structure/MutableBuffer.h" -#include "framework/util/Configuration.h" #include "framework/scheduling/Epoch.h" +#include "framework/util/Configuration.h" namespace de { -template S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, - DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=FIFOScheduler> +template QueryType, + LayoutPolicy L = LayoutPolicy::TEIRING, + DeletePolicy D = DeletePolicy::TAGGING, + SchedulerInterface SchedType = SerialScheduler> class DynamicExtension { - typedef S Shard; - typedef MutableBuffer Buffer; - typedef ExtensionStructure Structure; - typedef Epoch _Epoch; - typedef BufferView BufView; - - static constexpr size_t QUERY = 1; - static constexpr size_t RECONSTRUCTION = 2; - - struct epoch_ptr { - _Epoch *epoch; - size_t refcnt; - }; - + /* for unit testing purposes */ public: - DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0, - size_t thread_cnt=16) - : m_scale_factor(scale_factor) - , m_max_delete_prop(1) - , m_sched(memory_budget, thread_cnt) - , m_buffer(new Buffer(buffer_lwm, buffer_hwm)) - , m_core_cnt(thread_cnt) - , m_next_core(0) - , m_epoch_cnt(0) - { - if constexpr (L == LayoutPolicy::BSM) { - assert(scale_factor == 2); - } - - auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); - m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0}); - m_previous_epoch.store({nullptr, 0}); - m_next_epoch.store({nullptr, 0}); - } - - ~DynamicExtension() { - - /* let any in-flight epoch transition finish */ - await_next_epoch(); - - /* shutdown the scheduler */ - m_sched.shutdown(); - - /* delete all held resources */ - delete m_next_epoch.load().epoch; - delete m_current_epoch.load().epoch; - delete m_previous_epoch.load().epoch; - - delete m_buffer; - } - - /* - * Insert the record `rec` into the index. If the buffer is full and - * the framework is blocking on an epoch transition, this call may fail - * and return 0. In this case, retry the call again later. If - * successful, 1 will be returned. The record will be immediately - * visible in the buffer upon the successful return of this function. - */ - int insert(const R &rec) { - return internal_append(rec, false); - } - - /* - * Erase the record `rec` from the index. It is assumed that `rec` - * currently exists--no special checks are made for correctness here. - * The behavior if this function will differ depending on if tombstone - * or tagged deletes are used. - * - * Tombstone deletes - inserts a tombstone record for `rec`. This *may* - * return 0 and fail if the buffer is full and the framework is - * blocking on an epoch transition. In this case, repeat the call - * later. 1 will be returned when the tombstone is successfully - * inserted. - * - * Tagging deletes - Does a point lookup for the record across the - * entire structure, and sets its delete bit when found. Returns 1 if - * the record is found and marked, and 0 if it was not (i.e., if it - * isn't present in the index). - */ - int erase(const R &rec) { - // FIXME: delete tagging will require a lot of extra work to get - // operating "correctly" in a concurrent environment. - - /* - * Get a view on the buffer *first*. 
This will ensure a stronger - * ordering than simply accessing the buffer directly, but is - * not *strictly* necessary. - */ - if constexpr (D == DeletePolicy::TAGGING) { - static_assert(std::same_as, "Tagging is only supported in single-threaded operation"); - - auto view = m_buffer->get_buffer_view(); - - auto epoch = get_active_epoch(); - if (epoch->get_structure()->tagged_delete(rec)) { - end_job(epoch); - return 1; - } - - end_job(epoch); - - /* - * the buffer will take the longest amount of time, and - * probably has the lowest probability of having the record, - * so we'll check it last. - */ - return view.delete_record(rec); - } + LayoutPolicy Layout = L; - /* - * If tagging isn't used, then delete using a tombstone - */ - return internal_append(rec, true); - } - - /* - * Execute the query with parameters `parms` and return a future. This - * future can be used to access a vector containing the results of the - * query. - * - * The behavior of this function is undefined if `parms` is not a - * pointer to a valid query parameter object for the query type used as - * a template parameter to construct the framework. - */ - std::future> query(void *parms) { - return schedule_query(parms); - } - - /* - * Returns the number of records (included tagged records and - * tombstones) currently within the framework. - */ - size_t get_record_count() { - auto epoch = get_active_epoch(); - auto t = epoch->get_buffer().get_record_count() + epoch->get_structure()->get_record_count(); - end_job(epoch); +private: + /* convenience typedefs for commonly used types within the class */ + typedef typename ShardType::RECORD RecordType; + typedef MutableBuffer Buffer; + typedef ExtensionStructure Structure; + typedef Epoch _Epoch; + typedef BufferView BufView; + + typedef typename QueryType::Parameters Parameters; + typedef typename QueryType::LocalQuery LocalQuery; + typedef typename QueryType::LocalQueryBuffer BufferQuery; + typedef typename QueryType::LocalResultType LocalResult; + typedef typename QueryType::ResultType QueryResult; + + + static constexpr size_t QUERY = 1; + static constexpr size_t RECONSTRUCTION = 2; + + struct epoch_ptr { + _Epoch *epoch; + size_t refcnt; + }; - return t; +public: + /** + * Create a new Dynamized version of a data structure, supporting + * inserts and, possibly, deletes. The following parameters are used + * for configuration of the structure, + * @param buffer_low_watermark The number of records that can be + * inserted before a buffer flush is initiated + * + * @param buffer_high_watermark The maximum buffer capacity, inserts + * will begin to fail once this number is reached, until the + * buffer flush has completed. Has no effect in single-threaded + * operation + * + * @param scale_factor The rate at which the capacity of levels + * grows; should be at least 2 for reasonable performance + * + * @param memory_budget Unused at this time + * + * @param thread_cnt The maximum number of threads available to the + * framework's scheduler for use in answering queries and + * performing compactions and flushes, etc. 
+ */ + DynamicExtension(size_t buffer_low_watermark, size_t buffer_high_watermark, + size_t scale_factor, size_t memory_budget = 0, + size_t thread_cnt = 16) + : m_scale_factor(scale_factor), m_max_delete_prop(1), + m_sched(memory_budget, thread_cnt), + m_buffer(new Buffer(buffer_low_watermark, buffer_high_watermark)), + m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0) { + if constexpr (L == LayoutPolicy::BSM) { + assert(scale_factor == 2); } - /* - * Returns the number of tombstone records currently within the - * framework. This function can be called when tagged deletes are used, - * but will always return 0 in that case. - */ - size_t get_tombstone_count() { - auto epoch = get_active_epoch(); - auto t = epoch->get_buffer().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); - end_job(epoch); - - return t; - } + auto vers = + new Structure(buffer_high_watermark, m_scale_factor, m_max_delete_prop); + m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0}); + m_previous_epoch.store({nullptr, 0}); + m_next_epoch.store({nullptr, 0}); + } + + /** + * Destructor for DynamicExtension. Will block until the completion of + * any outstanding epoch transition, shut down the scheduler, and free + * all currently allocated shards, buffers, etc., by calling their + * destructors. + */ + ~DynamicExtension() { + + /* let any in-flight epoch transition finish */ + await_next_epoch(); + + /* shutdown the scheduler */ + m_sched.shutdown(); + + /* delete all held resources */ + delete m_next_epoch.load().epoch; + delete m_current_epoch.load().epoch; + delete m_previous_epoch.load().epoch; + + delete m_buffer; + } + + /** + * Inserts a record into the index. Returns 1 if the insert succeeds, + * and 0 if it fails. Inserts may fail if the DynamicExtension's buffer + * has reached the high water mark; in this case, the insert should be + * retried when the buffer has flushed. The record will be immediately + * visible inside the index upon the return of this function. + * + * @param rec The record to be inserted + * + * @return 1 on success, 0 on failure (in which case the insert should + * be retried) + */ + int insert(const RecordType &rec) { return internal_append(rec, false); } + + /** + * Erases a record from the index, according to the DeletePolicy + * template parameter. Returns 1 on success and 0 on failure. The + * equality comparison operator of RecordType is used to identify + * the record to be deleted. + * + * Deletes behave differently, depending on the DeletionPolicy. For + * Tombstone deletes, a tombstone record will be inserted into the + * index. The presence of the deleted record is not checked first, so + * deleting a record that does not exist will result in an unnecessary + * tombstone record being written. + * + * Deletes using Tagging will perform a point lookup for the record to + * be removed, and mark it as deleted in its header. + * + * @param rec The record to be deleted. The argument to this function + * should compare equal to the record to be deleted. + * + * @return 1 on success, and 0 on failure. For tombstone deletes, a + * failure will occur if the insert fails due to the buffer + * being full, and can be retried. For tagging deletes, a + * failure means that hte record to be deleted could not be + * found in the index, and should *not* be retried. + */ + int erase(const RecordType &rec) { + // FIXME: delete tagging will require a lot of extra work to get + // operating "correctly" in a concurrent environment. 
/* - * Get the number of levels within the framework. This count will - * include any empty levels, but will not include the buffer. Note that - * this is *not* the same as the number of shards when tiering is used, - * as each level can contain multiple shards in that case. + * Get a view on the buffer *first*. This will ensure a stronger + * ordering than simply accessing the buffer directly, but is + * not *strictly* necessary. */ - size_t get_height() { - auto epoch = get_active_epoch(); - auto t = epoch->get_structure()->get_height(); - end_job(epoch); + if constexpr (D == DeletePolicy::TAGGING) { + static_assert(std::same_as, + "Tagging is only supported in single-threaded operation"); - return t; - } + auto view = m_buffer->get_buffer_view(); - /* - * Get the number of bytes of memory allocated across the framework for - * storing records and associated index information (i.e., internal - * ISAM tree nodes). This includes memory that is allocated but - * currently unused in the buffer, or in shards themselves - * (overallocation due to delete cancellation, etc.). - */ - size_t get_memory_usage() { - auto epoch = get_active_epoch(); - auto t = m_buffer->get_memory_usage() + epoch->get_structure()->get_memory_usage(); + auto epoch = get_active_epoch(); + if (epoch->get_structure()->tagged_delete(rec)) { end_job(epoch); + return 1; + } - return t; - } - - /* - * Get the number of bytes of memory allocated across the framework for - * auxiliary structures. This can include bloom filters, aux - * hashtables, etc. - */ - size_t get_aux_memory_usage() { - auto epoch = get_active_epoch(); - auto t = epoch->get_structure()->get_aux_memory_usage(); - end_job(epoch); + end_job(epoch); - return t; + /* + * the buffer will take the longest amount of time, and + * probably has the lowest probability of having the record, + * so we'll check it last. + */ + return view.delete_record(rec); } /* - * Returns the maximum physical capacity of the buffer, measured in - * records. + * If tagging isn't used, then delete using a tombstone */ - size_t get_buffer_capacity() { - return m_buffer->get_capacity(); + return internal_append(rec, true); + } + + /** + * Schedule the execution of a query with specified parameters and + * returns a future that can be used to access the results. The query + * is executed asynchronously. + * @param parms An rvalue reference to the query parameters. + * + * @return A future, from which the query results can be retrieved upon + * query completion + */ + std::future> + query(Parameters &&parms) { + return schedule_query(std::move(parms)); + } + + /** + * Determine the number of records (including tagged records and + * tombstones) currently within the framework. This number is used for + * determining when and how reconstructions occur. + * + * @return The number of records within the index + */ + size_t get_record_count() { + auto epoch = get_active_epoch(); + auto t = epoch->get_buffer().get_record_count() + + epoch->get_structure()->get_record_count(); + end_job(epoch); + + return t; + } + + /** + * Returns the number of tombstone records currently within the + * index. This function can be called when tagged deletes are used, + * but will always return 0 in that case. 
+ * + * @return The number of tombstone records within the index + */ + size_t get_tombstone_count() { + auto epoch = get_active_epoch(); + auto t = epoch->get_buffer().get_tombstone_count() + + epoch->get_structure()->get_tombstone_count(); + end_job(epoch); + + return t; + } + + /** + * Get the number of levels within the framework. This count will + * include any empty levels, but will not include the buffer. Note that + * this is *not* the same as the number of shards when tiering is used, + * as each level can contain multiple shards in that case. + * + * @return The number of levels within the index + */ + size_t get_height() { + auto epoch = get_active_epoch(); + auto t = epoch->get_structure()->get_height(); + end_job(epoch); + + return t; + } + + /** + * Get the number of bytes of memory allocated across the framework for + * storing records and associated index information (i.e., internal + * ISAM tree nodes). This includes memory that is allocated but + * currently unused in the buffer, or in shards themselves + * (overallocation due to delete cancellation, etc.). + * + * @return The number of bytes of memory used for shards (as reported by + * ShardType::get_memory_usage) and the buffer by the index. + */ + size_t get_memory_usage() { + auto epoch = get_active_epoch(); + auto t = m_buffer->get_memory_usage() + + epoch->get_structure()->get_memory_usage(); + end_job(epoch); + + return t; + } + + /** + * Get the number of bytes of memory allocated across the framework for + * auxiliary structures. This can include bloom filters, aux + * hashtables, etc. + * + * @return The number of bytes of memory used for auxilliary structures + * (as reported by ShardType::get_aux_memory_usage) by the index. + */ + size_t get_aux_memory_usage() { + auto epoch = get_active_epoch(); + auto t = epoch->get_structure()->get_aux_memory_usage(); + end_job(epoch); + + return t; + } + + /** + * Create a new single Shard object containing all of the records + * within the framework (buffer and shards). + * + * @param await_reconstruction_completion Specifies whether the currently + * active state of the index should be used to create the shard + * (false), or if shard construction should wait for any active + * reconstructions to finish first (true). Default value of false. + * + * @return A new shard object, containing a copy of all records within + * the index. Ownership of this object is transfered to the + * caller. + */ + ShardType * + create_static_structure(bool await_reconstruction_completion = false) { + if (await_reconstruction_completion) { + await_next_epoch(); } - - /* - * Create a new single Shard object containing all of the records - * within the framework (buffer and shards). The optional parameter can - * be used to specify whether the Shard should be constructed with the - * currently active state of the framework (false), or if shard - * construction should wait until any ongoing reconstructions have - * finished and use that new version (true). 
- */ - Shard *create_static_structure(bool await_reconstruction_completion=false) { - if (await_reconstruction_completion) { - await_next_epoch(); - } - auto epoch = get_active_epoch(); - auto vers = epoch->get_structure(); - std::vector shards; + auto epoch = get_active_epoch(); + auto vers = epoch->get_structure(); + std::vector shards; - - if (vers->get_levels().size() > 0) { - for (int i=vers->get_levels().size() - 1; i>= 0; i--) { - if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) { - shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); - } - } - } - - /* - * construct a shard from the buffer view. We'll hold the view - * for as short a time as possible: once the records are exfiltrated - * from the buffer, there's no reason to retain a hold on the view's - * head pointer any longer - */ - { - auto bv = epoch->get_buffer(); - if (bv.get_record_count() > 0) { - shards.emplace_back(new S(std::move(bv))); - } - } - - Shard *flattened = new S(shards); - - for (auto shard : shards) { - delete shard; + if (vers->get_levels().size() > 0) { + for (int i = vers->get_levels().size() - 1; i >= 0; i--) { + if (vers->get_levels()[i] && + vers->get_levels()[i]->get_record_count() > 0) { + shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); } - - end_job(epoch); - return flattened; + } } /* - * If the current epoch is *not* the newest one, then wait for - * the newest one to become available. Otherwise, returns immediately. + * construct a shard from the buffer view. We'll hold the view + * for as short a time as possible: once the records are exfiltrated + * from the buffer, there's no reason to retain a hold on the view's + * head pointer any longer */ - void await_next_epoch() { - while (m_next_epoch.load().epoch != nullptr) { - std::unique_lock lk(m_epoch_cv_lk); - m_epoch_cv.wait(lk); - } + { + auto bv = epoch->get_buffer(); + if (bv.get_record_count() > 0) { + shards.emplace_back(new ShardType(std::move(bv))); + } } - /* - * Mostly exposed for unit-testing purposes. Verifies that the current - * active version of the ExtensionStructure doesn't violate the maximum - * tombstone proportion invariant. - */ - bool validate_tombstone_proportion() { - auto epoch = get_active_epoch(); - auto t = epoch->get_structure()->validate_tombstone_proportion(); - end_job(epoch); - return t; - } + ShardType *flattened = new ShardType(shards); + for (auto shard : shards) { + delete shard; + } - void print_scheduler_statistics() { - m_sched.print_statistics(); + end_job(epoch); + return flattened; + } + + /* + * If the current epoch is *not* the newest one, then wait for + * the newest one to become available. Otherwise, returns immediately. + */ + void await_next_epoch() { + while (m_next_epoch.load().epoch != nullptr) { + std::unique_lock lk(m_epoch_cv_lk); + m_epoch_cv.wait(lk); } + } + + /** + * Verify that the currently active version of the index does not + * violate tombstone proportion invariants. Exposed for unit-testing + * purposes. + * + * @return Returns true if the tombstone proportion invariant is + * satisfied, and false if it is not. + */ + bool validate_tombstone_proportion() { + auto epoch = get_active_epoch(); + auto t = epoch->get_structure()->validate_tombstone_proportion(); + end_job(epoch); + return t; + } + + /** + * Calls SchedType::print_statistics, which should write a report of + * scheduler performance statistics to stdout. 
+ */ + void print_scheduler_statistics() const { m_sched.print_statistics(); } private: - SCHED m_sched; - - Buffer *m_buffer; + size_t m_scale_factor; + double m_max_delete_prop; - //std::mutex m_struct_lock; - //std::set m_versions; + SchedType m_sched; + Buffer *m_buffer; - alignas(64) std::atomic m_reconstruction_scheduled; + size_t m_core_cnt; + std::atomic m_next_core; + std::atomic m_epoch_cnt; + + alignas(64) std::atomic m_reconstruction_scheduled; - std::atomic m_next_epoch; - std::atomic m_current_epoch; - std::atomic m_previous_epoch; + std::atomic m_next_epoch; + std::atomic m_current_epoch; + std::atomic m_previous_epoch; - std::condition_variable m_epoch_cv; - std::mutex m_epoch_cv_lk; + std::condition_variable m_epoch_cv; + std::mutex m_epoch_cv_lk; - std::atomic m_epoch_cnt; - size_t m_scale_factor; - double m_max_delete_prop; - std::atomic m_next_core; - size_t m_core_cnt; - void enforce_delete_invariant(_Epoch *epoch) { - auto structure = epoch->get_structure(); - auto compactions = structure->get_compaction_tasks(); + void enforce_delete_invariant(_Epoch *epoch) { + auto structure = epoch->get_structure(); + auto compactions = structure->get_compaction_tasks(); - while (compactions.size() > 0) { + while (compactions.size() > 0) { - /* schedule a compaction */ - ReconstructionArgs *args = new ReconstructionArgs(); - args->epoch = epoch; - args->merges = compactions; - args->extension = this; - args->compaction = true; - /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ + /* schedule a compaction */ + ReconstructionArgs *args = + new ReconstructionArgs(); + args->epoch = epoch; + args->merges = compactions; + args->extension = this; + args->compaction = true; + /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed + * here */ - auto wait = args->result.get_future(); + auto wait = args->result.get_future(); - m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); + m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); - /* wait for compaction completion */ - wait.get(); - - /* get a new batch of compactions to perform, if needed */ - compactions = structure->get_compaction_tasks(); - } - } + /* wait for compaction completion */ + wait.get(); - _Epoch *get_active_epoch() { - epoch_ptr old, new_ptr; - - do { - /* - * during an epoch transition, a nullptr will installed in the - * current_epoch. At this moment, the "new" current epoch will - * soon be installed, but the "current" current epoch has been - * moved back to m_previous_epoch. - */ - if (m_current_epoch.load().epoch == nullptr) { - old = m_previous_epoch; - new_ptr = {old.epoch, old.refcnt+1}; - if (old.epoch != nullptr && m_previous_epoch.compare_exchange_strong(old, new_ptr)) { - break; - } - } else { - old = m_current_epoch; - new_ptr = {old.epoch, old.refcnt+1}; - if (old.epoch != nullptr && m_current_epoch.compare_exchange_strong(old, new_ptr)) { - break; - } - } - } while (true); - - assert(new_ptr.refcnt > 0); - - return new_ptr.epoch; + /* get a new batch of compactions to perform, if needed */ + compactions = structure->get_compaction_tasks(); } + } + + _Epoch *get_active_epoch() { + epoch_ptr old, new_ptr; + + do { + /* + * during an epoch transition, a nullptr will installed in the + * current_epoch. At this moment, the "new" current epoch will + * soon be installed, but the "current" current epoch has been + * moved back to m_previous_epoch. 
+ */ + if (m_current_epoch.load().epoch == nullptr) { + old = m_previous_epoch; + new_ptr = {old.epoch, old.refcnt + 1}; + if (old.epoch != nullptr && + m_previous_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } else { + old = m_current_epoch; + new_ptr = {old.epoch, old.refcnt + 1}; + if (old.epoch != nullptr && + m_current_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } + } while (true); - void advance_epoch(size_t buffer_head) { + assert(new_ptr.refcnt > 0); - retire_epoch(m_previous_epoch.load().epoch); + return new_ptr.epoch; + } - epoch_ptr tmp = {nullptr, 0}; - epoch_ptr cur; - do { - cur = m_current_epoch; - } while(!m_current_epoch.compare_exchange_strong(cur, tmp)); + void advance_epoch(size_t buffer_head) { - m_previous_epoch.store(cur); + retire_epoch(m_previous_epoch.load().epoch); - // FIXME: this may currently block because there isn't any - // query preemption yet. At this point, we'd need to either - // 1) wait for all queries on the old_head to finish - // 2) kill all queries on the old_head - // 3) somehow migrate all queries on the old_head to the new - // version - while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) { - _mm_pause(); - } + epoch_ptr tmp = {nullptr, 0}; + epoch_ptr cur; + do { + cur = m_current_epoch; + } while (!m_current_epoch.compare_exchange_strong(cur, tmp)); + m_previous_epoch.store(cur); - m_current_epoch.store(m_next_epoch); - m_next_epoch.store({nullptr, 0}); + // FIXME: this may currently block because there isn't any + // query preemption yet. At this point, we'd need to either + // 1) wait for all queries on the old_head to finish + // 2) kill all queries on the old_head + // 3) somehow migrate all queries on the old_head to the new + // version + while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) { + _mm_pause(); + } + m_current_epoch.store(m_next_epoch); + m_next_epoch.store({nullptr, 0}); - /* notify any blocking threads that the new epoch is available */ - m_epoch_cv_lk.lock(); - m_epoch_cv.notify_all(); - m_epoch_cv_lk.unlock(); - } + /* notify any blocking threads that the new epoch is available */ + m_epoch_cv_lk.lock(); + m_epoch_cv.notify_all(); + m_epoch_cv_lk.unlock(); + } + /* + * Creates a new epoch by copying the currently active one. The new epoch's + * structure will be a shallow copy of the old one's. + */ + _Epoch *create_new_epoch() { /* - * Creates a new epoch by copying the currently active one. The new epoch's - * structure will be a shallow copy of the old one's. + * This epoch access is _not_ protected under the assumption that + * only one reconstruction will be able to trigger at a time. If that + * condition is violated, it is possible that this code will clone a retired + * epoch. */ - _Epoch *create_new_epoch() { - /* - * This epoch access is _not_ protected under the assumption that - * only one reconstruction will be able to trigger at a time. If that condition - * is violated, it is possible that this code will clone a retired - * epoch. 
- */ - assert(m_next_epoch.load().epoch == nullptr); - auto current_epoch = get_active_epoch(); + assert(m_next_epoch.load().epoch == nullptr); + auto current_epoch = get_active_epoch(); - m_epoch_cnt.fetch_add(1); - m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0}); + m_epoch_cnt.fetch_add(1); + m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0}); - end_job(current_epoch); + end_job(current_epoch); - return m_next_epoch.load().epoch; - } + return m_next_epoch.load().epoch; + } - void retire_epoch(_Epoch *epoch) { - /* - * Epochs with currently active jobs cannot - * be retired. By the time retire_epoch is called, - * it is assumed that a new epoch is active, meaning - * that the epoch to be retired should no longer - * accumulate new active jobs. Eventually, this - * number will hit zero and the function will - * proceed. - */ - - if (epoch == nullptr) { - return; - } + void retire_epoch(_Epoch *epoch) { + /* + * Epochs with currently active jobs cannot + * be retired. By the time retire_epoch is called, + * it is assumed that a new epoch is active, meaning + * that the epoch to be retired should no longer + * accumulate new active jobs. Eventually, this + * number will hit zero and the function will + * proceed. + */ - epoch_ptr old, new_ptr; - new_ptr = {nullptr, 0}; - do { - old = m_previous_epoch.load(); - - /* - * If running in single threaded mode, the failure to retire - * an Epoch will result in the thread of execution blocking - * indefinitely. - */ - if constexpr (std::same_as) { - if (old.epoch == epoch) assert(old.refcnt == 0); - } - - if (old.epoch == epoch && old.refcnt == 0 && - m_previous_epoch.compare_exchange_strong(old, new_ptr)) { - break; - } - usleep(1); - - } while(true); - - delete epoch; + if (epoch == nullptr) { + return; } - static void reconstruction(void *arguments) { - auto args = (ReconstructionArgs *) arguments; + epoch_ptr old, new_ptr; + new_ptr = {nullptr, 0}; + do { + old = m_previous_epoch.load(); + + /* + * If running in single threaded mode, the failure to retire + * an Epoch will result in the thread of execution blocking + * indefinitely. + */ + if constexpr (std::same_as) { + if (old.epoch == epoch) + assert(old.refcnt == 0); + } + + if (old.epoch == epoch && old.refcnt == 0 && + m_previous_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + usleep(1); + + } while (true); + + delete epoch; + } + + static void reconstruction(void *arguments) { + auto args = (ReconstructionArgs *)arguments; + + ((DynamicExtension *)args->extension)->SetThreadAffinity(); + Structure *vers = args->epoch->get_structure(); + + if constexpr (L == LayoutPolicy::BSM) { + if (args->merges.size() > 0) { + vers->reconstruction(args->merges[0]); + } + } else { + for (ssize_t i = 0; i < args->merges.size(); i++) { + vers->reconstruction(args->merges[i].target, + args->merges[i].sources[0]); + } + } - ((DynamicExtension *) args->extension)->SetThreadAffinity(); - Structure *vers = args->epoch->get_structure(); + /* + * we'll grab the buffer AFTER doing the internal reconstruction, so we + * can flush as many records as possible in one go. The reconstruction + * was done so as to make room for the full buffer anyway, so there's + * no real benefit to doing this first. 
+ */ + auto buffer_view = args->epoch->get_buffer(); + size_t new_head = buffer_view.get_tail(); - if constexpr (L == LayoutPolicy::BSM) { - if (args->merges.size() > 0) { - vers->reconstruction(args->merges[0]); - } - } else { - for (ssize_t i=0; imerges.size(); i++) { - vers->reconstruction(args->merges[i].target, args->merges[i].sources[0]); - } - } + /* + * if performing a compaction, don't flush the buffer, as + * there is no guarantee that any necessary reconstructions + * will free sufficient space in L0 to support a flush + */ + if (!args->compaction) { + vers->flush_buffer(std::move(buffer_view)); + } + args->result.set_value(true); - /* - * we'll grab the buffer AFTER doing the internal reconstruction, so we - * can flush as many records as possible in one go. The reconstruction - * was done so as to make room for the full buffer anyway, so there's - * no real benefit to doing this first. - */ - auto buffer_view = args->epoch->get_buffer(); - size_t new_head = buffer_view.get_tail(); + /* + * Compactions occur on an epoch _before_ it becomes active, + * and as a result the active epoch should _not_ be advanced as + * part of a compaction + */ + if (!args->compaction) { + ((DynamicExtension *)args->extension)->advance_epoch(new_head); + } - /* - * if performing a compaction, don't flush the buffer, as - * there is no guarantee that any necessary reconstructions - * will free sufficient space in L0 to support a flush - */ - if (!args->compaction) { - vers->flush_buffer(std::move(buffer_view)); + ((DynamicExtension *)args->extension) + ->m_reconstruction_scheduled.store(false); + + delete args; + } + + static void async_query(void *arguments) { + auto *args = + (QueryArgs *) arguments; + + auto epoch = args->extension->get_active_epoch(); + + auto buffer = epoch->get_buffer(); + auto vers = epoch->get_structure(); + auto *parms = &(args->query_parms); + + /* create initial buffer query */ + auto buffer_query = QueryType::local_preproc_buffer(&buffer, parms); + + /* create initial local queries */ + std::vector> shards; + std::vector local_queries = + vers->get_local_queries(shards, parms); + + /* process local/buffer queries to create the final version */ + QueryType::distribute_query(parms, local_queries, buffer_query); + + /* execute the local/buffer queries and combine the results into output */ + std::vector output; + do { + std::vector> + query_results(shards.size() + 1); + for (size_t i = 0; i < query_results.size(); i++) { + std::vector local_results; + ShardID shid; + + if (i == 0) { /* execute buffer query */ + local_results = QueryType::local_query_buffer(buffer_query); + shid = INVALID_SHID; + } else { /*execute local queries */ + local_results = QueryType::local_query(shards[i - 1].second, + local_queries[i - 1]); + shid = shards[i - 1].first; } - args->result.set_value(true); + /* framework-level, automatic delete filtering */ + query_results[i] = std::move(local_results); - /* - * Compactions occur on an epoch _before_ it becomes active, - * and as a result the active epoch should _not_ be advanced as - * part of a compaction - */ - if (!args->compaction) { - ((DynamicExtension *) args->extension)->advance_epoch(new_head); + /* end query early if EARLY_ABORT is set and a result exists */ + if constexpr (QueryType::EARLY_ABORT) { + if (query_results[i].size() > 0) + break; } + } - ((DynamicExtension *) args->extension)->m_reconstruction_scheduled.store(false); - - delete args; - } - - static void async_query(void *arguments) { - QueryArgs *args = (QueryArgs *) 
arguments; - - auto epoch = ((DynamicExtension *) args->extension)->get_active_epoch(); - - auto ptr1 = ((DynamicExtension *) args->extension)->m_previous_epoch.load().epoch; - auto ptr2 = ((DynamicExtension *) args->extension)->m_current_epoch.load().epoch; - auto ptr3 = ((DynamicExtension *) args->extension)->m_next_epoch.load().epoch; - - - auto buffer = epoch->get_buffer(); - auto vers = epoch->get_structure(); - void *parms = args->query_parms; - - /* Get the buffer query states */ - void *buffer_state = Q::get_buffer_query_state(&buffer, parms); - - /* Get the shard query states */ - std::vector> shards; - std::vector states = vers->get_query_states(shards, parms); + /* + * combine the results of the local queries, also translating + * from LocalResultType to ResultType + */ + QueryType::combine(query_results, parms, output); - std::vector results; - Q::process_query_states(parms, states, buffer_state); + /* optionally repeat the local queries if necessary */ + } while (QueryType::repeat(parms, output, local_queries, buffer_query)); - do { - std::vector>> query_results(shards.size() + 1); - for (size_t i=0; i> local_results; - ShardID shid; + /* return the output vector to caller via the future */ + args->result_set.set_value(std::move(output)); - if (i == 0) { /* process the buffer first */ - local_results = Q::buffer_query(buffer_state, parms); - shid = INVALID_SHID; - } else { - local_results = Q::query(shards[i - 1].second, states[i - 1], parms); - shid = shards[i - 1].first; - } + /* officially end the query job, releasing the pin on the epoch */ + args->extension->end_job(epoch); - query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer)); - - if constexpr (Q::EARLY_ABORT) { - if (query_results[i].size() > 0) break; - } - } - Q::merge(query_results, parms, results); - - } while (Q::repeat(parms, results, states, buffer_state)); - - args->result_set.set_value(std::move(results)); - - ((DynamicExtension *) args->extension)->end_job(epoch); - - Q::delete_buffer_query_state(buffer_state); - for (size_t i=0; i *args = new ReconstructionArgs(); - args->epoch = epoch; - args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_high_watermark()); - args->extension = this; - args->compaction = false; - /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ - - m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); + delete args; + } + + void schedule_reconstruction() { + auto epoch = create_new_epoch(); + + ReconstructionArgs *args = + new ReconstructionArgs(); + args->epoch = epoch; + args->merges = epoch->get_structure()->get_reconstruction_tasks( + m_buffer->get_high_watermark()); + args->extension = this; + args->compaction = false; + /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed + * here */ + + m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); + } + + std::future> + schedule_query(Parameters &&query_parms) { + auto args = + new QueryArgs(); + args->extension = this; + args->query_parms = std::move(query_parms); + auto result = args->result_set.get_future(); + + m_sched.schedule_job(async_query, 0, (void *)args, QUERY); + + return result; + } + + int internal_append(const RecordType &rec, bool ts) { + if (m_buffer->is_at_low_watermark()) { + auto old = false; + + if (m_reconstruction_scheduled.compare_exchange_strong(old, true)) { + schedule_reconstruction(); + } } - std::future> schedule_query(void *query_parms) { - QueryArgs *args = new QueryArgs(); - 
args->extension = this; - args->query_parms = query_parms; - auto result = args->result_set.get_future(); + /* this will fail if the HWM is reached and return 0 */ + return m_buffer->append(rec, ts); + } - m_sched.schedule_job(async_query, 0, args, QUERY); - - return result; +#ifdef _GNU_SOURCE + void SetThreadAffinity() { + if constexpr (std::same_as) { + return; } - int internal_append(const R &rec, bool ts) { - if (m_buffer->is_at_low_watermark()) { - auto old = false; - - if (m_reconstruction_scheduled.compare_exchange_strong(old, true)) { - schedule_reconstruction(); - } - } - - /* this will fail if the HWM is reached and return 0 */ - return m_buffer->append(rec, ts); + int core = m_next_core.fetch_add(1) % m_core_cnt; + cpu_set_t mask; + CPU_ZERO(&mask); + + switch (core % 2) { + case 0: + // 0 |-> 0 + // 2 |-> 2 + // 4 |-> 4 + core = core + 0; + break; + case 1: + // 1 |-> 28 + // 3 |-> 30 + // 5 |-> 32 + core = (core - 1) + m_core_cnt; + break; } + CPU_SET(core, &mask); + ::sched_setaffinity(0, sizeof(mask), &mask); + } +#else + void SetThreadAffinity() {} +#endif - static std::vector> filter_deletes(std::vector> &records, ShardID shid, Structure *vers, BufView *bview) { - if constexpr (Q::SKIP_DELETE_FILTER) { - return std::move(records); - } - - std::vector> processed_records; - processed_records.reserve(records.size()); + void end_job(_Epoch *epoch) { + epoch_ptr old, new_ptr; - /* - * For delete tagging, we just need to check the delete bit - * on each record. + do { + if (m_previous_epoch.load().epoch == epoch) { + old = m_previous_epoch; + /* + * This could happen if we get into the system during a + * transition. In this case, we can just back out and retry */ - if constexpr (D == DeletePolicy::TAGGING) { - for (auto &rec : records) { - if (rec.is_deleted()) { - continue; - } + if (old.epoch == nullptr) { + continue; + } - processed_records.emplace_back(rec); - } + assert(old.refcnt > 0); - return processed_records; + new_ptr = {old.epoch, old.refcnt - 1}; + if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) { + break; } - + } else { + old = m_current_epoch; /* - * For tombstone deletes, we need to search for the corresponding - * tombstone for each record. + * This could happen if we get into the system during a + * transition. In this case, we can just back out and retry */ - for (auto &rec : records) { - if (rec.is_tombstone()) { - continue; - } - - // FIXME: need to figure out how best to re-enable the buffer tombstone - // check in the correct manner. 
- //if (buffview.check_tombstone(rec.rec)) { - //continue; - //} - - for (size_t i=0; iget_record_count(); i++) { - if (bview->get(i)->is_tombstone() && bview->get(i)->rec == rec.rec) { - continue; - } - } - - if (shid != INVALID_SHID) { - for (size_t lvl=0; lvl<=shid.level_idx; lvl++) { - if (vers->get_levels()[lvl]->check_tombstone(0, rec.rec)) { - continue; - } - } - - if (vers->get_levels()[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) { - continue; - } - } - - processed_records.emplace_back(rec); - } - - return processed_records; - } - -#ifdef _GNU_SOURCE - void SetThreadAffinity() { - if constexpr (std::same_as) { - return; + if (old.epoch == nullptr) { + continue; } - int core = m_next_core.fetch_add(1) % m_core_cnt; - cpu_set_t mask; - CPU_ZERO(&mask); + assert(old.refcnt > 0); - switch (core % 2) { - case 0: - // 0 |-> 0 - // 2 |-> 2 - // 4 |-> 4 - core = core; - break; - case 1: - // 1 |-> 28 - // 3 |-> 30 - // 5 |-> 32 - core = (core - 1) + m_core_cnt; + new_ptr = {old.epoch, old.refcnt - 1}; + if (m_current_epoch.compare_exchange_strong(old, new_ptr)) { break; } - CPU_SET(core, &mask); - ::sched_setaffinity(0, sizeof(mask), &mask); - } -#else - void SetThreadAffinity() { - - } -#endif - - - void end_job(_Epoch *epoch) { - epoch_ptr old, new_ptr; - - do { - if (m_previous_epoch.load().epoch == epoch) { - old = m_previous_epoch; - /* - * This could happen if we get into the system during a - * transition. In this case, we can just back out and retry - */ - if (old.epoch == nullptr) { - continue; - } - - assert(old.refcnt > 0); - - new_ptr = {old.epoch, old.refcnt - 1}; - if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) { - break; - } - } else { - old = m_current_epoch; - /* - * This could happen if we get into the system during a - * transition. In this case, we can just back out and retry - */ - if (old.epoch == nullptr) { - continue; - } - - assert(old.refcnt > 0); - - new_ptr = {old.epoch, old.refcnt - 1}; - if (m_current_epoch.compare_exchange_strong(old, new_ptr)) { - break; - } - } - } while (true); - } - + } + } while (true); + } }; -} - +} // namespace de diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h index 577d6cd..1b64646 100644 --- a/include/framework/interface/Query.h +++ b/include/framework/interface/Query.h @@ -1,7 +1,7 @@ /* * include/framework/interface/Query.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * @@ -10,23 +10,127 @@ #include "framework/QueryRequirements.h" -namespace de{ +namespace de { -template -concept QueryInterface = requires(void *p, S *sh, std::vector &s, std::vector>> &rv, BufferView *bv, std::vector &resv) { - {Q::get_query_state(sh, p)} -> std::convertible_to; - {Q::get_buffer_query_state(bv, p)} -> std::convertible_to; - {Q::process_query_states(p, s, p)}; - {Q::query(sh, p, p)} -> std::convertible_to>>; - {Q::buffer_query(p, p)} -> std::convertible_to>>; - {Q::merge(rv, p, resv)}; +/* + * FIXME: It would probably be best to absorb the std::vector into + * this type too; this would allow user-defined collections for + * intermediate results, which could allow for more merging + * optimizations. However, this would require an alternative + * approach to doing delete checks, so we'll leave it for now. 
+ */ +template +concept LocalResultInterface = requires(R res) { + { res.is_deleted() } -> std::convertible_to; + { res.is_tombstone() } -> std::convertible_to; +}; + +/* + * + * + */ +template +concept QueryInterface = LocalResultInterface && + requires(PARAMETERS *parameters, LOCAL *local, LOCAL_BUFFER *buffer_query, + SHARD *shard, std::vector &local_queries, + std::vector> &local_results, + std::vector &result, + BufferView *bv) { + + /* + * Given a set of query parameters and a shard, return a local query + * object for that shard. + */ + { QUERY::local_preproc(shard, parameters) } -> std::convertible_to; + + /* + * Given a set of query parameters and a buffer view, return a local + * query object for the buffer. + * NOTE: for interface reasons, the pointer to the buffer view MUST be + * stored inside of the local query object. The future buffer + * query routine will access the buffer by way of this pointer. + */ + { + QUERY::local_preproc_buffer(bv, parameters) + } -> std::convertible_to; + + /* + * Given a full set of local queries, and the buffer query, make any + * necessary adjustments to the local queries in-place, to account for + * global information. If no additional processing is required, this + * function can be left empty. + */ + {QUERY::distribute_query(parameters, local_queries, buffer_query)}; + + /* + * Answer the local query, defined by `local` against `shard` and return + * a vector of LOCAL_RESULT objects defining the query result. + */ + { + QUERY::local_query(shard, local) + } -> std::convertible_to>; + + /* + * Answer the local query defined by `local` against the buffer (which + * should be accessed by a pointer inside of `local`) and return a vector + * of LOCAL_RESULT objects defining the query result. + */ + { + QUERY::local_query_buffer(buffer_query) + } -> std::convertible_to>; + + /* + * Process the local results from the buffer and all of the shards, + * stored in `local_results`, and insert the associated ResultType + * objects into the `result` vector, which represents the final result + * of the query. Updates to this vector are done in-place. + */ + {QUERY::combine(local_results, parameters, result)}; - {Q::delete_query_state(p)} -> std::same_as; - {Q::delete_buffer_query_state(p)} -> std::same_as; + /* + * Process the post-combine `result` vector of ResultType objects, + * in the context of the global and local query parameters, to determine + * if the query should be repeated. If so, make any necessary adjustments + * to the local query objects and return True. Otherwise, return False. + * + * If no repetition is needed for a given problem type, simply return + * False immediately and the query will end. + */ + { + QUERY::repeat(parameters, result, local_queries, buffer_query) + } -> std::same_as; - {Q::repeat(p, resv, s, p)} -> std::same_as; + /* + * If this flag is True, then the query will immediately stop and return + * a result as soon as the first non-deleted LocalRecordType is found. + * Otherwise, every Shard and the buffer will be queried and the results + * merged, like normal. + * + * This is largely an optimization flag for use with point-lookup, or + * other single-record result queries + */ + { QUERY::EARLY_ABORT } -> std::convertible_to; - {Q::EARLY_ABORT} -> std::convertible_to; - {Q::SKIP_DELETE_FILTER} -> std::convertible_to; + /* + * If false, the built-in delete filtering that the framework can + * apply to the local results, prior to calling combine, will be skipped. 
+ * This general filtering can be inefficient, particularly for tombstone + * -based deletes, and so if a more efficient manual filtering can be + * performed, it is worth setting this to True and doing that filtering + * in the combine step. + * + * If deletes are not a consideration for your problem, it's also best + * to turn this off, as it'll avoid the framework making an extra pass + * over the local results prior to combining them. + * + * TODO: Temporarily disabling this, as we've dropped framework-level + * delete filtering for the time being. + */ + /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to; */ }; -} +} // namespace de diff --git a/include/framework/interface/Record.h b/include/framework/interface/Record.h index 19ccadd..d3e77d8 100644 --- a/include/framework/interface/Record.h +++ b/include/framework/interface/Record.h @@ -1,272 +1,247 @@ /* * include/framework/interface/Record.h * - * Copyright (C) 2023 Douglas Rumbaugh + * Copyright (C) 2023-2024 Douglas Rumbaugh * * Distributed under the Modified BSD License. * - * FIXME: the record implementations could probably be broken out into + * FIXME: the record implementations could probably be broken out into * different files, leaving only the interface here */ #pragma once -#include -#include #include +#include +#include #include "psu-util/hash.h" namespace de { -template +template concept RecordInterface = requires(R r, R s) { - { r < s } ->std::convertible_to; - { r == s } ->std::convertible_to; + { r < s } -> std::convertible_to; + { r == s } -> std::convertible_to; }; -template +template concept WeightedRecordInterface = requires(R r) { - {r.weight} -> std::convertible_to; + { r.weight } -> std::convertible_to; }; -template +template concept NDRecordInterface = RecordInterface && requires(R r, R s) { - {r.calc_distance(s)} -> std::convertible_to; + { r.calc_distance(s) } -> std::convertible_to; }; template concept KVPInterface = RecordInterface && requires(R r) { - r.key; - r.value; + r.key; + r.value; }; -template +template concept AlexInterface = KVPInterface && requires(R r) { - {r.key} -> std::convertible_to; - {r.value} -> std::convertible_to; + { r.key } -> std::convertible_to; + { r.value } -> std::convertible_to; }; -template -concept WrappedInterface = RecordInterface && requires(R r, R s, bool b, int i) { - {r.header} -> std::convertible_to; - r.rec; - {r.set_delete()}; - {r.is_deleted()} -> std::convertible_to; - {r.set_tombstone(b)}; - {r.is_tombstone()} -> std::convertible_to; - {r.set_timestamp(i)}; - {r.get_timestamp()} -> std::convertible_to; - {r.clear_timestamp()}; - {r.is_visible()} -> std::convertible_to; - {r.set_visible()}; - {r < s} -> std::convertible_to; - {r == s} ->std::convertible_to; +template +concept WrappedInterface = RecordInterface && + requires(R r, R s, bool b, int i) { + { r.header } -> std::convertible_to; + r.rec; + {r.set_delete()}; + { r.is_deleted() } -> std::convertible_to; + {r.set_tombstone(b)}; + { r.is_tombstone() } -> std::convertible_to; + {r.set_timestamp(i)}; + { r.get_timestamp() } -> std::convertible_to; + {r.clear_timestamp()}; + { r.is_visible() } -> std::convertible_to; + {r.set_visible()}; + { r < s } -> std::convertible_to; + { r == s } -> std::convertible_to; }; -template -struct Wrapped { - uint32_t header; - R rec; +template struct Wrapped { + uint32_t header; + R rec; - inline void set_delete() { - header |= 2; - } + inline void set_delete() { header |= 2; } - inline bool is_deleted() const { - return header & 2; - } + inline bool is_deleted() const { 
return header & 2; } - inline void set_visible() { - header |= 4; - } + inline void set_visible() { header |= 4; } - inline bool is_visible() const { - return header & 4; - } + inline bool is_visible() const { return header & 4; } - inline void set_timestamp(int ts) { - header |= (ts << 3); - } - - inline int get_timestamp() const { - return header >> 3; - } + inline void set_timestamp(int ts) { header |= (ts << 3); } - inline void clear_timestamp() { - header &= 7; - } + inline int get_timestamp() const { return header >> 3; } - inline void set_tombstone(bool val=true) { - if (val) { - header |= 1; - } else { - header &= 0; - } - } + inline void clear_timestamp() { header &= 7; } - inline bool is_tombstone() const { - return header & 1; + inline void set_tombstone(bool val = true) { + if (val) { + header |= 1; + } else { + header &= 0; } + } - inline bool operator<(const Wrapped& other) const { - return rec < other.rec || (rec == other.rec && header < other.header); - } + inline bool is_tombstone() const { return header & 1; } - inline bool operator==(const Wrapped& other) const { - return rec == other.rec; - } + inline bool operator<(const Wrapped &other) const { + return rec < other.rec || (rec == other.rec && header < other.header); + } + inline bool operator==(const Wrapped &other) const { + return rec == other.rec; + } }; -template -struct Record { - K key; - V value; +template struct Record { + K key; + V value; - inline bool operator<(const Record& other) const { - return key < other.key || (key == other.key && value < other.value); - } + inline bool operator<(const Record &other) const { + return key < other.key || (key == other.key && value < other.value); + } - inline bool operator==(const Record& other) const { - return key == other.key && value == other.value; - } + inline bool operator==(const Record &other) const { + return key == other.key && value == other.value; + } }; -template -struct Record { - const char* key; - V value; - size_t len; +template struct Record { + const char *key; + V value; + size_t len; - inline bool operator<(const Record& other) const { - size_t n = std::min(len, other.len) + 1; - return strncmp(key, other.key, n) < 0; - } + inline bool operator<(const Record &other) const { + size_t n = std::min(len, other.len) + 1; + return strncmp(key, other.key, n) < 0; + } - inline bool operator==(const Record& other) const { - size_t n = std::min(len, other.len) + 1; - return strncmp(key, other.key, n) == 0; - } + inline bool operator==(const Record &other) const { + size_t n = std::min(len, other.len) + 1; + return strncmp(key, other.key, n) == 0; + } }; -template -struct WeightedRecord { - K key; - V value; - W weight = 1; +template struct WeightedRecord { + K key; + V value; + W weight = 1; - inline bool operator==(const WeightedRecord& other) const { - return key == other.key && value == other.value; - } + inline bool operator==(const WeightedRecord &other) const { + return key == other.key && value == other.value; + } - inline bool operator<(const WeightedRecord& other) const { - return key < other.key || (key == other.key && value < other.value); - } + inline bool operator<(const WeightedRecord &other) const { + return key < other.key || (key == other.key && value < other.value); + } }; +template struct CosinePoint { + V data[D]; -template -struct CosinePoint{ - V data[D]; - - inline bool operator==(const CosinePoint& other) const { - for (size_t i=0; i other.data[i]) { - return false; - } - } + return true; + } + /* lexicographic order */ + inline bool 
operator<(const CosinePoint &other) const { + for (size_t i = 0; i < D; i++) { + if (data[i] < other.data[i]) { + return true; + } else if (data[i] > other.data[i]) { return false; + } } - inline double calc_distance(const CosinePoint& other) const { + return false; + } - double prod = 0; - double asquared = 0; - double bsquared = 0; + inline double calc_distance(const CosinePoint &other) const { - for (size_t i=0; i struct EuclidPoint { + V data[D]; -template -struct EuclidPoint{ - V data[D]; + inline bool operator==(const EuclidPoint &other) const { + for (size_t i = 0; i < D; i++) { + if (data[i] != other.data[i]) { + return false; + } + } - inline bool operator==(const EuclidPoint& other) const { - for (size_t i=0; i other.data[i]) { + return false; + } } - /* lexicographic order */ - inline bool operator<(const EuclidPoint& other) const { - for (size_t i=0; i other.data[i]) { - return false; - } - } + return false; + } - return false; + inline double calc_distance(const EuclidPoint &other) const { + double dist = 0; + for (size_t i = 0; i < D; i++) { + dist += (data[i] - other.data[i]) * (data[i] - other.data[i]); } - inline double calc_distance(const EuclidPoint& other) const { - double dist = 0; - for (size_t i=0; i -struct RecordHash { - size_t operator()(R const &rec) const { - return psudb::hash_bytes((std::byte *) &rec, sizeof(R)); - } +template struct RecordHash { + size_t operator()(R const &rec) const { + return psudb::hash_bytes((std::byte *)&rec, sizeof(R)); + } }; -template -class DistCmpMax { +template class DistCmpMax { public: - DistCmpMax(R *baseline) : P(baseline) {} + DistCmpMax(R *baseline) : P(baseline) {} - inline bool operator()(const R *a, const R *b) requires WrappedInterface { - return a->rec.calc_distance(P->rec) > b->rec.calc_distance(P->rec); - } + inline bool operator()(const R *a, const R *b) requires WrappedInterface { + return a->rec.calc_distance(P->rec) > b->rec.calc_distance(P->rec); + } - inline bool operator()(const R *a, const R *b) requires (!WrappedInterface){ - return a->calc_distance(*P) > b->calc_distance(*P); - } + inline bool operator()(const R *a, + const R *b) requires(!WrappedInterface) { + return a->calc_distance(*P) > b->calc_distance(*P); + } private: - R *P; + R *P; }; -} +} // namespace de diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h index 451ddd2..d76a6c8 100644 --- a/include/framework/interface/Scheduler.h +++ b/include/framework/interface/Scheduler.h @@ -1,7 +1,7 @@ /* * include/framework/interface/Scheduler.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * @@ -10,10 +10,11 @@ #include "framework/scheduling/Task.h" -template -concept SchedulerInterface = requires(S s, size_t i, void *vp, de::Job j) { - {S(i, i)}; - {s.schedule_job(j, i, vp, i)} -> std::convertible_to; - {s.shutdown()}; - {s.print_statistics()}; +template +concept SchedulerInterface = requires(SchedType s, size_t i, void *vp, + de::Job j) { + {SchedType(i, i)}; + {s.schedule_job(j, i, vp, i)} -> std::convertible_to; + {s.shutdown()}; + {s.print_statistics()}; }; diff --git a/include/framework/interface/Shard.h b/include/framework/interface/Shard.h index c4a9180..bd980c0 100644 --- a/include/framework/interface/Shard.h +++ b/include/framework/interface/Shard.h @@ -1,7 +1,7 @@ /* * include/framework/interface/Shard.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. 
Rumbaugh * * Distributed under the Modified BSD License. * @@ -12,25 +12,57 @@ namespace de { -template -concept ShardInterface = RecordInterface && requires(S s, std::vector spp, void *p, bool b, size_t i, BufferView bv, R r) { - {S(spp)}; - {S(std::move(bv))}; +template +concept ShardInterface = RecordInterface && + requires(SHARD shard, const std::vector &shard_vector, bool b, + BufferView bv, + typename SHARD::RECORD rec) { + /* construct a shard from a vector of shards of the same type */ + {SHARD(shard_vector)}; - {s.point_lookup(r, b) } -> std::same_as*>; - {s.get_data()} -> std::same_as*>; + /* construct a shard from a buffer view (i.e., unsorted array of records) */ + {SHARD(std::move(bv))}; + + /* perform a lookup for a record matching rec and return a pointer to it */ + { + shard.point_lookup(rec, b) + } -> std::same_as *>; + + /* + * return the number of records in the shard -- used to determine when + * reconstructions occur + */ + { shard.get_record_count() } -> std::convertible_to; + + /* + * return the number of tombstones in the shard -- can simply return + * 0 if tombstones are not in use. + */ + { shard.get_tombstone_count() } -> std::convertible_to; + + /* + * return the number of bytes of memory used by the main data structure + * within the shard -- informational use only at the moment + */ + { shard.get_memory_usage() } -> std::convertible_to; + + /* + * return the number of bytes of memory used by auxilliary data + * structures (bloom filters, etc.) within the shard -- informational + * use only at the moment + */ + { shard.get_aux_memory_usage() } -> std::convertible_to; - {s.get_record_count()} -> std::convertible_to; - {s.get_tombstone_count()} -> std::convertible_to; - {s.get_memory_usage()} -> std::convertible_to; - {s.get_aux_memory_usage()} -> std::convertible_to; }; -template -concept SortedShardInterface = ShardInterface && requires(S s, R r, R *rp, size_t i) { - {s.lower_bound(r)} -> std::convertible_to; - {s.upper_bound(r)} -> std::convertible_to; - {s.get_record_at(i)} -> std::same_as*>; +template +concept SortedShardInterface = ShardInterface && + requires(SHARD shard, typename SHARD::RECORD rec, size_t index) { + { shard.lower_bound(rec) } -> std::convertible_to; + { shard.upper_bound(rec) } -> std::convertible_to; + { + shard.get_record_at(index) + } -> std::same_as *>; }; -} +} // namespace de diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 9377fb0..03675b1 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -1,7 +1,7 @@ /* * include/framework/scheduling/Epoch.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. 
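For reference, a minimal shard satisfying the revised ShardInterface concept above might be sketched as follows. ExampleShard, its unsorted storage, and the linear point_lookup are invented for illustration only; a real shard (e.g., ISAMTree) sorts its records and typically maintains auxiliary structures, and the exact element type of the shard-vector constructor should match whatever the framework passes in.

#include <cstddef>
#include <vector>

#include "framework/interface/Record.h"
#include "framework/structure/BufferView.h"

template <de::RecordInterface R> class ExampleShard {
public:
  typedef R RECORD; /* required: expose the record type to the framework */

  /* build a combined shard from several existing shards of the same type
   * (a real shard would merge them in sorted order) */
  ExampleShard(const std::vector<ExampleShard *> &shards) {
    for (auto *shard : shards) {
      for (auto &wrec : shard->m_data) {
        m_data.push_back(wrec);
      }
    }
  }

  /* build a shard from a buffer view (an unsorted array of wrapped records) */
  explicit ExampleShard(de::BufferView<R> buffer) {
    for (size_t i = 0; i < buffer.get_record_count(); i++) {
      m_data.push_back(*buffer.get(i));
    }
  }

  /* return a pointer to a record matching rec, or nullptr if none exists */
  de::Wrapped<R> *point_lookup(const R &rec, bool filter = false) {
    for (auto &wrec : m_data) {
      if (wrec.rec == rec) {
        return &wrec;
      }
    }
    return nullptr;
  }

  size_t get_record_count() { return m_data.size(); }
  size_t get_tombstone_count() { return 0; } /* tombstones unused here */
  size_t get_memory_usage() { return m_data.size() * sizeof(de::Wrapped<R>); }
  size_t get_aux_memory_usage() { return 0; }

private:
  std::vector<de::Wrapped<R>> m_data; /* assumed storage, illustration only */
};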
* @@ -11,133 +11,120 @@ #include #include -#include "framework/structure/MutableBuffer.h" -#include "framework/structure/ExtensionStructure.h" #include "framework/structure/BufferView.h" +#include "framework/structure/ExtensionStructure.h" +#include "framework/structure/MutableBuffer.h" namespace de { - -template S, QueryInterface Q, LayoutPolicy L> +template QueryType, + LayoutPolicy L> class Epoch { private: - typedef MutableBuffer Buffer; - typedef ExtensionStructure Structure; - typedef BufferView BufView; -public: - Epoch(size_t number=0) - : m_buffer(nullptr) - , m_structure(nullptr) - , m_active_merge(false) - , m_epoch_number(number) - , m_buffer_head(0) - {} - - Epoch(size_t number, Structure *structure, Buffer *buff, size_t head) - : m_buffer(buff) - , m_structure(structure) - , m_active_merge(false) - , m_epoch_number(number) - , m_buffer_head(head) - { - structure->take_reference(); - } - - ~Epoch() { - if (m_structure) { - m_structure->release_reference(); - } - - if (m_structure->get_reference_count() == 0) { - delete m_structure; - } + typedef typename ShardType::RECORD RecordType; + typedef MutableBuffer Buffer; + typedef ExtensionStructure Structure; + typedef BufferView BufView; +public: + Epoch(size_t number = 0) + : m_buffer(nullptr), m_structure(nullptr), m_active_merge(false), + m_epoch_number(number), m_buffer_head(0) {} + + Epoch(size_t number, Structure *structure, Buffer *buff, size_t head) + : m_buffer(buff), m_structure(structure), m_active_merge(false), + m_epoch_number(number), m_buffer_head(head) { + structure->take_reference(); + } + + ~Epoch() { + if (m_structure) { + m_structure->release_reference(); } - /* - * Epochs are *not* copyable or movable. Only one can exist, and all users - * of it work with pointers - */ - Epoch(const Epoch&) = delete; - Epoch(Epoch&&) = delete; - Epoch &operator=(const Epoch&) = delete; - Epoch &operator=(Epoch&&) = delete; - - size_t get_epoch_number() { - return m_epoch_number; + if (m_structure->get_reference_count() == 0) { + delete m_structure; } - - Structure *get_structure() { - return m_structure; + } + + /* + * Epochs are *not* copyable or movable. Only one can exist, and all users + * of it work with pointers + */ + Epoch(const Epoch &) = delete; + Epoch(Epoch &&) = delete; + Epoch &operator=(const Epoch &) = delete; + Epoch &operator=(Epoch &&) = delete; + + size_t get_epoch_number() { return m_epoch_number; } + + Structure *get_structure() { return m_structure; } + + BufView get_buffer() { return m_buffer->get_buffer_view(m_buffer_head); } + + /* + * Returns a new Epoch object that is a copy of this one. The new object + * will also contain a copy of the m_structure, rather than a reference to + * the same one. The epoch number of the new epoch will be set to the + * provided argument. + */ + Epoch *clone(size_t number) { + std::unique_lock m_buffer_lock; + auto epoch = new Epoch(number); + epoch->m_buffer = m_buffer; + epoch->m_buffer_head = m_buffer_head; + + if (m_structure) { + epoch->m_structure = m_structure->copy(); + /* the copy routine returns a structure with 0 references */ + epoch->m_structure->take_reference(); } - BufView get_buffer() { - return m_buffer->get_buffer_view(m_buffer_head); + return epoch; + } + + /* + * Check if a merge can be started from this Epoch. At present, without + * concurrent merging, this simply checks if there is currently a scheduled + * merge based on this Epoch. If there is, returns false. 
If there isn't, + * return true and set a flag indicating that there is an active merge. + */ + bool prepare_reconstruction() { + auto old = m_active_merge.load(); + if (old) { + return false; } - /* - * Returns a new Epoch object that is a copy of this one. The new object - * will also contain a copy of the m_structure, rather than a reference to - * the same one. The epoch number of the new epoch will be set to the - * provided argument. - */ - Epoch *clone(size_t number) { - std::unique_lock m_buffer_lock; - auto epoch = new Epoch(number); - epoch->m_buffer = m_buffer; - epoch->m_buffer_head = m_buffer_head; - - if (m_structure) { - epoch->m_structure = m_structure->copy(); - /* the copy routine returns a structure with 0 references */ - epoch->m_structure->take_reference(); - } - - return epoch; + // FIXME: this needs cleaned up + while (!m_active_merge.compare_exchange_strong(old, true)) { + old = m_active_merge.load(); + if (old) { + return false; + } } - /* - * Check if a merge can be started from this Epoch. At present, without - * concurrent merging, this simply checks if there is currently a scheduled - * merge based on this Epoch. If there is, returns false. If there isn't, - * return true and set a flag indicating that there is an active merge. - */ - bool prepare_reconstruction() { - auto old = m_active_merge.load(); - if (old) { - return false; - } - - // FIXME: this needs cleaned up - while (!m_active_merge.compare_exchange_strong(old, true)) { - old = m_active_merge.load(); - if (old) { - return false; - } - } - - return true; - } + return true; + } - bool advance_buffer_head(size_t head) { - m_buffer_head = head; - return m_buffer->advance_head(m_buffer_head); - } + bool advance_buffer_head(size_t head) { + m_buffer_head = head; + return m_buffer->advance_head(m_buffer_head); + } private: - Structure *m_structure; - Buffer *m_buffer; - - std::mutex m_buffer_lock; - std::atomic m_active_merge; - - /* - * The number of currently active jobs - * (queries/merges) operating on this - * epoch. An epoch can only be retired - * when this number is 0. - */ - size_t m_epoch_number; - size_t m_buffer_head; + Buffer *m_buffer; + Structure *m_structure; + + std::mutex m_buffer_lock; + std::atomic m_active_merge; + + /* + * The number of currently active jobs + * (queries/merges) operating on this + * epoch. An epoch can only be retired + * when this number is 0. + */ + size_t m_epoch_number; + size_t m_buffer_head; }; -} +} // namespace de diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 3ed4f49..7cb6d20 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -1,7 +1,7 @@ /* * include/framework/scheduling/FIFOScheduler.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * @@ -17,11 +17,11 @@ */ #pragma once -#include -#include -#include #include "framework/scheduling/Task.h" #include "framework/scheduling/statistics.h" +#include +#include +#include #include "ctpl/ctpl.h" #include "psu-ds/LockedPriorityQueue.h" @@ -30,100 +30,95 @@ namespace de { using namespace std::literals::chrono_literals; - class FIFOScheduler { private: - static const size_t DEFAULT_MAX_THREADS = 8; + static const size_t DEFAULT_MAX_THREADS = 8; public: - FIFOScheduler(size_t memory_budget, size_t thread_cnt) - : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thrd_cnt((thread_cnt) ? 
thread_cnt: DEFAULT_MAX_THREADS) - , m_used_memory(0) - , m_used_thrds(0) - , m_shutdown(false) - { - m_sched_thrd = std::thread(&FIFOScheduler::run, this); - m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this); - m_thrd_pool.resize(m_thrd_cnt); + FIFOScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX), + m_thrd_cnt((thread_cnt) ? thread_cnt : DEFAULT_MAX_THREADS), + m_used_memory(0), m_used_thrds(0), m_shutdown(false) { + m_sched_thrd = std::thread(&FIFOScheduler::run, this); + m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this); + m_thrd_pool.resize(m_thrd_cnt); + } + + ~FIFOScheduler() { + if (!m_shutdown.load()) { + shutdown(); } - ~FIFOScheduler() { - if (!m_shutdown.load()) { - shutdown(); - } + m_sched_thrd.join(); + m_sched_wakeup_thrd.join(); + } - m_sched_thrd.join(); - m_sched_wakeup_thrd.join(); - } + void schedule_job(std::function job, size_t size, void *args, + size_t type = 0) { + std::unique_lock lk(m_cv_lock); + size_t ts = m_counter.fetch_add(1); - void schedule_job(std::function job, size_t size, void *args, size_t type=0) { - std::unique_lock lk(m_cv_lock); - size_t ts = m_counter.fetch_add(1); + m_stats.job_queued(ts, type, size); + m_task_queue.push(Task(size, ts, job, args, type, &m_stats)); - m_stats.job_queued(ts, type, size); - m_task_queue.push(Task(size, ts, job, args, type, &m_stats)); + m_cv.notify_all(); + } - m_cv.notify_all(); - } - - void shutdown() { - m_shutdown.store(true); - m_thrd_pool.stop(true); - m_cv.notify_all(); - } + void shutdown() { + m_shutdown.store(true); + m_thrd_pool.stop(true); + m_cv.notify_all(); + } - void print_statistics() { - m_stats.print_statistics(); - } + void print_statistics() { m_stats.print_statistics(); } private: - psudb::LockedPriorityQueue m_task_queue; + psudb::LockedPriorityQueue m_task_queue; - size_t m_memory_budget; - size_t m_thrd_cnt; + [[maybe_unused]] size_t m_memory_budget; + size_t m_thrd_cnt; - std::atomic m_shutdown; - std::atomic m_counter; - std::mutex m_cv_lock; - std::condition_variable m_cv; + std::atomic m_counter; + std::mutex m_cv_lock; + std::condition_variable m_cv; - std::thread m_sched_thrd; - std::thread m_sched_wakeup_thrd; - ctpl::thread_pool m_thrd_pool; + std::thread m_sched_thrd; + std::thread m_sched_wakeup_thrd; + ctpl::thread_pool m_thrd_pool; - std::atomic m_used_thrds; - std::atomic m_used_memory; + std::atomic m_used_memory; + std::atomic m_used_thrds; - SchedulerStatistics m_stats; + std::atomic m_shutdown; - void periodic_wakeup() { - do { - std::this_thread::sleep_for(10us); - m_cv.notify_all(); - } while (!m_shutdown.load()); - } + SchedulerStatistics m_stats; - void schedule_next() { - assert(m_task_queue.size() > 0); - auto t = m_task_queue.pop(); - m_stats.job_scheduled(t.m_timestamp); + void periodic_wakeup() { + do { + std::this_thread::sleep_for(10us); + m_cv.notify_all(); + } while (!m_shutdown.load()); + } - m_thrd_pool.push(t); - } + void schedule_next() { + assert(m_task_queue.size() > 0); + auto t = m_task_queue.pop(); + m_stats.job_scheduled(t.m_timestamp); - void run() { - do { - std::unique_lock cv_lock(m_cv_lock); - m_cv.wait(cv_lock); + m_thrd_pool.push(t); + } - while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) { - schedule_next(); - } - } while(!m_shutdown.load()); - } + void run() { + do { + std::unique_lock cv_lock(m_cv_lock); + m_cv.wait(cv_lock); + while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) { + schedule_next(); + } + } while 
(!m_shutdown.load()); + } }; -} +} // namespace de diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index ac59301..7cd9cfc 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -1,13 +1,13 @@ /* * include/framework/scheduling/SerialScheduler.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * * IMPORTANT: This "scheduler" is a shim implementation for allowing - * strictly serial, single-threaded operation of the framework. It should - * never be used in multi-threaded contexts. A call to the schedule_job + * strictly serial, single-threaded operation of the framework. It should + * never be used in multi-threaded contexts. A call to the schedule_job * function will immediately run the job and block on its completion before * returning. * @@ -21,42 +21,36 @@ namespace de { class SerialScheduler { public: - SerialScheduler(size_t memory_budget, size_t thread_cnt) - : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX) - , m_used_memory(0) - , m_used_thrds(0) - , m_counter(0) - {} - - ~SerialScheduler() = default; - - void schedule_job(std::function job, size_t size, void *args, size_t type=0) { - size_t ts = m_counter++; - m_stats.job_queued(ts, type, size); - m_stats.job_scheduled(ts); - auto t = Task(size, ts, job, args, type, &m_stats); - t(0); - } - - void shutdown() { - /* intentionally left blank */ - } - - void print_statistics() { - m_stats.print_statistics(); - } + SerialScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX), + m_thrd_cnt((thread_cnt) ? thread_cnt : UINT64_MAX), m_used_memory(0), + m_used_thrds(0), m_counter(0) {} + + ~SerialScheduler() = default; + + void schedule_job(std::function job, size_t size, void *args, + size_t type = 0) { + size_t ts = m_counter++; + m_stats.job_queued(ts, type, size); + m_stats.job_scheduled(ts); + auto t = Task(size, ts, job, args, type, &m_stats); + t(0); + } + + void shutdown() { /* intentionally left blank */ } + + void print_statistics() { m_stats.print_statistics(); } private: - size_t m_memory_budget; - size_t m_thrd_cnt; + [[maybe_unused]] size_t m_memory_budget; + [[maybe_unused]] size_t m_thrd_cnt; - size_t m_used_thrds; - size_t m_used_memory; + [[maybe_unused]] size_t m_used_memory; + [[maybe_unused]] size_t m_used_thrds; - size_t m_counter; + size_t m_counter; - SchedulerStatistics m_stats; + SchedulerStatistics m_stats; }; -} +} // namespace de diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index bd53090..6b6f040 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -1,7 +1,7 @@ /* * include/framework/scheduling/Task.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. 
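Because the serial shim runs each job inline, it is convenient for illustrating the scheduler contract shared with FIFOScheduler. A minimal usage sketch follows; ExampleArgs, example_job, and the literal type code are invented for the example, and the zero constructor arguments simply select the scheduler's fallback defaults shown above.

#include <cstddef>

#include "framework/scheduling/SerialScheduler.h"

/* arguments reach a job through an untyped pointer */
struct ExampleArgs {
  size_t input;
  size_t output;
};

/* a job is any callable convertible to std::function<void(void*)> */
static void example_job(void *arguments) {
  auto *args = static_cast<ExampleArgs *>(arguments);
  args->output = args->input * 2;
}

int main() {
  /* a budget/thread count of zero selects the unbounded defaults */
  de::SerialScheduler sched(0, 0);

  ExampleArgs args = {21, 0};

  /*
   * schedule_job runs the job immediately and blocks until it finishes;
   * the statistics module currently accepts only type codes 1 (query)
   * and 2 (reconstruction), so a real code is used here
   */
  sched.schedule_job(example_job, sizeof(ExampleArgs), &args, 1);

  sched.print_statistics();
  sched.shutdown();

  return 0;
}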
* @@ -13,77 +13,76 @@ */ #pragma once -#include -#include #include +#include +#include -#include "framework/util/Configuration.h" #include "framework/scheduling/Epoch.h" #include "framework/scheduling/statistics.h" +#include "framework/util/Configuration.h" namespace de { -template S, QueryInterface Q, LayoutPolicy L> +template QueryType, + LayoutPolicy L> struct ReconstructionArgs { - Epoch *epoch; - ReconstructionVector merges; - std::promise result; - bool compaction; - void *extension; + typedef typename ShardType::RECORD RecordType; + Epoch *epoch; + ReconstructionVector merges; + std::promise result; + bool compaction; + void *extension; }; -template S, QueryInterface Q, LayoutPolicy L> -struct QueryArgs { - std::promise> result_set; - void *query_parms; - void *extension; +template Q, typename DE> struct QueryArgs { + std::promise> result_set; + typename Q::Parameters query_parms; + DE *extension; }; -typedef std::function Job; +typedef std::function Job; struct Task { - Task(size_t size, size_t ts, Job job, void *args, size_t type=0, SchedulerStatistics *stats=nullptr) - : m_job(job) - , m_size(size) - , m_timestamp(ts) - , m_args(args) - , m_type(type) - , m_stats(stats) - {} + Task(size_t size, size_t ts, Job job, void *args, size_t type = 0, + SchedulerStatistics *stats = nullptr) + : m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type), + m_stats(stats) {} - Job m_job; - size_t m_size; - size_t m_timestamp; - void *m_args; - size_t m_type; - SchedulerStatistics *m_stats; + Job m_job; + size_t m_size; + size_t m_timestamp; + void *m_args; + size_t m_type; + SchedulerStatistics *m_stats; - friend bool operator<(const Task &self, const Task &other) { - return self.m_timestamp < other.m_timestamp; - } + friend bool operator<(const Task &self, const Task &other) { + return self.m_timestamp < other.m_timestamp; + } - friend bool operator>(const Task &self, const Task &other) { - return self.m_timestamp > other.m_timestamp; - } + friend bool operator>(const Task &self, const Task &other) { + return self.m_timestamp > other.m_timestamp; + } - void operator()(size_t thrd_id) { - auto start = std::chrono::high_resolution_clock::now(); - if (m_stats) { - m_stats->job_begin(m_timestamp); - } + void operator()(size_t thrd_id) { + auto start = std::chrono::high_resolution_clock::now(); + if (m_stats) { + m_stats->job_begin(m_timestamp); + } - m_job(m_args); + m_job(m_args); - if (m_stats) { - m_stats->job_complete(m_timestamp); - } - auto stop = std::chrono::high_resolution_clock::now(); + if (m_stats) { + m_stats->job_complete(m_timestamp); + } + auto stop = std::chrono::high_resolution_clock::now(); - if (m_stats) { - auto time = std::chrono::duration_cast(stop - start).count(); - m_stats->log_time_data(time, m_type); - } + if (m_stats) { + auto time = + std::chrono::duration_cast(stop - start) + .count(); + m_stats->log_time_data(time, m_type); } + } }; -} +} // namespace de diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h index 6c479cd..48c186f 100644 --- a/include/framework/scheduling/statistics.h +++ b/include/framework/scheduling/statistics.h @@ -1,7 +1,7 @@ /* * include/framework/scheduling/statistics.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. 
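The Task wrapper above is what both schedulers place on their queues: timestamps provide the ordering, and the optional statistics object collects per-type latencies when a task is invoked. A small, self-contained sketch of that behavior follows; noop_job and the literal timestamps are invented for the example.

#include <cassert>

#include "framework/scheduling/Task.h"

/* a trivial job; real jobs receive their arguments through the void pointer */
static void noop_job(void * /* args */) {}

int main() {
  de::SchedulerStatistics stats;

  /* arguments: size, timestamp, job, args, type (1 = query, 2 = reconstruction), stats */
  de::Task t1(0, 1, noop_job, nullptr, 1, &stats);
  de::Task t2(0, 2, noop_job, nullptr, 1, &stats);

  /* the scheduler's priority queue orders tasks by timestamp */
  assert(t1 < t2);

  /* invoking a task runs its job and logs the elapsed time under its type */
  t1(0);
  t2(0);

  stats.print_statistics();

  return 0;
}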
* @@ -13,106 +13,94 @@ */ #pragma once -#include +#include #include +#include +#include +#include #include #include -#include -#include -#include namespace de { class SchedulerStatistics { private: - enum class EventType { - QUEUED, - SCHEDULED, - STARTED, - FINISHED - }; + enum class EventType { QUEUED, SCHEDULED, STARTED, FINISHED }; - struct Event { - size_t id; - EventType type; - }; - - struct JobInfo { - size_t id; - size_t size; - size_t type; - }; + struct Event { + size_t id; + EventType type; + }; + struct JobInfo { + size_t id; + size_t size; + size_t type; + }; public: - SchedulerStatistics() = default; - ~SchedulerStatistics() = default; + SchedulerStatistics() = default; + ~SchedulerStatistics() = default; - void job_queued(size_t id, size_t type, size_t size) { - auto time = std::chrono::high_resolution_clock::now(); - } + void job_queued(size_t id, size_t type, size_t size) { } - void job_scheduled(size_t id) { - std::unique_lock lk(m_mutex); + void job_scheduled(size_t id) { std::unique_lock lk(m_mutex); } - } + void job_begin(size_t id) {} - void job_begin(size_t id) { + void job_complete(size_t id) {} - } + /* FIXME: This is just a temporary approach */ + void log_time_data(size_t length, size_t type) { + assert(type == 1 || type == 2); - void job_complete(size_t id) { + if (type == 1) { + m_type_1_cnt.fetch_add(1); + m_type_1_total_time.fetch_add(length); - } + if (length > m_type_1_largest_time) { + m_type_1_largest_time.store(length); + } + } else { + m_type_2_cnt.fetch_add(1); + m_type_2_total_time.fetch_add(length); - /* FIXME: This is just a temporary approach */ - void log_time_data(size_t length, size_t type) { - assert(type == 1 || type == 2); - - if (type == 1) { - m_type_1_cnt.fetch_add(1); - m_type_1_total_time.fetch_add(length); - - if (length > m_type_1_largest_time) { - m_type_1_largest_time.store(length); - } - } else { - m_type_2_cnt.fetch_add(1); - m_type_2_total_time.fetch_add(length); - - if (length > m_type_2_largest_time) { - m_type_2_largest_time.store(length); - } - } + if (length > m_type_2_largest_time) { + m_type_2_largest_time.store(length); + } } - - void print_statistics() { - if (m_type_1_cnt > 0) { - fprintf(stdout, "Query Count: %ld\tQuery Avg. Latency: %ld\tMax Query Latency: %ld\n", - m_type_1_cnt.load(), - m_type_1_total_time.load() / m_type_1_cnt.load(), - m_type_1_largest_time.load()); - } - if (m_type_2_cnt > 0) { - fprintf(stdout, "Reconstruction Count: %ld\tReconstruction Avg. Latency: %ld\tMax Recon. Latency:%ld\n", - m_type_2_cnt.load(), - m_type_2_total_time.load() / m_type_2_cnt.load(), - m_type_2_largest_time.load()); - } + } + + void print_statistics() { + if (m_type_1_cnt > 0) { + fprintf( + stdout, + "Query Count: %ld\tQuery Avg. Latency: %ld\tMax Query Latency: %ld\n", + m_type_1_cnt.load(), m_type_1_total_time.load() / m_type_1_cnt.load(), + m_type_1_largest_time.load()); + } + if (m_type_2_cnt > 0) { + fprintf(stdout, + "Reconstruction Count: %ld\tReconstruction Avg. Latency: " + "%ld\tMax Recon. 
Latency:%ld\n", + m_type_2_cnt.load(), + m_type_2_total_time.load() / m_type_2_cnt.load(), + m_type_2_largest_time.load()); } + } private: - std::mutex m_mutex; - std::unordered_map m_jobs; - std::vector m_event_log; + std::mutex m_mutex; + std::unordered_map m_jobs; + std::vector m_event_log; - std::atomic m_type_1_cnt; - std::atomic m_type_1_total_time; + std::atomic m_type_1_cnt; + std::atomic m_type_1_total_time; - std::atomic m_type_2_cnt; - std::atomic m_type_2_total_time; + std::atomic m_type_2_cnt; + std::atomic m_type_2_total_time; - std::atomic m_type_1_largest_time; - std::atomic m_type_2_largest_time; + std::atomic m_type_1_largest_time; + std::atomic m_type_2_largest_time; }; -} +} // namespace de diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index e95a799..acf1201 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -1,7 +1,7 @@ /* * include/framework/structure/BufferView.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * @@ -9,166 +9,150 @@ */ #pragma once -#include #include +#include #include #include -#include "psu-util/alignment.h" -#include "psu-ds/BloomFilter.h" #include "framework/interface/Record.h" +#include "psu-ds/BloomFilter.h" +#include "psu-util/alignment.h" namespace de { -typedef std::function ReleaseFunction; +typedef std::function ReleaseFunction; -template -class BufferView { +template class BufferView { public: - BufferView() = default; - - /* - * the BufferView's lifetime is tightly linked to buffer versioning, and so - * copying and assignment are disabled. - */ - BufferView(const BufferView&) = delete; - BufferView &operator=(BufferView &) = delete; - - BufferView(BufferView &&other) - : m_data(std::exchange(other.m_data, nullptr)) - , m_release(std::move(other.m_release)) - , m_head(std::exchange(other.m_head, 0)) - , m_tail(std::exchange(other.m_tail, 0)) - , m_start(std::exchange(other.m_start, 0)) - , m_stop(std::exchange(other.m_stop, 0)) - , m_cap(std::exchange(other.m_cap, 0)) - , m_approx_ts_cnt(std::exchange(other.m_approx_ts_cnt, 0)) - , m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr)) - , m_active(std::exchange(other.m_active, false)) {} - - BufferView &operator=(BufferView &&other) = delete; - - - BufferView(Wrapped *buffer, size_t cap, size_t head, size_t tail, size_t tombstone_cnt, psudb::BloomFilter *filter, - ReleaseFunction release) - : m_data(buffer) - , m_release(release) - , m_head(head) - , m_tail(tail) - , m_start(m_head % cap) - , m_stop(m_tail % cap) - , m_cap(cap) - , m_approx_ts_cnt(tombstone_cnt) - , m_tombstone_filter(filter) - , m_active(true) {} - - ~BufferView() { - if (m_active) { - m_release(); - } + BufferView() = default; + + /* + * the BufferView's lifetime is tightly linked to buffer versioning, so + * copying and assignment are disabled. 
+ */ + BufferView(const BufferView &) = delete; + BufferView &operator=(BufferView &) = delete; + + BufferView(BufferView &&other) + : m_data(std::exchange(other.m_data, nullptr)), + m_release(std::move(other.m_release)), + m_head(std::exchange(other.m_head, 0)), + m_tail(std::exchange(other.m_tail, 0)), + m_start(std::exchange(other.m_start, 0)), + m_stop(std::exchange(other.m_stop, 0)), + m_cap(std::exchange(other.m_cap, 0)), + m_approx_ts_cnt(std::exchange(other.m_approx_ts_cnt, 0)), + m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr)), + m_active(std::exchange(other.m_active, false)) {} + + BufferView &operator=(BufferView &&other) = delete; + + BufferView(Wrapped *buffer, size_t cap, size_t head, size_t tail, + size_t tombstone_cnt, psudb::BloomFilter *filter, + ReleaseFunction release) + : m_data(buffer), m_release(release), m_head(head), m_tail(tail), + m_start(m_head % cap), m_stop(m_tail % cap), m_cap(cap), + m_approx_ts_cnt(tombstone_cnt), m_tombstone_filter(filter), + m_active(true) {} + + ~BufferView() { + if (m_active) { + m_release(); } + } - bool check_tombstone(const R& rec) { - if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false; - - for (size_t i=0; ilookup(rec)) + return false; - return false; + for (size_t i = 0; i < get_record_count(); i++) { + if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) { + return true; + } } - bool delete_record(const R& rec) { - if (m_start < m_stop) { - for (size_t i=m_start; i *get(size_t i) { - //assert(i < get_record_count()); - return m_data + to_idx(i); - } - - void copy_to_buffer(psudb::byte *buffer) { - /* check if the region to be copied circles back to start. If so, do it in two steps */ - if (m_start > m_stop) { - size_t split_idx = m_cap - m_start; - - memcpy(buffer, (std::byte*) (m_data + m_start), split_idx* sizeof(Wrapped)); - memcpy(buffer + (split_idx * sizeof(Wrapped)), (std::byte*) m_data, m_stop * sizeof(Wrapped)); - } else { - memcpy(buffer, (std::byte*) (m_data + m_start), get_record_count() * sizeof(Wrapped)); + for (size_t i = 0; i < m_stop; i++) { + if (m_data[i].rec == rec) { + m_data[i].set_delete(); + return true; } + } } - size_t get_tail() { - return m_tail; + return false; + } + + size_t get_record_count() { return m_tail - m_head; } + + size_t get_capacity() { return m_cap; } + + /* + * NOTE: This function returns an upper bound on the number + * of tombstones within the view. There may be less than + * this, due to synchronization issues during view creation. + */ + size_t get_tombstone_count() { return m_approx_ts_cnt; } + + Wrapped *get(size_t i) { + return m_data + to_idx(i); + } + + void copy_to_buffer(psudb::byte *buffer) { + /* check if the region to be copied circles back to start. 
If so, do it in + * two steps */ + if (m_start > m_stop) { + size_t split_idx = m_cap - m_start; + + memcpy(buffer, (std::byte *)(m_data + m_start), + split_idx * sizeof(Wrapped)); + memcpy(buffer + (split_idx * sizeof(Wrapped)), (std::byte *)m_data, + m_stop * sizeof(Wrapped)); + } else { + memcpy(buffer, (std::byte *)(m_data + m_start), + get_record_count() * sizeof(Wrapped)); } + } - size_t get_head() { - return m_head; - } + size_t get_tail() { return m_tail; } + + size_t get_head() { return m_head; } private: - Wrapped* m_data; - ReleaseFunction m_release; - size_t m_head; - size_t m_tail; - size_t m_start; - size_t m_stop; - size_t m_cap; - size_t m_approx_ts_cnt; - psudb::BloomFilter *m_tombstone_filter; - bool m_active; - - size_t to_idx(size_t i) { - size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start) - : m_start + i; - assert(idx < m_cap); - return idx; - } + Wrapped *m_data; + ReleaseFunction m_release; + size_t m_head; + size_t m_tail; + size_t m_start; + size_t m_stop; + size_t m_cap; + size_t m_approx_ts_cnt; + psudb::BloomFilter *m_tombstone_filter; + bool m_active; + + size_t to_idx(size_t i) { + size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start) : m_start + i; + assert(idx < m_cap); + return idx; + } }; -} +} // namespace de diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index b83674b..2728246 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -1,8 +1,8 @@ /* * include/framework/structure/ExtensionStructure.h * - * Copyright (C) 2023 Douglas B. Rumbaugh - * Dong Xie + * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * Dong Xie * * Distributed under the Modified BSD License. * @@ -22,622 +22,660 @@ namespace de { -template S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING> +template QueryType, + LayoutPolicy L = LayoutPolicy::TEIRING> class ExtensionStructure { - typedef S Shard; - typedef BufferView BuffView; + typedef typename ShardType::RECORD RecordType; + typedef BufferView BuffView; - typedef struct { - size_t reccnt; - size_t reccap; + typedef struct { + size_t reccnt; + size_t reccap; - size_t shardcnt; - size_t shardcap; - } level_state; + size_t shardcnt; + size_t shardcap; + } level_state; - typedef std::vector state_vector; + typedef std::vector state_vector; public: - ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop) - : m_scale_factor(scale_factor) - , m_max_delete_prop(max_delete_prop) - , m_buffer_size(buffer_size) - {} - - ~ExtensionStructure() = default; - - /* - * Create a shallow copy of this extension structure. The copy will share - * references to the same levels/shards as the original, but will have its - * own lists. As all of the shards are immutable (with the exception of - * deletes), the copy can be restructured with reconstructions and flushes - * without affecting the original. The copied structure will be returned - * with a reference count of 0; generally you will want to immediately call - * take_reference() on it. - * - * NOTE: When using tagged deletes, a delete of a record in the original - * structure will affect the copy, so long as the copy retains a reference - * to the same shard as the original. This could cause synchronization - * problems under tagging with concurrency. Any deletes in this context will - * need to be forwarded to the appropriate structures manually. 
- */ - ExtensionStructure *copy() { - auto new_struct = new ExtensionStructure(m_buffer_size, m_scale_factor, - m_max_delete_prop); - for (size_t i=0; im_levels.push_back(m_levels[i]->clone()); - } - - new_struct->m_refcnt = 0; - new_struct->m_current_state = m_current_state; + ExtensionStructure(size_t buffer_size, size_t scale_factor, + double max_delete_prop) + : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop), + m_buffer_size(buffer_size) {} + + ~ExtensionStructure() = default; + + /* + * Create a shallow copy of this extension structure. The copy will share + * references to the same levels/shards as the original, but will have its + * own lists. As all of the shards are immutable (with the exception of + * deletes), the copy can be restructured with reconstructions and flushes + * without affecting the original. The copied structure will be returned + * with a reference count of 0; generally you will want to immediately call + * take_reference() on it. + * + * NOTE: When using tagged deletes, a delete of a record in the original + * structure will affect the copy, so long as the copy retains a reference + * to the same shard as the original. This could cause synchronization + * problems under tagging with concurrency. Any deletes in this context will + * need to be forwarded to the appropriate structures manually. + */ + ExtensionStructure *copy() { + auto new_struct = new ExtensionStructure( + m_buffer_size, m_scale_factor, m_max_delete_prop); + for (size_t i = 0; i < m_levels.size(); i++) { + new_struct->m_levels.push_back(m_levels[i]->clone()); + } - return new_struct; + new_struct->m_refcnt = 0; + new_struct->m_current_state = m_current_state; + + return new_struct; + } + + /* + * Search for a record matching the argument and mark it deleted by + * setting the delete bit in its wrapped header. Returns 1 if a matching + * record was found and deleted, and 0 if a matching record was not found. + * + * This function will stop after finding the first matching record. It is + * assumed that no duplicate records exist. In the case of duplicates, this + * function will still "work", but in the sense of "delete first match". + */ + int tagged_delete(const RecordType &rec) { + for (auto level : m_levels) { + if (level && level->delete_record(rec)) { + return 1; + } } /* - * Search for a record matching the argument and mark it deleted by - * setting the delete bit in its wrapped header. Returns 1 if a matching - * record was found and deleted, and 0 if a matching record was not found. - * - * This function will stop after finding the first matching record. It is - * assumed that no duplicate records exist. In the case of duplicates, this - * function will still "work", but in the sense of "delete first match". + * If the record to be erased wasn't found, return 0. The + * DynamicExtension itself will then search the active + * Buffers. */ - int tagged_delete(const R &rec) { - for (auto level : m_levels) { - if (level && level->delete_record(rec)) { - return 1; - } - } - - /* - * If the record to be erased wasn't found, return 0. The - * DynamicExtension itself will then search the active - * Buffers. - */ - return 0; + return 0; + } + + /* + * Flush a buffer into the extension structure, performing any necessary + * reconstructions to free up room in L0. + * + * FIXME: arguably, this should be a method attached to the buffer that + * takes a structure as input. 
+ */ + inline bool flush_buffer(BuffView buffer) { + state_vector tmp = m_current_state; + + if (tmp.size() == 0) { + grow(tmp); } - /* - * Flush a buffer into the extension structure, performing any necessary - * reconstructions to free up room in L0. - * - * FIXME: arguably, this should be a method attached to the buffer that - * takes a structure as input. - */ - inline bool flush_buffer(BuffView buffer) { - state_vector tmp = m_current_state; + assert(can_reconstruct_with(0, buffer.get_record_count(), tmp)); + flush_buffer_into_l0(std::move(buffer)); - if (tmp.size() == 0) { - grow(tmp); - } + return true; + } - assert(can_reconstruct_with(0, buffer.get_record_count(), tmp)); - flush_buffer_into_l0(std::move(buffer)); + /* + * Return the total number of records (including tombstones) within all + * of the levels of the structure. + */ + size_t get_record_count() { + size_t cnt = 0; - return true; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) + cnt += m_levels[i]->get_record_count(); } - /* - * Return the total number of records (including tombstones) within all - * of the levels of the structure. - */ - size_t get_record_count() { - size_t cnt = 0; + return cnt; + } - for (size_t i=0; iget_record_count(); - } + /* + * Return the total number of tombstones contained within all of the + * levels of the structure. + */ + size_t get_tombstone_count() { + size_t cnt = 0; - return cnt; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) + cnt += m_levels[i]->get_tombstone_count(); } - /* - * Return the total number of tombstones contained within all of the - * levels of the structure. - */ - size_t get_tombstone_count() { - size_t cnt = 0; - - for (size_t i=0; iget_tombstone_count(); - } - - return cnt; + return cnt; + } + + /* + * Return the number of levels within the structure. Note that not + * all of these levels are necessarily populated. + */ + size_t get_height() { return m_levels.size(); } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing the primary data structure and raw data. + */ + size_t get_memory_usage() { + size_t cnt = 0; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) + cnt += m_levels[i]->get_memory_usage(); } - /* - * Return the number of levels within the structure. Note that not - * all of these levels are necessarily populated. - */ - size_t get_height() { - return m_levels.size(); + return cnt; + } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing auxiliary data structures. This total does not + * include memory used for the main data structure, or raw data. + */ + size_t get_aux_memory_usage() { + size_t cnt = 0; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + cnt += m_levels[i]->get_aux_memory_usage(); + } } - /* - * Return the amount of memory (in bytes) used by the shards within the - * structure for storing the primary data structure and raw data. - */ - size_t get_memory_usage() { - size_t cnt = 0; - for (size_t i=0; iget_memory_usage(); + return cnt; + } + + /* + * Validate that no level in the structure exceeds its maximum tombstone + * capacity. This is used to trigger preemptive compactions at the end of + * the reconstruction process. 
+ */ + bool validate_tombstone_proportion() { + long double ts_prop; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + ts_prop = (long double)m_levels[i]->get_tombstone_count() / + (long double)calc_level_record_capacity(i); + if (ts_prop > (long double)m_max_delete_prop) { + return false; } - - return cnt; + } } - /* - * Return the amount of memory (in bytes) used by the shards within the - * structure for storing auxiliary data structures. This total does not - * include memory used for the main data structure, or raw data. + return true; + } + + bool validate_tombstone_proportion(level_index level) { + long double ts_prop = (long double)m_levels[level]->get_tombstone_count() / + (long double)calc_level_record_capacity(level); + return ts_prop <= (long double)m_max_delete_prop; + } + + /* + * Return a reference to the underlying vector of levels within the + * structure. + */ + std::vector>> & + get_levels() { + return m_levels; + } + + /* + * NOTE: This cannot be simulated, because tombstone cancellation is not + * cheaply predictable. It is possible that the worst case number could + * be used instead, to allow for prediction, but compaction isn't a + * major concern outside of sampling; at least for now. So I'm not + * going to focus too much time on it at the moment. + */ + ReconstructionVector get_compaction_tasks() { + ReconstructionVector tasks; + state_vector scratch_state = m_current_state; + + /* if the tombstone/delete invariant is satisfied, no need for compactions */ - size_t get_aux_memory_usage() { - size_t cnt = 0; - for (size_t i=0; iget_aux_memory_usage(); - } - } - - return cnt; + if (validate_tombstone_proportion()) { + return tasks; } - /* - * Validate that no level in the structure exceeds its maximum tombstone - * capacity. This is used to trigger preemptive compactions at the end of - * the reconstruction process. - */ - bool validate_tombstone_proportion() { - long double ts_prop; - for (size_t i = 0; i < m_levels.size(); i++) { - if (m_levels[i]) { - ts_prop = (long double)m_levels[i]->get_tombstone_count() / - (long double)calc_level_record_capacity(i); - if (ts_prop > (long double)m_max_delete_prop) { - return false; - } - } + /* locate the first level to violate the invariant */ + level_index violation_idx = -1; + for (level_index i = 0; i < m_levels.size(); i++) { + if (!validate_tombstone_proportion(i)) { + violation_idx = i; + break; } - - return true; } - bool validate_tombstone_proportion(level_index level) { - long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level); - return ts_prop <= (long double) m_max_delete_prop; - } + assert(violation_idx != -1); - /* - * Return a reference to the underlying vector of levels within the - * structure. - */ - std::vector>> &get_levels() { - return m_levels; + level_index base_level = + find_reconstruction_target(violation_idx, scratch_state); + if (base_level == -1) { + base_level = grow(scratch_state); } - /* - * NOTE: This cannot be simulated, because tombstone cancellation is not - * cheaply predictable. It is possible that the worst case number could - * be used instead, to allow for prediction, but compaction isn't a - * major concern outside of sampling; at least for now. So I'm not - * going to focus too much time on it at the moment. 
- */ - ReconstructionVector get_compaction_tasks() { - ReconstructionVector tasks; - state_vector scratch_state = m_current_state; - - /* if the tombstone/delete invariant is satisfied, no need for compactions */ - if (validate_tombstone_proportion()) { - return tasks; - } - - /* locate the first level to violate the invariant */ - level_index violation_idx = -1; - for (level_index i=0; i0; i--) { - /* - * The amount of storage required for the reconstruction accounts - * for the cost of storing the new records, along with the - * cost of retaining the old records during the process - * (hence the 2x multiplier). - * - * FIXME: currently does not account for the *actual* size - * of the shards, only the storage for the records - * themselves. - */ - size_t reccnt = m_levels[i - 1]->get_record_count(); - if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt, scratch_state)) { - reccnt += m_levels[i]->get_record_count(); - } - } - tasks.add_reconstruction(i-i, i, reccnt); + for (level_index i = base_level; i > 0; i--) { + /* + * The amount of storage required for the reconstruction accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i - 1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_reconstruct_with(i, reccnt, scratch_state)) { + reccnt += m_levels[i]->get_record_count(); } - - return tasks; + } + tasks.add_reconstruction(i - i, i, reccnt); } + return tasks; + } + + /* + * + */ + ReconstructionVector + get_reconstruction_tasks(size_t buffer_reccnt, + state_vector scratch_state = {}) { /* - * + * If no scratch state vector is provided, use a copy of the + * current one. The only time an empty vector could be used as + * *real* input to this function is when the current state is also + * empty, so this should would even in that case. */ - ReconstructionVector get_reconstruction_tasks(size_t buffer_reccnt, - state_vector scratch_state={}) { - /* - * If no scratch state vector is provided, use a copy of the - * current one. The only time an empty vector could be used as - * *real* input to this function is when the current state is also - * empty, so this should would even in that case. - */ - if (scratch_state.size() == 0) { - scratch_state = m_current_state; - } - - ReconstructionVector reconstructions; - size_t LOOKAHEAD = 1; - for (size_t i=0; i reconstructions.get_total_reccnt()) { - auto t = local_recon.remove_reconstruction(0); - reconstructions.add_reconstruction(t); - } - } - - /* simulate the buffer flush in the scratch state */ - scratch_state[0].reccnt += buffer_reccnt; - if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) { - scratch_state[0].shardcnt += 1; - } - - } - - return std::move(reconstructions); + if (scratch_state.size() == 0) { + scratch_state = m_current_state; } - - /* - * - */ - ReconstructionVector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) { - ReconstructionVector reconstructions; + ReconstructionVector reconstructions; + size_t LOOKAHEAD = 1; + for (size_t i = 0; i < LOOKAHEAD; i++) { + /* + * If L0 cannot support a direct buffer flush, figure out what + * work must be done to free up space first. Otherwise, the + * reconstruction vector will be initially empty. 
+ */ + if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) { + auto local_recon = + get_reconstruction_tasks_from_level(0, scratch_state); /* - * Find the first level capable of sustaining a reconstruction from - * the level above it. If no such level exists, add a new one at - * the bottom of the structure. + * for the first iteration, we need to do all of the + * reconstructions, so use these to initially the returned + * reconstruction list */ - level_index base_level = find_reconstruction_target(source_level, scratch_state); - if (base_level == -1) { - base_level = grow(scratch_state); + if (i == 0) { + reconstructions = local_recon; + /* + * Quick sanity test of idea: if the next reconstruction + * would be larger than this one, steal the largest + * task from it and run it now instead. + */ + } else if (local_recon.get_total_reccnt() > + reconstructions.get_total_reccnt()) { + auto t = local_recon.remove_reconstruction(0); + reconstructions.add_reconstruction(t); } + } - if constexpr (L == LayoutPolicy::BSM) { - if (base_level == 0) { - return std::move(reconstructions); - } - - ReconstructionTask task; - task.target = base_level; - - size_t base_reccnt = 0; - for (level_index i=base_level; i>source_level; i--) { - auto recon_reccnt = scratch_state[i-1].reccnt; - base_reccnt += recon_reccnt; - scratch_state[i-1].reccnt = 0; - scratch_state[i-1].shardcnt = 0; - task.add_source(i-1, recon_reccnt); - } + /* simulate the buffer flush in the scratch state */ + scratch_state[0].reccnt += buffer_reccnt; + if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) { + scratch_state[0].shardcnt += 1; + } + } - reconstructions.add_reconstruction(task); - scratch_state[base_level].reccnt = base_reccnt; - scratch_state[base_level].shardcnt = 1; + return reconstructions; + } - return std::move(reconstructions); - } + /* + * + */ + ReconstructionVector + get_reconstruction_tasks_from_level(level_index source_level, + state_vector &scratch_state) { + ReconstructionVector reconstructions; - /* - * Determine the full set of reconstructions necessary to open up - * space in the source level. - */ - for (level_index i=base_level; i>source_level; i--) { - size_t recon_reccnt = scratch_state[i-1].reccnt; - size_t base_reccnt = recon_reccnt; - - /* - * If using Leveling, the total reconstruction size will be the - * records in *both* base and target, because they will need to - * be merged (assuming that target isn't empty). - */ - if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, base_reccnt, scratch_state)) { - recon_reccnt += scratch_state[i].reccnt; - } - } - reconstructions.add_reconstruction(i-1, i, recon_reccnt); - - /* - * The base level will be emptied and its records moved to - * the target. - */ - scratch_state[i-1].reccnt = 0; - scratch_state[i-1].shardcnt = 0; - - /* - * The target level will have the records from the base level - * added to it, and potentially gain a shard if the LayoutPolicy - * is tiering or the level currently lacks any shards at all. - */ - scratch_state[i].reccnt += base_reccnt; - if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) { - scratch_state[i].shardcnt += 1; - } - } - - return std::move(reconstructions); + /* + * Find the first level capable of sustaining a reconstruction from + * the level above it. If no such level exists, add a new one at + * the bottom of the structure. 
+ */ + level_index base_level = + find_reconstruction_target(source_level, scratch_state); + if (base_level == -1) { + base_level = grow(scratch_state); } - inline void reconstruction(ReconstructionTask task) { - static_assert(L == LayoutPolicy::BSM); - std::vector*> levels(task.sources.size()); - for (size_t i=0; i::reconstruction(levels, task.target); - if (task.target >= m_levels.size()) { - m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target), - 1, 1}); - m_levels.emplace_back(new_level); - } else { - m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target), - 1, 1}; - m_levels[task.target] = new_level; - } + ReconstructionTask task; + task.target = base_level; - /* remove all of the levels that have been flattened */ - for (size_t i=0; i>(new InternalLevel(task.sources[i], 1)); - m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.target), 0, 1}; - } + size_t base_reccnt = 0; + for (level_index i = base_level; i > source_level; i--) { + auto recon_reccnt = scratch_state[i - 1].reccnt; + base_reccnt += recon_reccnt; + scratch_state[i - 1].reccnt = 0; + scratch_state[i - 1].shardcnt = 0; + task.add_source(i - 1, recon_reccnt); + } + + reconstructions.add_reconstruction(task); + scratch_state[base_level].reccnt = base_reccnt; + scratch_state[base_level].shardcnt = 1; - return; + return reconstructions; } /* - * Combine incoming_level with base_level and reconstruct the shard, - * placing it in base_level. The two levels should be sequential--i.e. no - * levels are skipped in the reconstruction process--otherwise the - * tombstone ordering invariant may be violated. + * Determine the full set of reconstructions necessary to open up + * space in the source level. */ - inline void reconstruction(level_index base_level, level_index incoming_level) { - size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - - if (base_level >= m_levels.size()) { - m_levels.emplace_back(std::shared_ptr>(new InternalLevel(base_level, shard_capacity))); - m_current_state.push_back({0, calc_level_record_capacity(base_level), - 0, shard_capacity}); - } - - if constexpr (L == LayoutPolicy::LEVELING) { - /* if the base level has a shard, merge the base and incoming together to make a new one */ - if (m_levels[base_level]->get_shard_count() > 0) { - m_levels[base_level] = InternalLevel::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get()); - /* otherwise, we can just move the incoming to the base */ - } else { - m_levels[base_level] = m_levels[incoming_level]; - } - - } else { - m_levels[base_level]->append_level(m_levels[incoming_level].get()); - m_levels[base_level]->finalize(); + for (level_index i = base_level; i > source_level; i--) { + size_t recon_reccnt = scratch_state[i - 1].reccnt; + size_t base_reccnt = recon_reccnt; + + /* + * If using Leveling, the total reconstruction size will be the + * records in *both* base and target, because they will need to + * be merged (assuming that target isn't empty). + */ + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_reconstruct_with(i, base_reccnt, scratch_state)) { + recon_reccnt += scratch_state[i].reccnt; } + } + reconstructions.add_reconstruction(i - 1, i, recon_reccnt); + + /* + * The base level will be emptied and its records moved to + * the target. 
+ */ + scratch_state[i - 1].reccnt = 0; + scratch_state[i - 1].shardcnt = 0; + + /* + * The target level will have the records from the base level + * added to it, and potentially gain a shard if the LayoutPolicy + * is tiering or the level currently lacks any shards at all. + */ + scratch_state[i].reccnt += base_reccnt; + if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) { + scratch_state[i].shardcnt += 1; + } + } - /* place a new, empty level where the incoming level used to be */ - m_levels[incoming_level] = std::shared_ptr>(new InternalLevel(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); + return reconstructions; + } - /* - * Update the state vector to match the *real* state following - * the reconstruction - */ - m_current_state[base_level] = {m_levels[base_level]->get_record_count(), - calc_level_record_capacity(base_level), m_levels[base_level]->get_shard_count(), shard_capacity}; - m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity}; + inline void reconstruction(ReconstructionTask task) { + static_assert(L == LayoutPolicy::BSM); + std::vector *> levels( + task.sources.size()); + for (size_t i = 0; i < task.sources.size(); i++) { + levels[i] = m_levels[task.sources[i]].get(); } - bool take_reference() { - m_refcnt.fetch_add(1); - return true; + auto new_level = InternalLevel::reconstruction( + levels, task.target); + if (task.target >= m_levels.size()) { + m_current_state.push_back({new_level->get_record_count(), + calc_level_record_capacity(task.target), 1, + 1}); + m_levels.emplace_back(new_level); + } else { + m_current_state[task.target] = {new_level->get_record_count(), + calc_level_record_capacity(task.target), + 1, 1}; + m_levels[task.target] = new_level; } - bool release_reference() { - assert(m_refcnt.load() > 0); - m_refcnt.fetch_add(-1); - return true; + /* remove all of the levels that have been flattened */ + for (size_t i = 0; i < task.sources.size(); i++) { + m_levels[task.sources[i]] = + std::shared_ptr>( + new InternalLevel(task.sources[i], 1)); + m_current_state[task.sources[i]] = { + 0, calc_level_record_capacity(task.target), 0, 1}; } - size_t get_reference_count() { - return m_refcnt.load(); + return; + } + + /* + * Combine incoming_level with base_level and reconstruct the shard, + * placing it in base_level. The two levels should be sequential--i.e. no + * levels are skipped in the reconstruction process--otherwise the + * tombstone ordering invariant may be violated. + */ + inline void reconstruction(level_index base_level, + level_index incoming_level) { + size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 
1 : m_scale_factor; + + if (base_level >= m_levels.size()) { + m_levels.emplace_back( + std::shared_ptr>( + new InternalLevel(base_level, + shard_capacity))); + m_current_state.push_back( + {0, calc_level_record_capacity(base_level), 0, shard_capacity}); } - std::vector get_query_states(std::vector> &shards, void *parms) { - std::vector states; - - for (auto &level : m_levels) { - level->get_query_states(shards, states, parms); - } + if constexpr (L == LayoutPolicy::LEVELING) { + /* if the base level has a shard, merge the base and incoming together to + * make a new one */ + if (m_levels[base_level]->get_shard_count() > 0) { + m_levels[base_level] = + InternalLevel::reconstruction( + m_levels[base_level].get(), m_levels[incoming_level].get()); + /* otherwise, we can just move the incoming to the base */ + } else { + m_levels[base_level] = m_levels[incoming_level]; + } - return states; + } else { + m_levels[base_level]->append_level(m_levels[incoming_level].get()); + m_levels[base_level]->finalize(); } -private: - size_t m_scale_factor; - double m_max_delete_prop; - size_t m_buffer_size; - - std::atomic m_refcnt; - - std::vector>> m_levels; - - /* - * A pair of for each level in the - * structure. Record counts may be slightly inaccurate due to - * deletes. - */ - state_vector m_current_state; + /* place a new, empty level where the incoming level used to be */ + m_levels[incoming_level] = + std::shared_ptr>( + new InternalLevel( + incoming_level, + (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); /* - * Add a new level to the scratch state and return its index. - * - * IMPORTANT: This does _not_ add a level to the extension structure - * anymore. This is handled by the appropriate reconstruction and flush - * methods as needed. This function is for use in "simulated" - * reconstructions. + * Update the state vector to match the *real* state following + * the reconstruction */ - inline level_index grow(state_vector &scratch_state) { - level_index new_idx = m_levels.size(); - size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - - scratch_state.push_back({0, calc_level_record_capacity(new_idx), - 0, new_shard_cap}); - return new_idx; + m_current_state[base_level] = {m_levels[base_level]->get_record_count(), + calc_level_record_capacity(base_level), + m_levels[base_level]->get_shard_count(), + shard_capacity}; + m_current_state[incoming_level] = { + 0, calc_level_record_capacity(incoming_level), 0, shard_capacity}; + } + + bool take_reference() { + m_refcnt.fetch_add(1); + return true; + } + + bool release_reference() { + assert(m_refcnt.load() > 0); + m_refcnt.fetch_add(-1); + return true; + } + + size_t get_reference_count() { return m_refcnt.load(); } + + std::vector + get_local_queries(std::vector> &shards, + typename QueryType::Parameters *parms) { + + std::vector queries; + + for (auto &level : m_levels) { + level->get_local_queries(shards, queries, parms); } - /* - * Find the first level below the level indicated by idx that - * is capable of sustaining a reconstruction and return its - * level index. If no such level exists, returns -1. Also - * returns -1 if idx==0, and no such level exists, to simplify - * the logic of the first buffer flush. - */ - inline level_index find_reconstruction_target(level_index idx, state_vector &state) { + return queries; + } - /* - * this handles the very first buffer flush, when the state vector - * is empty. 
- */ - if (idx == 0 && state.size() == 0) return -1; +private: + size_t m_scale_factor; + double m_max_delete_prop; + size_t m_buffer_size; + + std::atomic m_refcnt; + + std::vector>> m_levels; + + /* + * A pair of for each level in the + * structure. Record counts may be slightly inaccurate due to + * deletes. + */ + state_vector m_current_state; + + /* + * Add a new level to the scratch state and return its index. + * + * IMPORTANT: This does _not_ add a level to the extension structure + * anymore. This is handled by the appropriate reconstruction and flush + * methods as needed. This function is for use in "simulated" + * reconstructions. + */ + inline level_index grow(state_vector &scratch_state) { + level_index new_idx = m_levels.size(); + size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + scratch_state.push_back( + {0, calc_level_record_capacity(new_idx), 0, new_shard_cap}); + return new_idx; + } + + /* + * Find the first level below the level indicated by idx that + * is capable of sustaining a reconstruction and return its + * level index. If no such level exists, returns -1. Also + * returns -1 if idx==0, and no such level exists, to simplify + * the logic of the first buffer flush. + */ + inline level_index find_reconstruction_target(level_index idx, + state_vector &state) { - size_t incoming_rec_cnt = state[idx].reccnt; - for (level_index i=idx+1; i>(new InternalLevel(0, shard_capacity))); + return -1; + } - m_current_state.push_back({0, calc_level_record_capacity(0), - 0, shard_capacity}); - } + inline void flush_buffer_into_l0(BuffView buffer) { + size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - if constexpr (L == LayoutPolicy::LEVELING) { - // FIXME: Kludgey implementation due to interface constraints. - auto old_level = m_levels[0].get(); - auto temp_level = new InternalLevel(0, 1); - temp_level->append_buffer(std::move(buffer)); - - if (old_level->get_shard_count() > 0) { - m_levels[0] = InternalLevel::reconstruction(old_level, temp_level); - delete temp_level; - } else { - m_levels[0] = std::shared_ptr>(temp_level); - } - } else { - m_levels[0]->append_buffer(std::move(buffer)); - } + if (m_levels.size() == 0) { + m_levels.emplace_back( + std::shared_ptr>( + new InternalLevel(0, shard_capacity))); - /* update the state vector */ - m_current_state[0].reccnt = m_levels[0]->get_record_count(); - m_current_state[0].shardcnt = m_levels[0]->get_shard_count(); + m_current_state.push_back( + {0, calc_level_record_capacity(0), 0, shard_capacity}); } - /* - * Mark a given memory level as no-longer in use by the tree. For now this - * will just free the level. In future, this will be more complex as the - * level may not be able to immediately be deleted, depending upon who - * else is using it. - */ - inline void mark_as_unused(std::shared_ptr> level) { - level.reset(); + if constexpr (L == LayoutPolicy::LEVELING) { + // FIXME: Kludgey implementation due to interface constraints. + auto old_level = m_levels[0].get(); + auto temp_level = new InternalLevel(0, 1); + temp_level->append_buffer(std::move(buffer)); + + if (old_level->get_shard_count() > 0) { + m_levels[0] = InternalLevel::reconstruction( + old_level, temp_level); + delete temp_level; + } else { + m_levels[0] = + std::shared_ptr>(temp_level); + } + } else { + m_levels[0]->append_buffer(std::move(buffer)); } - /* - * Assume that level "0" should be larger than the buffer. The buffer - * itself is index -1, which should return simply the buffer capacity. 
- */ - inline size_t calc_level_record_capacity(level_index idx) { - return m_buffer_size * pow(m_scale_factor, idx+1); + /* update the state vector */ + m_current_state[0].reccnt = m_levels[0]->get_record_count(); + m_current_state[0].shardcnt = m_levels[0]->get_shard_count(); + } + + /* + * Mark a given memory level as no-longer in use by the tree. For now this + * will just free the level. In future, this will be more complex as the + * level may not be able to immediately be deleted, depending upon who + * else is using it. + */ + inline void + mark_as_unused(std::shared_ptr> level) { + level.reset(); + } + + /* + * Assume that level "0" should be larger than the buffer. The buffer + * itself is index -1, which should return simply the buffer capacity. + */ + inline size_t calc_level_record_capacity(level_index idx) { + return m_buffer_size * pow(m_scale_factor, idx + 1); + } + + /* + * Returns the number of records present on a specified level. + */ + inline size_t get_level_record_count(level_index idx) { + return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; + } + + /* + * Determines if a level can sustain a reconstruction with incoming_rec_cnt + * additional records without exceeding its capacity. + */ + inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, + state_vector &state) { + if (idx >= state.size()) { + return false; } - /* - * Returns the number of records present on a specified level. - */ - inline size_t get_level_record_count(level_index idx) { - return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; + if constexpr (L == LayoutPolicy::LEVELING) { + return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap; + } else if constexpr (L == LayoutPolicy::BSM) { + return state[idx].reccnt == 0; + } else { + return state[idx].shardcnt < state[idx].shardcap; } - /* - * Determines if a level can sustain a reconstruction with incoming_rec_cnt - * additional records without exceeding its capacity. - */ - inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) { - if (idx >= state.size()) { - return false; - } - - if constexpr (L == LayoutPolicy::LEVELING) { - return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap; - } else if constexpr (L == LayoutPolicy::BSM) { - return state[idx].reccnt == 0; - } else { - return state[idx].shardcnt < state[idx].shardcap; - } - - /* unreachable */ - assert(true); - } + /* unreachable */ + assert(true); + } }; -} - +} // namespace de diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index b962dcc..a4cf94d 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -1,8 +1,8 @@ /* * include/framework/structure/InternalLevel.h * - * Copyright (C) 2023 Douglas B. Rumbaugh - * Dong Xie + * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * Dong Xie * * Distributed under the Modified BSD License. 
* @@ -15,276 +15,281 @@ */ #pragma once -#include #include +#include -#include "util/types.h" -#include "framework/interface/Shard.h" #include "framework/interface/Query.h" #include "framework/interface/Record.h" +#include "framework/interface/Shard.h" #include "framework/structure/BufferView.h" +#include "util/types.h" namespace de { -template S, QueryInterface Q> +template QueryType> class InternalLevel; - - -template S, QueryInterface Q> +template QueryType> class InternalLevel { - typedef S Shard; - typedef BufferView BuffView; -public: - InternalLevel(ssize_t level_no, size_t shard_cap) - : m_level_no(level_no) - , m_shard_cnt(0) - , m_shards(shard_cap, nullptr) - , m_pending_shard(nullptr) - {} - - ~InternalLevel() { - delete m_pending_shard; - } + typedef typename ShardType::RECORD RecordType; + typedef BufferView BuffView; - /* - * Create a new shard combining the records from base_level and new_level, - * and return a shared_ptr to a new level containing this shard. This is used - * for reconstructions under the leveling layout policy. - * - * No changes are made to the levels provided as arguments. - */ - static std::shared_ptr reconstruction(InternalLevel* base_level, InternalLevel* new_level) { - assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); - auto res = new InternalLevel(base_level->m_level_no, 1); - res->m_shard_cnt = 1; - std::vector shards = {base_level->m_shards[0].get(), +public: + InternalLevel(ssize_t level_no, size_t shard_cap) + : m_level_no(level_no), m_shard_cnt(0), m_shards(shard_cap, nullptr), + m_pending_shard(nullptr) {} + + ~InternalLevel() { delete m_pending_shard; } + + /* + * Create a new shard combining the records from base_level and new_level, + * and return a shared_ptr to a new level containing this shard. This is used + * for reconstructions under the leveling layout policy. + * + * No changes are made to the levels provided as arguments. + */ + static std::shared_ptr + reconstruction(InternalLevel *base_level, InternalLevel *new_level) { + assert(base_level->m_level_no > new_level->m_level_no || + (base_level->m_level_no == 0 && new_level->m_level_no == 0)); + auto res = new InternalLevel(base_level->m_level_no, 1); + res->m_shard_cnt = 1; + std::vector shards = {base_level->m_shards[0].get(), new_level->m_shards[0].get()}; - res->m_shards[0] = std::make_shared(shards); - return std::shared_ptr(res); + res->m_shards[0] = std::make_shared(shards); + return std::shared_ptr(res); + } + + static std::shared_ptr + reconstruction(std::vector levels, size_t level_idx) { + std::vector shards; + for (auto level : levels) { + for (auto shard : level->m_shards) { + if (shard) + shards.emplace_back(shard.get()); + } } - static std::shared_ptr reconstruction(std::vector levels, size_t level_idx) { - std::vector shards; - for (auto level : levels) { - for (auto shard : level->m_shards) { - if (shard) shards.emplace_back(shard.get()); - } - } - - auto res = new InternalLevel(level_idx, 1); - res->m_shard_cnt = 1; - res->m_shards[0] = std::make_shared(shards); + auto res = new InternalLevel(level_idx, 1); + res->m_shard_cnt = 1; + res->m_shards[0] = std::make_shared(shards); + + return std::shared_ptr(res); + } + + /* + * Create a new shard combining the records from all of + * the shards in level, and append this new shard into + * this level. This is used for reconstructions under + * the tiering layout policy. + * + * No changes are made to the level provided as an argument. 
+ */ + void append_level(InternalLevel *level) { + // FIXME: that this is happening probably means that + // something is going terribly wrong earlier in the + // reconstruction logic. + if (level->get_shard_count() == 0) { + return; + } - return std::shared_ptr(res); + std::vector shards; + for (auto shard : level->m_shards) { + if (shard) + shards.emplace_back(shard.get()); } - /* - * Create a new shard combining the records from all of - * the shards in level, and append this new shard into - * this level. This is used for reconstructions under - * the tiering layout policy. - * - * No changes are made to the level provided as an argument. - */ - void append_level(InternalLevel* level) { - // FIXME: that this is happening probably means that - // something is going terribly wrong earlier in the - // reconstruction logic. - if (level->get_shard_count() == 0) { - return; - } + if (m_shard_cnt == m_shards.size()) { + m_pending_shard = new ShardType(shards); + return; + } - std::vector shards; - for (auto shard : level->m_shards) { - if (shard) shards.emplace_back(shard.get()); - } + auto tmp = new ShardType(shards); + m_shards[m_shard_cnt] = std::shared_ptr(tmp); + + ++m_shard_cnt; + } + + /* + * Create a new shard using the records in the + * provided buffer, and append this new shard + * into this level. This is used for buffer + * flushes under the tiering layout policy. + */ + void append_buffer(BuffView buffer) { + if (m_shard_cnt == m_shards.size()) { + assert(m_pending_shard == nullptr); + m_pending_shard = new ShardType(std::move(buffer)); + return; + } - if (m_shard_cnt == m_shards.size()) { - m_pending_shard = new S(shards); - return; - } + m_shards[m_shard_cnt] = std::make_shared(std::move(buffer)); + ++m_shard_cnt; + } - auto tmp = new S(shards); - m_shards[m_shard_cnt] = std::shared_ptr(tmp); + void finalize() { + if (m_pending_shard) { + for (size_t i = 0; i < m_shards.size(); i++) { + m_shards[i] = nullptr; + } - ++m_shard_cnt; + m_shards[0] = std::shared_ptr(m_pending_shard); + m_pending_shard = nullptr; + m_shard_cnt = 1; } - - /* - * Create a new shard using the records in the - * provided buffer, and append this new shard - * into this level. This is used for buffer - * flushes under the tiering layout policy. - */ - void append_buffer(BuffView buffer) { - if (m_shard_cnt == m_shards.size()) { - assert(m_pending_shard == nullptr); - m_pending_shard = new S(std::move(buffer)); - return; - } - - m_shards[m_shard_cnt] = std::make_shared(std::move(buffer)); - ++m_shard_cnt; + } + + /* + * Create a new shard containing the combined records + * from all shards on this level and return it. + * + * No changes are made to this level. + */ + ShardType *get_combined_shard() { + if (m_shard_cnt == 0) { + return nullptr; } - void finalize() { - if (m_pending_shard) { - for (size_t i=0; i(m_pending_shard); - m_pending_shard = nullptr; - m_shard_cnt = 1; - } + std::vector shards; + for (auto shard : m_shards) { + if (shard) + shards.emplace_back(shard.get()); } - /* - * Create a new shard containing the combined records - * from all shards on this level and return it. - * - * No changes are made to this level. 
- */ - Shard *get_combined_shard() { - if (m_shard_cnt == 0) { - return nullptr; - } - - std::vector shards; - for (auto shard : m_shards) { - if (shard) shards.emplace_back(shard.get()); - } - - return new S(shards); + return new ShardType(shards); + } + + void get_local_queries( + std::vector> &shards, + std::vector &local_queries, + typename QueryType::Parameters *query_parms) { + for (size_t i = 0; i < m_shard_cnt; i++) { + if (m_shards[i]) { + auto local_query = + QueryType::local_preproc(m_shards[i].get(), query_parms); + shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].get()}); + local_queries.emplace_back(local_query); + } } + } - void get_query_states(std::vector> &shards, std::vector& shard_states, void *query_parms) { - for (size_t i=0; i= (ssize_t)shard_stop; i--) { + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec, true); + if (res && res->is_tombstone()) { + return true; } + } } - - bool check_tombstone(size_t shard_stop, const R& rec) { - if (m_shard_cnt == 0) return false; - - for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) { - if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec, true); - if (res && res->is_tombstone()) { - return true; - } - } + return false; + } + + bool delete_record(const RecordType &rec) { + if (m_shard_cnt == 0) + return false; + + for (size_t i = 0; i < m_shards.size(); ++i) { + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec); + if (res) { + res->set_delete(); + return true; } - return false; + } } - bool delete_record(const R &rec) { - if (m_shard_cnt == 0) return false; - - for (size_t i = 0; i < m_shards.size(); ++i) { - if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec); - if (res) { - res->set_delete(); - return true; - } - } - } + return false; + } - return false; + ShardType *get_shard(size_t idx) { + if (idx >= m_shard_cnt) { + return nullptr; } - Shard* get_shard(size_t idx) { - if (idx >= m_shard_cnt) { - return nullptr; - } + return m_shards[idx].get(); + } - return m_shards[idx].get(); - } + size_t get_shard_count() { return m_shard_cnt; } - size_t get_shard_count() { - return m_shard_cnt; + size_t get_record_count() { + size_t cnt = 0; + for (size_t i = 0; i < m_shard_cnt; i++) { + if (m_shards[i]) { + cnt += m_shards[i]->get_record_count(); + } } - size_t get_record_count() { - size_t cnt = 0; - for (size_t i=0; iget_record_count(); - } - } + return cnt; + } - return cnt; + size_t get_tombstone_count() { + size_t res = 0; + for (size_t i = 0; i < m_shard_cnt; ++i) { + if (m_shards[i]) { + res += m_shards[i]->get_tombstone_count(); + } } - - size_t get_tombstone_count() { - size_t res = 0; - for (size_t i = 0; i < m_shard_cnt; ++i) { - if (m_shards[i]) { - res += m_shards[i]->get_tombstone_count(); - } - } - return res; + return res; + } + + size_t get_aux_memory_usage() { + size_t cnt = 0; + for (size_t i = 0; i < m_shard_cnt; i++) { + if (m_shards[i]) { + cnt += m_shards[i]->get_aux_memory_usage(); + } } - size_t get_aux_memory_usage() { - size_t cnt = 0; - for (size_t i=0; iget_aux_memory_usage(); - } - } + return cnt; + } - return cnt; + size_t get_memory_usage() { + size_t cnt = 0; + for (size_t i = 0; i < m_shard_cnt; i++) { + if (m_shards[i]) { + cnt += m_shards[i]->get_memory_usage(); + } } - size_t get_memory_usage() { - size_t cnt = 0; - for (size_t i=0; iget_memory_usage(); - } - } - - return cnt; + return cnt; + } + + double get_tombstone_prop() { + size_t tscnt = 0; + size_t reccnt = 0; + for (size_t i = 0; i < m_shard_cnt; i++) { + if (m_shards[i]) { + 
tscnt += m_shards[i]->get_tombstone_count(); + reccnt += m_shards[i]->get_record_count(); + } } - double get_tombstone_prop() { - size_t tscnt = 0; - size_t reccnt = 0; - for (size_t i=0; iget_tombstone_count(); - reccnt += m_shards[i]->get_record_count(); - } - } + return (double)tscnt / (double)(tscnt + reccnt); + } - return (double) tscnt / (double) (tscnt + reccnt); + std::shared_ptr clone() { + auto new_level = + std::make_shared(m_level_no, m_shards.size()); + for (size_t i = 0; i < m_shard_cnt; i++) { + new_level->m_shards[i] = m_shards[i]; } + new_level->m_shard_cnt = m_shard_cnt; - std::shared_ptr clone() { - auto new_level = std::make_shared(m_level_no, m_shards.size()); - for (size_t i=0; im_shards[i] = m_shards[i]; - } - new_level->m_shard_cnt = m_shard_cnt; - - return new_level; - } + return new_level; + } private: - ssize_t m_level_no; - - size_t m_shard_cnt; - size_t m_shard_size_cap; + ssize_t m_level_no; + + size_t m_shard_cnt; + size_t m_shard_size_cap; - std::vector> m_shards; - Shard *m_pending_shard; + std::vector> m_shards; + ShardType *m_pending_shard; }; -} +} // namespace de diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 7db3980..625b04b 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -1,8 +1,8 @@ /* * include/framework/structure/MutableBuffer.h * - * Copyright (C) 2023 Douglas B. Rumbaugh - * Dong Xie + * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * Dong Xie * * Distributed under the Modified BSD License. * @@ -18,301 +18,281 @@ */ #pragma once -#include #include #include +#include #include -#include "psu-util/alignment.h" -#include "util/bf_config.h" -#include "psu-ds/BloomFilter.h" #include "framework/interface/Record.h" #include "framework/structure/BufferView.h" - -using psudb::CACHELINE_SIZE; +#include "psu-ds/BloomFilter.h" +#include "psu-util/alignment.h" +#include "util/bf_config.h" namespace de { -template -class MutableBuffer { - friend class BufferView; +template class MutableBuffer { + friend class BufferView; - struct buffer_head { - size_t head_idx; - size_t refcnt; - }; - -public: - MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0) - : m_lwm(low_watermark) - , m_hwm(high_watermark) - , m_cap((capacity == 0) ? 2 * high_watermark : capacity) - , m_tail(0) - , m_head({0, 0}) - , m_old_head({high_watermark, 0}) - //, m_data((Wrapped *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped))) - , m_data(new Wrapped[m_cap]()) - , m_tombstone_filter(new psudb::BloomFilter(BF_FPR, m_hwm, BF_HASH_FUNCS)) - , m_tscnt(0) - , m_old_tscnt(0) - , m_active_head_advance(false) - { - assert(m_cap > m_hwm); - assert(m_hwm >= m_lwm); - } + struct buffer_head { + size_t head_idx; + size_t refcnt; + }; - ~MutableBuffer() { - delete[] m_data; - delete m_tombstone_filter; +public: + MutableBuffer(size_t low_watermark, size_t high_watermark, + size_t capacity = 0) + : m_lwm(low_watermark), m_hwm(high_watermark), + m_cap((capacity == 0) ? 
2 * high_watermark : capacity), m_tail(0), + m_head({0, 0}), m_old_head({high_watermark, 0}), + m_data(new Wrapped[m_cap]()), + m_tombstone_filter( + new psudb::BloomFilter(BF_FPR, m_hwm, BF_HASH_FUNCS)), + m_tscnt(0), m_old_tscnt(0), m_active_head_advance(false) { + assert(m_cap > m_hwm); + assert(m_hwm >= m_lwm); + } + + ~MutableBuffer() { + delete[] m_data; + delete m_tombstone_filter; + } + + int append(const R &rec, bool tombstone = false) { + int32_t tail = 0; + if ((tail = try_advance_tail()) == -1) { + return 0; } - int append(const R &rec, bool tombstone=false) { - int32_t tail = 0; - if ((tail = try_advance_tail()) == -1) { - return 0; - } - - Wrapped wrec; - wrec.rec = rec; - wrec.header = 0; - if (tombstone) wrec.set_tombstone(); + Wrapped wrec; + wrec.rec = rec; + wrec.header = 0; + if (tombstone) + wrec.set_tombstone(); - // FIXME: because of the mod, it isn't correct to use `pos` - // as the ordering timestamp in the header anymore. - size_t pos = tail % m_cap; - - m_data[pos] = wrec; - m_data[pos].set_timestamp(pos); - - if (tombstone) { - m_tscnt.fetch_add(1); - if (m_tombstone_filter) m_tombstone_filter->insert(rec); - } + // FIXME: because of the mod, it isn't correct to use `pos` + // as the ordering timestamp in the header anymore. + size_t pos = tail % m_cap; - m_data[pos].set_visible(); + m_data[pos] = wrec; + m_data[pos].set_timestamp(pos); - return 1; + if (tombstone) { + m_tscnt.fetch_add(1); + if (m_tombstone_filter) + m_tombstone_filter->insert(rec); } - bool truncate() { - m_tscnt.store(0); - m_tail.store(0); - if (m_tombstone_filter) m_tombstone_filter->clear(); + m_data[pos].set_visible(); - return true; - } + return 1; + } - size_t get_record_count() { - return m_tail.load() - m_head.load().head_idx; - } - - size_t get_capacity() { - return m_cap; - } + bool truncate() { + m_tscnt.store(0); + m_tail.store(0); + if (m_tombstone_filter) + m_tombstone_filter->clear(); - bool is_full() { - return get_record_count() >= m_hwm; - } + return true; + } - bool is_at_low_watermark() { - return get_record_count() >= m_lwm; - } + size_t get_record_count() { return m_tail.load() - m_head.load().head_idx; } - size_t get_tombstone_count() { - return m_tscnt.load(); - } + size_t get_capacity() { return m_cap; } - bool delete_record(const R& rec) { - return get_buffer_view().delete_record(rec); - } + bool is_full() { return get_record_count() >= m_hwm; } - bool check_tombstone(const R& rec) { - return get_buffer_view().check_tombstone(rec); - } + bool is_at_low_watermark() { return get_record_count() >= m_lwm; } - size_t get_memory_usage() { - return m_cap * sizeof(Wrapped); - } + size_t get_tombstone_count() { return m_tscnt.load(); } - size_t get_aux_memory_usage() { - return m_tombstone_filter->get_memory_usage(); - } + bool delete_record(const R &rec) { + return get_buffer_view().delete_record(rec); + } - BufferView get_buffer_view(size_t target_head) { - size_t head = get_head(target_head); - auto f = std::bind(release_head_reference, (void *) this, head); + bool check_tombstone(const R &rec) { + return get_buffer_view().check_tombstone(rec); + } - return BufferView(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); - } + size_t get_memory_usage() { return m_cap * sizeof(Wrapped); } - BufferView get_buffer_view() { - size_t head = get_head(m_head.load().head_idx); - auto f = std::bind(release_head_reference, (void *) this, head); + size_t get_aux_memory_usage() { + return m_tombstone_filter->get_memory_usage(); + } - return BufferView(m_data, 
m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); - } + BufferView get_buffer_view(size_t target_head) { + size_t head = get_head(target_head); + auto f = std::bind(release_head_reference, (void *)this, head); - /* - * Advance the buffer following a reconstruction. Move current - * head and head_refcnt into old_head and old_head_refcnt, then - * assign new_head to old_head. - */ - bool advance_head(size_t new_head) { - assert(new_head > m_head.load().head_idx); - assert(new_head <= m_tail.load()); - - /* refuse to advance head while there is an old with one references */ - if (m_old_head.load().refcnt > 0) { - //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n"); - return false; - } - - m_active_head_advance.store(true); + return BufferView(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), + m_tombstone_filter, f); + } - buffer_head new_hd = {new_head, 0}; - buffer_head cur_hd; + BufferView get_buffer_view() { + size_t head = get_head(m_head.load().head_idx); + auto f = std::bind(release_head_reference, (void *)this, head); - /* replace current head with new head */ - do { - cur_hd = m_head.load(); - } while(!m_head.compare_exchange_strong(cur_hd, new_hd)); + return BufferView(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), + m_tombstone_filter, f); + } - /* move the current head into the old head */ - m_old_head.store(cur_hd); + /* + * Advance the buffer following a reconstruction. Move current + * head and head_refcnt into old_head and old_head_refcnt, then + * assign new_head to old_head. + */ + bool advance_head(size_t new_head) { + assert(new_head > m_head.load().head_idx); + assert(new_head <= m_tail.load()); - m_active_head_advance.store(false); - return true; + /* refuse to advance head while there is an old with one references */ + if (m_old_head.load().refcnt > 0) { + // fprintf(stderr, "[W]: Refusing to advance head due to remaining + // reference counts\n"); + return false; } - /* - * FIXME: If target_head does not match *either* the old_head or the - * current_head, this routine will loop infinitely. - */ - size_t get_head(size_t target_head) { - buffer_head cur_hd, new_hd; - bool head_acquired = false; - - do { - if (m_old_head.load().head_idx == target_head) { - cur_hd = m_old_head.load(); - cur_hd.head_idx = target_head; - new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; - head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); - } else if (m_head.load().head_idx == target_head){ - cur_hd = m_head.load(); - cur_hd.head_idx = target_head; - new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; - head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); - } - } while(!head_acquired); - - return new_hd.head_idx; + m_active_head_advance.store(true); + + buffer_head new_hd = {new_head, 0}; + buffer_head cur_hd; + + /* replace current head with new head */ + do { + cur_hd = m_head.load(); + } while (!m_head.compare_exchange_strong(cur_hd, new_hd)); + + /* move the current head into the old head */ + m_old_head.store(cur_hd); + + m_active_head_advance.store(false); + return true; + } + + /* + * FIXME: If target_head does not match *either* the old_head or the + * current_head, this routine will loop infinitely. 
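(Illustrative sketch, not part of the patch: the intended use of the two-head scheme above, as far as it can be inferred from get_buffer_view() and advance_head(). The record type de::Record<uint64_t, uint64_t> and the assumption that a BufferView drops its head reference on destruction are both taken on faith here; the shard-construction step is elided.)

    #include <cstddef>
    #include <cstdint>

    #include "framework/interface/Record.h"
    #include "framework/structure/MutableBuffer.h"

    using Rec = de::Record<uint64_t, uint64_t>; /* assumed key/value record type */

    void flush_once() {
      de::MutableBuffer<Rec> buffer(/* low_watermark  */ 64,
                                    /* high_watermark */ 128);

      /* writers append until the high watermark is reached */
      for (uint64_t i = 0; i < 128; i++) {
        Rec r;
        r.key = i;
        r.value = i;
        buffer.append(r);
      }

      size_t flush_to;
      {
        auto view = buffer.get_buffer_view(); /* pins the current head */
        flush_to = buffer.get_tail();
        /* ... build an L0 shard from `view` here ... */
      } /* view goes out of scope and is expected to release its head reference */

      /* advance_head() succeeds only while the *old* head holds no references,
       * so a second flush must wait for stragglers reading the previous head */
      bool advanced = buffer.advance_head(flush_to);
      (void)advanced;
    }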
+ */ + size_t get_head(size_t target_head) { + buffer_head cur_hd, new_hd; + bool head_acquired = false; + + do { + if (m_old_head.load().head_idx == target_head) { + cur_hd = m_old_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else if (m_head.load().head_idx == target_head) { + cur_hd = m_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } while (!head_acquired); + + return new_hd.head_idx; + } + + void set_low_watermark(size_t lwm) { + assert(lwm < m_hwm); + m_lwm = lwm; + } + + size_t get_low_watermark() { return m_lwm; } + + void set_high_watermark(size_t hwm) { + assert(hwm > m_lwm); + assert(hwm < m_cap); + m_hwm = hwm; + } + + size_t get_high_watermark() { return m_hwm; } + + size_t get_tail() { return m_tail.load(); } + + /* + * Note: this returns the available physical storage capacity, + * *not* now many more records can be inserted before the + * HWM is reached. It considers the old_head to be "free" + * when it has no remaining references. This should be true, + * but a buggy framework implementation may violate the + * assumption. + */ + size_t get_available_capacity() { + if (m_old_head.load().refcnt == 0) { + return m_cap - (m_tail.load() - m_head.load().head_idx); } - void set_low_watermark(size_t lwm) { - assert(lwm < m_hwm); - m_lwm = lwm; - } + return m_cap - (m_tail.load() - m_old_head.load().head_idx); + } - size_t get_low_watermark() { - return m_lwm; - } +private: + int64_t try_advance_tail() { + size_t old_value = m_tail.load(); - void set_high_watermark(size_t hwm) { - assert(hwm > m_lwm); - assert(hwm < m_cap); - m_hwm = hwm; + /* if full, fail to advance the tail */ + if (old_value - m_head.load().head_idx >= m_hwm) { + return -1; } - size_t get_high_watermark() { - return m_hwm; - } + while (!m_tail.compare_exchange_strong(old_value, old_value + 1)) { + /* if full, stop trying and fail to advance the tail */ + if (m_tail.load() >= m_hwm) { + return -1; + } - size_t get_tail() { - return m_tail.load(); + _mm_pause(); } - /* - * Note: this returns the available physical storage capacity, - * *not* now many more records can be inserted before the - * HWM is reached. It considers the old_head to be "free" - * when it has no remaining references. This should be true, - * but a buggy framework implementation may violate the - * assumption. 
- */ - size_t get_available_capacity() { - if (m_old_head.load().refcnt == 0) { - return m_cap - (m_tail.load() - m_head.load().head_idx); - } + return old_value; + } - return m_cap - (m_tail.load() - m_old_head.load().head_idx); - } + size_t to_idx(size_t i, size_t head) { return (head + i) % m_cap; } -private: - int64_t try_advance_tail() { - size_t old_value = m_tail.load(); + static void release_head_reference(void *buff, size_t head) { + MutableBuffer *buffer = (MutableBuffer *)buff; - /* if full, fail to advance the tail */ - if (old_value - m_head.load().head_idx >= m_hwm) { - return -1; + buffer_head cur_hd, new_hd; + do { + if (buffer->m_old_head.load().head_idx == head) { + cur_hd = buffer->m_old_head; + if (cur_hd.refcnt == 0) + continue; + new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; + if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { + break; } - - while (!m_tail.compare_exchange_strong(old_value, old_value+1)) { - /* if full, stop trying and fail to advance the tail */ - if (m_tail.load() >= m_hwm) { - return -1; - } - - _mm_pause(); + } else { + cur_hd = buffer->m_head; + if (cur_hd.refcnt == 0) + continue; + new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; + + if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { + break; } + } + _mm_pause(); + } while (true); + } - return old_value; - } + size_t m_lwm; + size_t m_hwm; + size_t m_cap; - size_t to_idx(size_t i, size_t head) { - return (head + i) % m_cap; - } + alignas(64) std::atomic m_tail; - static void release_head_reference(void *buff, size_t head) { - MutableBuffer *buffer = (MutableBuffer *) buff; - - buffer_head cur_hd, new_hd; - do { - if (buffer->m_old_head.load().head_idx == head) { - cur_hd = buffer->m_old_head; - if (cur_hd.refcnt == 0) continue; - new_hd = {cur_hd.head_idx, cur_hd.refcnt-1}; - if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { - break; - } - } else { - cur_hd = buffer->m_head; - if (cur_hd.refcnt == 0) continue; - new_hd = {cur_hd.head_idx, cur_hd.refcnt-1}; - - if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { - break; - } - } - _mm_pause(); - } while(true); - } + alignas(64) std::atomic m_head; + alignas(64) std::atomic m_old_head; + + Wrapped *m_data; + psudb::BloomFilter *m_tombstone_filter; + alignas(64) std::atomic m_tscnt; + size_t m_old_tscnt; - size_t m_lwm; - size_t m_hwm; - size_t m_cap; - - alignas(64) std::atomic m_tail; - - alignas(64) std::atomic m_head; - alignas(64) std::atomic m_old_head; - - Wrapped* m_data; - psudb::BloomFilter* m_tombstone_filter; - alignas(64) std::atomic m_tscnt; - size_t m_old_tscnt; - - alignas(64) std::atomic m_active_head_advance; + alignas(64) std::atomic m_active_head_advance; }; -} +} // namespace de diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index 4a4524a..f4b0364 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -1,7 +1,7 @@ /* * include/framework/util/Configuration.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. 
* @@ -13,35 +13,8 @@ namespace de { -static thread_local size_t sampling_attempts = 0; -static thread_local size_t sampling_rejections = 0; -static thread_local size_t deletion_rejections = 0; -static thread_local size_t bounds_rejections = 0; -static thread_local size_t tombstone_rejections = 0; -static thread_local size_t buffer_rejections = 0; +enum class LayoutPolicy { LEVELING, TEIRING, BSM }; -/* - * thread_local size_t various_sampling_times go here. - */ -static thread_local size_t sample_range_time = 0; -static thread_local size_t alias_time = 0; -static thread_local size_t alias_query_time = 0; -static thread_local size_t rejection_check_time = 0; -static thread_local size_t buffer_sample_time = 0; -static thread_local size_t memlevel_sample_time = 0; -static thread_local size_t disklevel_sample_time = 0; -static thread_local size_t sampling_bailouts = 0; - - -enum class LayoutPolicy { - LEVELING, - TEIRING, - BSM -}; - -enum class DeletePolicy { - TOMBSTONE, - TAGGING -}; +enum class DeletePolicy { TOMBSTONE, TAGGING }; -} +} // namespace de diff --git a/include/query/irs.h b/include/query/irs.h index 879d070..6dec850 100644 --- a/include/query/irs.h +++ b/include/query/irs.h @@ -1,12 +1,12 @@ /* * include/query/irs.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * - * A query class for independent range sampling. This query requires - * that the shard support get_lower_bound(key), get_upper_bound(key), + * A query class for independent range sampling. This query requires + * that the shard support get_lower_bound(key), get_upper_bound(key), * and get_record_at(index). */ #pragma once @@ -14,237 +14,227 @@ #include "framework/QueryRequirements.h" #include "psu-ds/Alias.h" -namespace de { namespace irs { +namespace de { +namespace irs { -template -struct Parms { +template class Query { + typedef typename S::RECORD R; + +public: + struct Parameters { decltype(R::key) lower_bound; decltype(R::key) upper_bound; size_t sample_size; gsl_rng *rng; -}; + }; - -template -struct State { - size_t lower_bound; - size_t upper_bound; - size_t sample_size; + struct LocalQuery { + size_t lower_idx; + size_t upper_idx; size_t total_weight; -}; + size_t sample_size; + Parameters global_parms; + }; + + struct LocalQueryBuffer { + BufferView *buffer; -template -struct BufferState { size_t cutoff; std::vector> records; + std::unique_ptr alias; size_t sample_size; - BufferView *buffer; - psudb::Alias *alias; + Parameters global_parms; + }; - BufferState(BufferView *buffer) : buffer(buffer) {} - ~BufferState() { - delete alias; - } -}; + typedef Wrapped LocalResultType; + typedef R ResultType; -template S, bool Rejection=true> -class Query { -public: - constexpr static bool EARLY_ABORT=false; - constexpr static bool SKIP_DELETE_FILTER=false; + constexpr static bool EARLY_ABORT = false; + constexpr static bool SKIP_DELETE_FILTER = false; - static void *get_query_state(S *shard, void *parms) { - auto res = new State(); - decltype(R::key) lower_key = ((Parms *) parms)->lower_bound; - decltype(R::key) upper_key = ((Parms *) parms)->upper_bound; + static LocalQuery *local_preproc(S *shard, Parameters *parms) { + auto query = new LocalQuery(); - res->lower_bound = shard->get_lower_bound(lower_key); - res->upper_bound = shard->get_upper_bound(upper_key); + query->global_parms = *parms; - if (res->lower_bound == shard->get_record_count()) { - res->total_weight = 0; - } else { - res->total_weight = res->upper_bound - 
res->lower_bound; - } + query->lower_idx = shard->get_lower_bound(query->global_parms.lower_bound); + query->upper_idx = shard->get_upper_bound(query->global_parms.upper_bound); - res->sample_size = 0; - return res; + if (query->lower_idx == shard->get_record_count()) { + query->total_weight = 0; + } else { + query->total_weight = query->upper_idx - query->lower_idx; } - static void* get_buffer_query_state(BufferView *buffer, void *parms) { - auto res = new BufferState(buffer); - - res->cutoff = res->buffer->get_record_count(); - res->sample_size = 0; - res->alias = nullptr; + query->sample_size = 0; + return query; + } - if constexpr (Rejection) { - return res; - } - - auto lower_key = ((Parms *) parms)->lower_bound; - auto upper_key = ((Parms *) parms)->upper_bound; + static LocalQueryBuffer *local_preproc_buffer(BufferView *buffer, + Parameters *parms) { + auto query = new LocalQueryBuffer(); + query->buffer = buffer; - for (size_t i=0; icutoff; i++) { - if ((res->buffer->get(i)->rec.key >= lower_key) && (buffer->get(i)->rec.key <= upper_key)) { - res->records.emplace_back(*(res->buffer->get(i))); - } - } + query->cutoff = query->buffer->get_record_count(); + query->sample_size = 0; + query->alias = nullptr; + query->global_parms = *parms; - return res; + if constexpr (REJECTION) { + return query; } - static void process_query_states(void *query_parms, std::vector &shard_states, void *buffer_state) { - auto p = (Parms *) query_parms; - auto bs = (buffer_state) ? (BufferState *) buffer_state : nullptr; - - std::vector shard_sample_sizes(shard_states.size()+1, 0); - size_t buffer_sz = 0; + for (size_t i = 0; i < query->cutoff; i++) { + if ((query->buffer->get(i)->rec.key >= query->global_parms.lower_bound) && + (buffer->get(i)->rec.key <= query->global_parms.upper_bound)) { + query->records.emplace_back(*(query->buffer->get(i))); + } + } - /* for simplicity of static structure testing */ - if (!bs) { - assert(shard_states.size() == 1); - auto state = (State *) shard_states[0]; - state->sample_size = p->sample_size; - return; - } + return query; + } - /* we only need to build the shard alias on the first call */ - if (bs->alias == nullptr) { - std::vector weights; - if constexpr (Rejection) { - weights.push_back((bs) ? bs->cutoff : 0); - } else { - weights.push_back((bs) ? bs->records.size() : 0); - } - - size_t total_weight = weights[0]; - for (auto &s : shard_states) { - auto state = (State *) s; - total_weight += state->total_weight; - weights.push_back(state->total_weight); - } - - // if no valid records fall within the query range, just - // set all of the sample sizes to 0 and bail out. 
- if (total_weight == 0) { - for (size_t i=0; i *) shard_states[i]; - state->sample_size = 0; - } - - return; - } - - std::vector normalized_weights; - for (auto w : weights) { - normalized_weights.push_back((double) w / (double) total_weight); - } - - bs->alias = new psudb::Alias(normalized_weights); - } + static void distribute_query(Parameters *parms, + std::vector const &local_queries, + LocalQueryBuffer *buffer_query) { - for (size_t i=0; isample_size; i++) { - auto idx = bs->alias->get(p->rng); - if (idx == 0) { - buffer_sz++; - } else { - shard_sample_sizes[idx - 1]++; - } - } + std::vector shard_sample_sizes(local_queries.size() + 1, 0); + size_t buffer_sz = 0; - if (bs) { - bs->sample_size = buffer_sz; - } - for (size_t i=0; i *) shard_states[i]; - state->sample_size = shard_sample_sizes[i+1]; - } + /* for simplicity of static structure testing */ + if (!buffer_query) { + assert(local_queries.size() == 1); + local_queries[0]->sample_size = + local_queries[0]->global_parms.sample_size; + return; } - static std::vector> query(S *shard, void *q_state, void *parms) { - auto lower_key = ((Parms *) parms)->lower_bound; - auto upper_key = ((Parms *) parms)->upper_bound; - auto rng = ((Parms *) parms)->rng; - - auto state = (State *) q_state; - auto sample_sz = state->sample_size; - - std::vector> result_set; - - if (sample_sz == 0 || state->lower_bound == shard->get_record_count()) { - return result_set; + /* we only need to build the shard alias on the first call */ + if (buffer_query->alias == nullptr) { + std::vector weights; + if constexpr (REJECTION) { + weights.push_back(buffer_query->cutoff); + } else { + weights.push_back(buffer_query->records.size()); + } + + size_t total_weight = weights[0]; + for (auto &q : local_queries) { + total_weight += q->total_weight; + weights.push_back(q->total_weight); + } + + /* + * if no valid records fall within the query range, + * set all of the sample sizes to 0 and bail out. + */ + if (total_weight == 0) { + for (auto q : local_queries) { + q->sample_size = 0; } - size_t attempts = 0; - size_t range_length = state->upper_bound - state->lower_bound; - do { - attempts++; - size_t idx = (range_length > 0) ? 
gsl_rng_uniform_int(rng, range_length) : 0; - result_set.emplace_back(*shard->get_record_at(state->lower_bound + idx)); - } while (attempts < sample_sz); + return; + } - return result_set; - } + std::vector normalized_weights; + for (auto w : weights) { + normalized_weights.push_back((double)w / (double)total_weight); + } - static std::vector> buffer_query(void *state, void *parms) { - auto st = (BufferState *) state; - auto p = (Parms *) parms; + buffer_query->alias = std::make_unique(normalized_weights); + } - std::vector> result; - result.reserve(st->sample_size); + for (size_t i = 0; i < parms->sample_size; i++) { + auto idx = buffer_query->alias->get(parms->rng); + if (idx == 0) { + buffer_sz++; + } else { + shard_sample_sizes[idx - 1]++; + } + } - if constexpr (Rejection) { - for (size_t i=0; isample_size; i++) { - auto idx = gsl_rng_uniform_int(p->rng, st->cutoff); - auto rec = st->buffer->get(idx); + if (buffer_query) { + buffer_query->sample_size = buffer_sz; + } - if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { - result.emplace_back(*rec); - } - } + for (size_t i = 0; i < local_queries.size(); i++) { + local_queries[i]->sample_size = shard_sample_sizes[i]; + } + } - return result; - } + static std::vector local_query(S *shard, LocalQuery *query) { + auto sample_sz = query->sample_size; - for (size_t i=0; isample_size; i++) { - auto idx = gsl_rng_uniform_int(p->rng, st->records.size()); - result.emplace_back(st->records[idx]); - } + std::vector result_set; - return result; + if (sample_sz == 0 || query->lower_idx == shard->get_record_count()) { + return result_set; } - static void merge(std::vector>> &results, void *parms, std::vector &output) { - for (size_t i=0; iupper_idx - query->lower_idx; + do { + attempts++; + size_t idx = + (range_length > 0) + ? 
gsl_rng_uniform_int(query->global_parms.rng, range_length) + : 0; + result_set.emplace_back(*shard->get_record_at(query->lower_idx + idx)); + } while (attempts < sample_sz); + + return result_set; + } + + static std::vector + local_query_buffer(LocalQueryBuffer *query) { + std::vector result; + result.reserve(query->sample_size); + + if constexpr (REJECTION) { + for (size_t i = 0; i < query->sample_size; i++) { + auto idx = gsl_rng_uniform_int(query->global_parms.rng, query->cutoff); + auto rec = query->buffer->get(idx); + + if (rec->rec.key >= query->global_parms.lower_bound && + rec->rec.key <= query->global_parms.upper_bound) { + result.emplace_back(*rec); } - } + } - static void delete_query_state(void *state) { - auto s = (State *) state; - delete s; + return result; } - static void delete_buffer_query_state(void *state) { - auto s = (BufferState *) state; - delete s; + for (size_t i = 0; i < query->sample_size; i++) { + auto idx = + gsl_rng_uniform_int(query->global_parms.rng, query->records.size()); + result.emplace_back(query->records[idx]); } - static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { - auto p = (Parms *) parms; - - if (results.size() < p->sample_size) { - auto q = *p; - q.sample_size -= results.size(); - process_query_states(&q, states, buffer_state); - return true; - } + return result; + } - return false; + static void + combine(std::vector> const &local_results, + Parameters *parms, std::vector &output) { + for (size_t i = 0; i < local_results.size(); i++) { + for (size_t j = 0; j < local_results[i].size(); j++) { + output.emplace_back(local_results[i][j].rec); + } } + } + + static bool repeat(Parameters *parms, std::vector &output, + std::vector const &local_queries, + LocalQueryBuffer *buffer_query) { + if (output.size() < parms->sample_size) { + parms->sample_size -= output.size(); + distribute_query(parms, local_queries, buffer_query); + return true; + } + + return false; + } }; -}} +} // namespace irs +} // namespace de diff --git a/include/query/knn.h b/include/query/knn.h index a227293..87ea10a 100644 --- a/include/query/knn.h +++ b/include/query/knn.h @@ -6,7 +6,7 @@ * Distributed under the Modified BSD License. * * A query class for k-NN queries, designed for use with the VPTree - * shard. + * shard. * * FIXME: no support for tombstone deletes just yet. This would require a * query resumption mechanism, most likely. 
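(Illustrative sketch, not part of the patch: a minimal, hypothetical driver showing the order in which the static query methods above appear meant to be called. The real framework also runs the buffer-side path (local_preproc_buffer / local_query_buffer) and the repeat() loop; both are only hinted at in comments, and run_query itself is not framework API.)

    #include <cstddef>
    #include <vector>

    template <typename Q, typename ShardType>
    std::vector<typename Q::ResultType>
    run_query(std::vector<ShardType *> const &shards,
              typename Q::Parameters parms) {
      /* one local query per shard; the framework would also build one
       * LocalQueryBuffer against the mutable buffer */
      std::vector<typename Q::LocalQuery *> local;
      for (auto *s : shards)
        local.push_back(Q::local_preproc(s, &parms));

      /* let the query apportion work (e.g. irs spreads its sample size);
       * note that irs expects a buffer query or exactly one shard here */
      Q::distribute_query(&parms, local, /* buffer_query = */ nullptr);

      /* execute the per-shard queries */
      std::vector<std::vector<typename Q::LocalResultType>> local_results;
      for (std::size_t i = 0; i < shards.size(); i++)
        local_results.push_back(Q::local_query(shards[i], local[i]));

      /* merge everything into the user-facing result set */
      std::vector<typename Q::ResultType> output;
      Q::combine(local_results, &parms, output);

      /* sampling queries may request another pass:
       *   while (Q::repeat(&parms, output, local, nullptr)) { ... } */

      for (auto *q : local)
        delete q;
      return output;
    }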
@@ -16,147 +16,147 @@ #include "framework/QueryRequirements.h" #include "psu-ds/PriorityQueue.h" -namespace de { namespace knn { +namespace de { +namespace knn { using psudb::PriorityQueue; -template -struct Parms { +template class Query { + typedef typename S::RECORD R; + +public: + struct Parameters { R point; size_t k; -}; + }; -template -struct State { - size_t k; -}; + struct LocalQuery { + Parameters global_parms; + }; -template -struct BufferState { + struct LocalQueryBuffer { BufferView *buffer; + Parameters global_parms; + }; - BufferState(BufferView *buffer) - : buffer(buffer) {} -}; + typedef Wrapped LocalResultType; + typedef R ResultType; + constexpr static bool EARLY_ABORT = false; + constexpr static bool SKIP_DELETE_FILTER = true; -template S> -class Query { -public: - constexpr static bool EARLY_ABORT=false; - constexpr static bool SKIP_DELETE_FILTER=true; + static LocalQuery *local_preproc(S *shard, Parameters *parms) { + auto query = new LocalQuery(); + query->global_parms = *parms; - static void *get_query_state(S *shard, void *parms) { - return nullptr; - } + return query; + } - static void* get_buffer_query_state(BufferView *buffer, void *parms) { - return new BufferState(buffer); - } + static LocalQueryBuffer *local_preproc_buffer(BufferView *buffer, + Parameters *parms) { + auto query = new LocalQueryBuffer(); + query->global_parms = *parms; + query->buffer = buffer; - static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_state) { - return; - } + return query; + } - static std::vector> query(S *shard, void *q_state, void *parms) { - std::vector> results; - Parms *p = (Parms *) parms; - Wrapped wrec; - wrec.rec = p->point; - wrec.header = 0; + static void distribute_query(Parameters *parms, + std::vector const &local_queries, + LocalQueryBuffer *buffer_query) { + return; + } - PriorityQueue, DistCmpMax>> pq(p->k, &wrec); + static std::vector local_query(S *shard, LocalQuery *query) { + std::vector results; - shard->search(p->point, p->k, pq); + Wrapped wrec; + wrec.rec = query->global_parms.point; + wrec.header = 0; - while (pq.size() > 0) { - results.emplace_back(*pq.peek().data); - pq.pop(); - } + PriorityQueue, DistCmpMax>> pq(query->global_parms.k, + &wrec); - return results; + shard->search(query->global_parms.point, query->global_parms.k, pq); + + while (pq.size() > 0) { + results.emplace_back(*pq.peek().data); + pq.pop(); } - static std::vector> buffer_query(void *state, void *parms) { - Parms *p = (Parms *) parms; - BufferState *s = (BufferState *) state; - Wrapped wrec; - wrec.rec = p->point; - wrec.header = 0; - - size_t k = p->k; - - PriorityQueue, DistCmpMax>> pq(k, &wrec); - for (size_t i=0; ibuffer->get_record_count(); i++) { - // Skip over deleted records (under tagging) - if (s->buffer->get(i)->is_deleted()) { - continue; - } - - if (pq.size() < k) { - pq.push(s->buffer->get(i)); - } else { - double head_dist = pq.peek().data->rec.calc_distance(wrec.rec); - double cur_dist = (s->buffer->get(i))->rec.calc_distance(wrec.rec); - - if (cur_dist < head_dist) { - pq.pop(); - pq.push(s->buffer->get(i)); - } - } - } + return results; + } - std::vector> results; - while (pq.size() > 0) { - results.emplace_back(*(pq.peek().data)); - pq.pop(); - } + static std::vector + local_query_buffer(LocalQueryBuffer *query) { - return std::move(results); - } + std::vector results; - static std::vector merge(std::vector>> &results, void *parms, std::vector &output) { - Parms *p = (Parms *) parms; - R rec = p->point; - size_t k = p->k; - 
- PriorityQueue> pq(k, &rec); - for (size_t i=0; icalc_distance(rec); - double cur_dist = results[i][j].rec.calc_distance(rec); - - if (cur_dist < head_dist) { - pq.pop(); - pq.push(&results[i][j].rec); - } - } - } - } + Wrapped wrec; + wrec.rec = query->global_parms.point; + wrec.header = 0; - while (pq.size() > 0) { - output.emplace_back(*pq.peek().data); - pq.pop(); - } + PriorityQueue, DistCmpMax>> pq(query->global_parms.k, + &wrec); + + for (size_t i = 0; i < query->buffer->get_record_count(); i++) { + // Skip over deleted records (under tagging) + if (query->buffer->get(i)->is_deleted()) { + continue; + } - return std::move(output); + if (pq.size() < query->global_parms.k) { + pq.push(query->buffer->get(i)); + } else { + double head_dist = pq.peek().data->rec.calc_distance(wrec.rec); + double cur_dist = (query->buffer->get(i))->rec.calc_distance(wrec.rec); + + if (cur_dist < head_dist) { + pq.pop(); + pq.push(query->buffer->get(i)); + } + } } - static void delete_query_state(void *state) { - auto s = (State *) state; - delete s; + while (pq.size() > 0) { + results.emplace_back(*(pq.peek().data)); + pq.pop(); } - static void delete_buffer_query_state(void *state) { - auto s = (BufferState *) state; - delete s; + return std::move(results); + } + + static void + combine(std::vector> const &local_results, + Parameters *parms, std::vector &output) { + + PriorityQueue> pq(parms->k, &(parms->point)); + for (size_t i = 0; i < local_results.size(); i++) { + for (size_t j = 0; j < local_results[i].size(); j++) { + if (pq.size() < parms->k) { + pq.push(&local_results[i][j].rec); + } else { + double head_dist = pq.peek().data->calc_distance(parms->point); + double cur_dist = local_results[i][j].rec.calc_distance(parms->point); + + if (cur_dist < head_dist) { + pq.pop(); + pq.push(&local_results[i][j].rec); + } + } + } } - static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { - return false; + while (pq.size() > 0) { + output.emplace_back(*pq.peek().data); + pq.pop(); } -}; + } -}} + static bool repeat(Parameters *parms, std::vector &output, + std::vector const &local_queries, + LocalQueryBuffer *buffer_query) { + return false; + } +}; +} // namespace knn +} // namespace de diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h index 94c2bce..f3788de 100644 --- a/include/query/pointlookup.h +++ b/include/query/pointlookup.h @@ -18,106 +18,102 @@ #include "framework/QueryRequirements.h" -namespace de { namespace pl { +namespace de { +namespace pl { -template -struct Parms { - decltype(R::key) search_key; -}; +template class Query { + typedef typename S::RECORD R; -template -struct State { -}; - -template -struct BufferState { - BufferView *buffer; - - BufferState(BufferView *buffer) - : buffer(buffer) {} -}; - -template S> -class Query { public: - constexpr static bool EARLY_ABORT=true; - constexpr static bool SKIP_DELETE_FILTER=true; - - static void *get_query_state(S *shard, void *parms) { - return nullptr; - } - - static void* get_buffer_query_state(BufferView *buffer, void *parms) { - auto res = new BufferState(buffer); + struct Parameters { + decltype(R::key) search_key; + }; - return res; - } + struct LocalQuery { + Parameters global_parms; + }; - static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_state) { - return; + struct LocalQueryBuffer { + BufferView *buffer; + Parameters global_parms; + }; + + typedef Wrapped LocalResultType; + typedef R ResultType; + + constexpr static bool EARLY_ABORT 
= true; + constexpr static bool SKIP_DELETE_FILTER = true; + + static LocalQuery *local_preproc(S *shard, Parameters *parms) { + auto query = new LocalQuery(); + query->global_parms = *parms; + return query; + } + + static LocalQueryBuffer *local_preproc_buffer(BufferView *buffer, + Parameters *parms) { + auto query = new LocalQueryBuffer(); + query->buffer = buffer; + query->global_parms = *parms; + + return query; + } + + static void distribute_query(Parameters *parms, + std::vector const &local_queries, + LocalQueryBuffer *buffer_query) { + return; + } + + static std::vector local_query(S *shard, LocalQuery *query) { + std::vector result; + + auto r = shard->point_lookup({query->global_parms.search_key, 0}); + + if (r) { + result.push_back(*r); } - static std::vector> query(S *shard, void *q_state, void *parms) { - auto p = (Parms *) parms; - auto s = (State *) q_state; - - std::vector> result; - - auto r = shard->point_lookup({p->search_key, 0}); + return result; + } + + static std::vector + local_query_buffer(LocalQueryBuffer *query) { + std::vector result; - if (r) { - result.push_back(*r); - } + for (size_t i = 0; i < query->buffer->get_record_count(); i++) { + auto rec = query->buffer->get(i); + if (rec->rec.key == query->global_parms.search_key) { + result.push_back(*rec); return result; + } } - static std::vector> buffer_query(void *state, void *parms) { - auto p = (Parms *) parms; - auto s = (BufferState *) state; - - std::vector> records; - for (size_t i=0; ibuffer->get_record_count(); i++) { - auto rec = s->buffer->get(i); - - if (rec->rec.key == p->search_key) { - records.push_back(*rec); - return records; - } + return result; + } + + + static void + combine(std::vector> const &local_results, + Parameters *parms, std::vector &output) { + for (auto r : local_results) { + if (r.size() > 0) { + if (r[0].is_deleted() || r[0].is_tombstone()) { + return; } - return records; - } - - static std::vector merge(std::vector>> &results, void *parms, std::vector &output) { - for (auto r : results) { - if (r.size() > 0) { - if (r[0].is_deleted() || r[0].is_tombstone()) { - return output; - } - - output.push_back(r[0].rec); - return output; - } - } - - return output; - } - - static void delete_query_state(void *state) { - auto s = (State *) state; - delete s; - } - - static void delete_buffer_query_state(void *state) { - auto s = (BufferState *) state; - delete s; - } - - - static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { - return false; + output.push_back(r[0].rec); + return; + } } + } + + static bool repeat(Parameters *parms, std::vector &output, + std::vector const &local_queries, + LocalQueryBuffer *buffer_query) { + return false; + } }; - -}} +} // namespace pl +} // namespace de diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 5b95cdd..68d304d 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -5,169 +5,168 @@ * * Distributed under the Modified BSD License. * - * A query class for single dimensional range count queries. This query - * requires that the shard support get_lower_bound(key) and + * A query class for single dimensional range count queries. This query + * requires that the shard support get_lower_bound(key) and * get_record_at(index). 
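(Illustrative note, not part of the patch: a hedged C++20 sketch of the shard members this query actually calls, judging from local_query() below, which also walks get_data(); this is only a reading aid, not the framework's ShardInterface concept.)

    #include <concepts>
    #include <cstddef>

    template <typename S, typename K>
    concept RangeCountable = requires(S shard, K key, std::size_t idx) {
      { shard.get_lower_bound(key) } -> std::convertible_to<std::size_t>;
      { shard.get_record_count() } -> std::convertible_to<std::size_t>;
      shard.get_record_at(idx); /* pointer to the wrapped record at idx */
      shard.get_data();         /* base pointer of the sorted record array */
    };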
*/ #pragma once #include "framework/QueryRequirements.h" -namespace de { namespace rc { +namespace de { +namespace rc { -template -struct Parms { +template class Query { + typedef typename S::RECORD R; + +public: + struct Parameters { decltype(R::key) lower_bound; decltype(R::key) upper_bound; -}; + }; -template -struct State { + struct LocalQuery { size_t start_idx; size_t stop_idx; -}; + Parameters global_parms; + }; -template -struct BufferState { + struct LocalQueryBuffer { BufferView *buffer; - - BufferState(BufferView *buffer) - : buffer(buffer) {} -}; - -template S, bool FORCE_SCAN=false> -class Query { -public: - constexpr static bool EARLY_ABORT=false; - constexpr static bool SKIP_DELETE_FILTER=true; - - static void *get_query_state(S *shard, void *parms) { - return nullptr; - } - - static void* get_buffer_query_state(BufferView *buffer, void *parms) { - auto res = new BufferState(buffer); - - return res; + Parameters global_parms; + }; + + struct LocalResultType { + size_t record_count; + size_t tombstone_count; + + bool is_deleted() {return false;} + bool is_tombstone() {return false;} + }; + + typedef size_t ResultType; + constexpr static bool EARLY_ABORT = false; + constexpr static bool SKIP_DELETE_FILTER = true; + + static LocalQuery *local_preproc(S *shard, Parameters *parms) { + auto query = new LocalQuery(); + + query->start_idx = shard->get_lower_bound(parms->lower_bound); + query->stop_idx = shard->get_record_count(); + query->global_parms.lower_bound = parms->lower_bound; + query->global_parms.upper_bound = parms->upper_bound; + + return query; + } + + static LocalQueryBuffer *local_preproc_buffer(BufferView *buffer, + Parameters *parms) { + auto query = new LocalQueryBuffer(); + query->buffer = buffer; + query->global_parms.lower_bound = parms->lower_bound; + query->global_parms.upper_bound = parms->upper_bound; + + return query; + } + + static void distribute_query(Parameters *parms, + std::vector const &local_queries, + LocalQueryBuffer *buffer_query) { + return; + } + + static std::vector local_query(S *shard, LocalQuery *query) { + std::vector result; + + /* + * if the returned index is one past the end of the + * records for the PGM, then there are not records + * in the index falling into the specified range. + */ + if (query->start_idx == shard->get_record_count()) { + return result; } - static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_state) { - return; + auto ptr = shard->get_record_at(query->start_idx); + size_t reccnt = 0; + size_t tscnt = 0; + + /* + * roll the pointer forward to the first record that is + * greater than or equal to the lower bound. + */ + while (ptr < shard->get_data() + query->stop_idx && + ptr->rec.key < query->global_parms.lower_bound) { + ptr++; } - static std::vector> query(S *shard, void *q_state, void *parms) { - std::vector> records; - auto p = (Parms *) parms; - auto s = (State *) q_state; - - size_t reccnt = 0; - size_t tscnt = 0; - - Wrapped res; - res.rec.key= 0; // records - res.rec.value = 0; // tombstones - records.emplace_back(res); - - - auto start_idx = shard->get_lower_bound(p->lower_bound); - auto stop_idx = shard->get_lower_bound(p->upper_bound); + while (ptr < shard->get_data() + query->stop_idx && + ptr->rec.key <= query->global_parms.upper_bound) { - /* - * if the returned index is one past the end of the - * records for the PGM, then there are not records - * in the index falling into the specified range. 
- */ - if (start_idx == shard->get_record_count()) { - return records; - } - - - /* - * roll the pointer forward to the first record that is - * greater than or equal to the lower bound. - */ - auto recs = shard->get_data(); - while(start_idx < stop_idx && recs[start_idx].rec.key < p->lower_bound) { - start_idx++; - } - - while (stop_idx < shard->get_record_count() && recs[stop_idx].rec.key <= p->upper_bound) { - stop_idx++; - } - size_t idx = start_idx; - size_t ts_cnt = 0; + if (!ptr->is_deleted()) { + reccnt++; - while (idx < stop_idx) { - ts_cnt += recs[idx].is_tombstone() * 2 + recs[idx].is_deleted(); - idx++; + if (ptr->is_tombstone()) { + tscnt++; } + } - records[0].rec.key = idx - start_idx; - records[0].rec.value = ts_cnt; - - return records; + ptr++; } - static std::vector> buffer_query(void *state, void *parms) { - auto p = (Parms *) parms; - auto s = (BufferState *) state; - - std::vector> records; - - Wrapped res; - res.rec.key= 0; // records - res.rec.value = 0; // tombstones - records.emplace_back(res); - - size_t stop_idx; - if constexpr (FORCE_SCAN) { - stop_idx = s->buffer->get_capacity() / 2; - } else { - stop_idx = s->buffer->get_record_count(); - } - - for (size_t i=0; ibuffer->get_record_count(); i++) { - auto rec = s->buffer->get(i); - - if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound - && !rec->is_deleted()) { - if (rec->is_tombstone()) { - records[0].rec.value++; - } else { - records[0].rec.key++; - } - } + result.push_back({reccnt, tscnt}); + return result; + } + + static std::vector + local_query_buffer(LocalQueryBuffer *query) { + + std::vector result; + size_t reccnt = 0; + size_t tscnt = 0; + for (size_t i = 0; i < query->buffer->get_record_count(); i++) { + auto rec = query->buffer->get(i); + if (rec->rec.key >= query->global_parms.lower_bound && + rec->rec.key <= query->global_parms.upper_bound) { + if (!rec->is_deleted()) { + reccnt++; + if (rec->is_tombstone()) { + tscnt++; + } } - - return records; + } } - static std::vector merge(std::vector>> &results, void *parms, std::vector &output) { - R res; - res.key = 0; - res.value = 0; - output.emplace_back(res); + result.push_back({reccnt, tscnt}); - for (size_t i=0; i> const &local_results, + Parameters *parms, std::vector &output) { + size_t reccnt = 0; + size_t tscnt = 0; - static void delete_query_state(void *state) { + for (auto &local_result : local_results) { + reccnt += local_result[0].record_count; + tscnt += local_result[0].tombstone_count; } - static void delete_buffer_query_state(void *state) { - auto s = (BufferState *) state; - delete s; + /* if more tombstones than results, clamp the output at 0 */ + if (tscnt > reccnt) { + tscnt = reccnt; } - static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { - return false; - } + output.push_back({reccnt - tscnt}); + } + + static bool repeat(Parameters *parms, std::vector &output, + std::vector const &local_queries, + LocalQueryBuffer *buffer_query) { + return false; + } }; -}} +} // namespace rc +} // namespace de diff --git a/include/query/rangequery.h b/include/query/rangequery.h index e0690e6..e7be39c 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -1,177 +1,186 @@ /* * include/query/rangequery.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * - * A query class for single dimensional range queries. 
This query requires + * A query class for single dimensional range queries. This query requires * that the shard support get_lower_bound(key) and get_record_at(index). */ #pragma once #include "framework/QueryRequirements.h" +#include "framework/interface/Record.h" #include "psu-ds/PriorityQueue.h" #include "util/Cursor.h" -namespace de { namespace rq { +namespace de { +namespace rq { -template -struct Parms { +template class Query { + typedef typename S::RECORD R; + +public: + struct Parameters { decltype(R::key) lower_bound; decltype(R::key) upper_bound; -}; + }; -template -struct State { + struct LocalQuery { size_t start_idx; size_t stop_idx; -}; + Parameters global_parms; + }; -template -struct BufferState { + struct LocalQueryBuffer { BufferView *buffer; - - BufferState(BufferView *buffer) - : buffer(buffer) {} -}; - -template S> -class Query { -public: - constexpr static bool EARLY_ABORT=false; - constexpr static bool SKIP_DELETE_FILTER=true; - - static void *get_query_state(S *shard, void *parms) { - auto res = new State(); - auto p = (Parms *) parms; - - res->start_idx = shard->get_lower_bound(p->lower_bound); - res->stop_idx = shard->get_record_count(); - - return res; + Parameters global_parms; + }; + + typedef Wrapped LocalResultType; + typedef R ResultType; + + constexpr static bool EARLY_ABORT = false; + constexpr static bool SKIP_DELETE_FILTER = true; + + static LocalQuery *local_preproc(S *shard, Parameters *parms) { + auto query = new LocalQuery(); + + query->start_idx = shard->get_lower_bound(parms->lower_bound); + query->stop_idx = shard->get_record_count(); + query->global_parms = *parms; + + return query; + } + + static LocalQueryBuffer *local_preproc_buffer(BufferView *buffer, + Parameters *parms) { + auto query = new LocalQueryBuffer(); + query->buffer = buffer; + query->global_parms = *parms; + + return query; + } + + static void distribute_query(Parameters *parms, + std::vector const &local_queries, + LocalQueryBuffer *buffer_query) { + return; + } + + static std::vector local_query(S *shard, LocalQuery *query) { + std::vector result; + + /* + * if the returned index is one past the end of the + * records for the PGM, then there are not records + * in the index falling into the specified range. + */ + if (query->start_idx == shard->get_record_count()) { + return result; } - static void* get_buffer_query_state(BufferView *buffer, void *parms) { - auto res = new BufferState(buffer); + auto ptr = shard->get_record_at(query->start_idx); - return res; + /* + * roll the pointer forward to the first record that is + * greater than or equal to the lower bound. + */ + while (ptr < shard->get_data() + query->stop_idx && + ptr->rec.key < query->global_parms.lower_bound) { + ptr++; } - static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_state) { - return; + while (ptr < shard->get_data() + query->stop_idx && + ptr->rec.key <= query->global_parms.upper_bound) { + result.emplace_back(*ptr); + ptr++; } - static std::vector> query(S *shard, void *q_state, void *parms) { - std::vector> records; - auto p = (Parms *) parms; - auto s = (State *) q_state; - - /* - * if the returned index is one past the end of the - * records for the PGM, then there are not records - * in the index falling into the specified range. 
- */ - if (s->start_idx == shard->get_record_count()) { - return records; - } - - auto ptr = shard->get_record_at(s->start_idx); - - /* - * roll the pointer forward to the first record that is - * greater than or equal to the lower bound. - */ - while(ptr < shard->get_data() + s->stop_idx && ptr->rec.key < p->lower_bound) { - ptr++; - } - - while (ptr < shard->get_data() + s->stop_idx && ptr->rec.key <= p->upper_bound) { - records.emplace_back(*ptr); - ptr++; - } - - return records; - } + return result; + } - static std::vector> buffer_query(void *state, void *parms) { - auto p = (Parms *) parms; - auto s = (BufferState *) state; + static std::vector + local_query_buffer(LocalQueryBuffer *query) { - std::vector> records; - for (size_t i=0; ibuffer->get_record_count(); i++) { - auto rec = s->buffer->get(i); - if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { - records.emplace_back(*rec); - } - } - - return records; + std::vector result; + for (size_t i = 0; i < query->buffer->get_record_count(); i++) { + auto rec = query->buffer->get(i); + if (rec->rec.key >= query->global_parms.lower_bound && + rec->rec.key <= query->global_parms.upper_bound) { + result.emplace_back(*rec); + } } - static std::vector merge(std::vector>> &results, void *parms, std::vector &output) { - std::vector>> cursors; - cursors.reserve(results.size()); - - psudb::PriorityQueue> pq(results.size()); - size_t total = 0; - size_t tmp_n = results.size(); - - - for (size_t i = 0; i < tmp_n; ++i) - if (results[i].size() > 0){ - auto base = results[i].data(); - cursors.emplace_back(Cursor>{base, base + results[i].size(), 0, results[i].size()}); - assert(i == cursors.size() - 1); - total += results[i].size(); - pq.push(cursors[i].ptr, tmp_n - i - 1); - } else { - cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); - } - - if (total == 0) { - return std::vector(); - } - - output.reserve(total); - - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? 
pq.peek(1) : psudb::queue_record>{nullptr, 0}; - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[tmp_n - now.version - 1]; - auto& cursor2 = cursors[tmp_n - next.version - 1]; - if (advance_cursor>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor>(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[tmp_n - now.version - 1]; - if (!now.data->is_tombstone()) output.push_back(cursor.ptr->rec); - - pq.pop(); - - if (advance_cursor>(cursor)) pq.push(cursor.ptr, now.version); - } - } - - return output; + return result; + } + + static void + combine(std::vector> const &local_results, + Parameters *parms, std::vector &output) { + std::vector> cursors; + cursors.reserve(local_results.size()); + + psudb::PriorityQueue pq(local_results.size()); + size_t total = 0; + size_t tmp_n = local_results.size(); + + for (size_t i = 0; i < tmp_n; ++i) + if (local_results[i].size() > 0) { + auto base = local_results[i].data(); + cursors.emplace_back(Cursor{ + base, base + local_results[i].size(), 0, local_results[i].size()}); + assert(i == cursors.size() - 1); + total += local_results[i].size(); + pq.push(cursors[i].ptr, tmp_n - i - 1); + } else { + cursors.emplace_back(Cursor{nullptr, nullptr, 0, 0}); + } + + if (total == 0) { + return; } - static void delete_query_state(void *state) { - auto s = (State *) state; - delete s; + output.reserve(total); + + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 + ? pq.peek(1) + : psudb::queue_record{nullptr, 0}; + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); + pq.pop(); + auto &cursor1 = cursors[tmp_n - now.version - 1]; + auto &cursor2 = cursors[tmp_n - next.version - 1]; + if (advance_cursor(cursor1)) + pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) + pq.push(cursor2.ptr, next.version); + } else { + auto &cursor = cursors[tmp_n - now.version - 1]; + if (!now.data->is_tombstone()) + output.push_back(cursor.ptr->rec); + + pq.pop(); + + if (advance_cursor(cursor)) + pq.push(cursor.ptr, now.version); + } } - static void delete_buffer_query_state(void *state) { - auto s = (BufferState *) state; - delete s; - } + return; + } - static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { - return false; - } + static bool repeat(Parameters *parms, std::vector &output, + std::vector const &local_queries, + LocalQueryBuffer *buffer_query) { + return false; + } }; -}} +} // namespace rq +} // namespace de diff --git a/include/query/wirs.h b/include/query/wirs.h deleted file mode 100644 index 62b43f6..0000000 --- a/include/query/wirs.h +++ /dev/null @@ -1,251 +0,0 @@ -/* - * include/query/wirs.h - * - * Copyright (C) 2023 Douglas B. Rumbaugh - * - * Distributed under the Modified BSD License. - * - * A query class for weighted independent range sampling. This - * class is tightly coupled with include/shard/AugBTree.h, and - * so is probably of limited general utility. 
- */ -#pragma once - -#include "framework/QueryRequirements.h" -#include "psu-ds/Alias.h" - -namespace de { namespace wirs { - -template -struct Parms { - decltype(R::key) lower_bound; - decltype(R::key) upper_bound; - size_t sample_size; - gsl_rng *rng; -}; - -template -struct State { - decltype(R::weight) total_weight; - std::vector nodes; - psudb::Alias* top_level_alias; - size_t sample_size; - - State() { - total_weight = 0; - top_level_alias = nullptr; - } - - ~State() { - if (top_level_alias) delete top_level_alias; - } -}; - -template -struct BufferState { - size_t cutoff; - psudb::Alias* alias; - std::vector> records; - decltype(R::weight) max_weight; - size_t sample_size; - decltype(R::weight) total_weight; - BufferView *buffer; - - ~BufferState() { - delete alias; - } -}; - -template S, bool Rejection=true> -class Query { -public: - constexpr static bool EARLY_ABORT=false; - constexpr static bool SKIP_DELETE_FILTER=false; - - static void *get_query_state(S *shard, void *parms) { - auto res = new State(); - decltype(R::key) lower_key = ((Parms *) parms)->lower_bound; - decltype(R::key) upper_key = ((Parms *) parms)->upper_bound; - - std::vector weights; - res->total_weight = shard->find_covering_nodes(lower_key, upper_key, res->nodes, weights); - - std::vector normalized_weights; - for (auto weight : weights) { - normalized_weights.emplace_back(weight / res->total_weight); - } - - res->top_level_alias = new psudb::Alias(normalized_weights); - res->sample_size = 0; - - return res; - } - - static void* get_buffer_query_state(BufferView *buffer, void *parms) { - BufferState *state = new BufferState(); - auto parameters = (Parms*) parms; - - if constexpr (Rejection) { - state->cutoff = buffer->get_record_count() - 1; - state->max_weight = buffer->get_max_weight(); - state->total_weight = buffer->get_total_weight(); - state->sample_size = 0; - state->buffer = buffer; - return state; - } - - std::vector weights; - - state->buffer = buffer; - decltype(R::weight) total_weight = 0; - - for (size_t i = 0; i <= buffer->get_record_count(); i++) { - auto rec = buffer->get(i); - - if (rec->rec.key >= parameters->lower_bound && rec->rec.key <= parameters->upper_bound && !rec->is_tombstone() && !rec->is_deleted()) { - weights.push_back(rec->rec.weight); - state->records.push_back(*rec); - total_weight += rec->rec.weight; - } - } - - std::vector normalized_weights; - for (size_t i = 0; i < weights.size(); i++) { - normalized_weights.push_back(weights[i] / total_weight); - } - - state->total_weight = total_weight; - state->alias = new psudb::Alias(normalized_weights); - state->sample_size = 0; - - return state; - } - - static void process_query_states(void *query_parms, std::vector &shard_states, std::vector &buffer_states) { - auto p = (Parms *) query_parms; - - std::vector shard_sample_sizes(shard_states.size()+buffer_states.size(), 0); - size_t buffer_sz = 0; - - std::vector weights; - - decltype(R::weight) total_weight = 0; - for (auto &s : buffer_states) { - auto bs = (BufferState *) s; - total_weight += bs->total_weight; - weights.push_back(bs->total_weight); - } - - for (auto &s : shard_states) { - auto state = (State *) s; - total_weight += state->total_weight; - weights.push_back(state->total_weight); - } - - std::vector normalized_weights; - for (auto w : weights) { - normalized_weights.push_back((double) w / (double) total_weight); - } - - auto shard_alias = psudb::Alias(normalized_weights); - for (size_t i=0; isample_size; i++) { - auto idx = shard_alias.get(p->rng); - - if (idx < 
buffer_states.size()) { - auto state = (BufferState *) buffer_states[idx]; - state->sample_size++; - } else { - auto state = (State *) shard_states[idx - buffer_states.size()]; - state->sample_size++; - } - } - } - - static std::vector> query(S *shard, void *q_state, void *parms) { - auto lower_key = ((Parms *) parms)->lower_bound; - auto upper_key = ((Parms *) parms)->upper_bound; - auto rng = ((Parms *) parms)->rng; - - auto state = (State *) q_state; - auto sample_size = state->sample_size; - - std::vector> result_set; - - if (sample_size == 0) { - return result_set; - } - size_t cnt = 0; - size_t attempts = 0; - - for (size_t i=0; iget_weighted_sample(lower_key, upper_key, - state->nodes[state->top_level_alias->get(rng)], - rng); - if (rec) { - result_set.emplace_back(*rec); - } - } - - return result_set; - } - - static std::vector> buffer_query(void *state, void *parms) { - auto st = (BufferState *) state; - auto p = (Parms *) parms; - auto buffer = st->buffer; - - std::vector> result; - result.reserve(st->sample_size); - - if constexpr (Rejection) { - for (size_t i=0; isample_size; i++) { - auto idx = gsl_rng_uniform_int(p->rng, st->cutoff); - auto rec = buffer->get(idx); - - auto test = gsl_rng_uniform(p->rng) * st->max_weight; - - if (test <= rec->rec.weight && rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { - result.emplace_back(*rec); - } - } - return result; - } - - for (size_t i=0; isample_size; i++) { - auto idx = st->alias->get(p->rng); - result.emplace_back(st->records[idx]); - } - - return result; - } - - static std::vector merge(std::vector>> &results, void *parms, std::vector &output) { - for (size_t i=0; i *) state; - delete s; - } - - static void delete_buffer_query_state(void *state) { - auto s = (BufferState *) state; - delete s; - } - - static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { - auto p = (Parms *) parms; - - if (results.size() < p->sample_size) { - return true; - } - return false; - } -}; -}} diff --git a/include/query/wss.h b/include/query/wss.h index fb0b414..54620ca 100644 --- a/include/query/wss.h +++ b/include/query/wss.h @@ -6,7 +6,7 @@ * Distributed under the Modified BSD License. * * A query class for weighted set sampling. This - * class is tightly coupled with include/shard/Alias.h, + * class is tightly coupled with include/shard/Alias.h, * and so is probably of limited general utility. 
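+ *
+ * In rough outline (as implemented below): local_preproc() records each
+ * shard's total weight, local_preproc_buffer() scans the buffer for its
+ * total and maximum weights, and distribute_query() builds an alias
+ * structure over those totals to split the requested sample_size across
+ * the buffer and the shards in proportion to their weight. Shard samples
+ * are drawn via the shard's get_weighted_sample(rng); buffer samples use
+ * rejection sampling; and repeat() re-issues the query to cover any
+ * shortfall caused by rejected or deleted records.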
*/ #pragma once @@ -14,203 +14,177 @@ #include "framework/QueryRequirements.h" #include "psu-ds/Alias.h" -namespace de { namespace wss { +namespace de { +namespace wss { -template -struct Parms { +template class Query { + typedef typename S::RECORD R; + +public: + struct Parameters { size_t sample_size; gsl_rng *rng; -}; + }; -template -struct State { - decltype(R::weight) total_weight; + struct LocalQuery { size_t sample_size; + decltype(R::weight) total_weight; - State() { - total_weight = 0; - } -}; + Parameters global_parms; + }; + + struct LocalQueryBuffer { + BufferView *buffer; -template -struct BufferState { - size_t cutoff; size_t sample_size; - psudb::Alias *alias; - decltype(R::weight) max_weight; decltype(R::weight) total_weight; - BufferView *buffer; + decltype(R::weight) max_weight; + size_t cutoff; - ~BufferState() { - delete alias; - } -}; + std::unique_ptr alias; -template S, bool Rejection=true> -class Query { -public: - constexpr static bool EARLY_ABORT=false; - constexpr static bool SKIP_DELETE_FILTER=false; + Parameters global_parms; + }; - static void *get_query_state(S *shard, void *parms) { - auto res = new State(); - res->total_weight = shard->get_total_weight(); - res->sample_size = 0; + constexpr static bool EARLY_ABORT = false; + constexpr static bool SKIP_DELETE_FILTER = false; - return res; - } + typedef Wrapped LocalResultType; + typedef R ResultType; - static void* get_buffer_query_state(BufferView *buffer, void *parms) { - BufferState *state = new BufferState(); - auto parameters = (Parms*) parms; - if constexpr (Rejection) { - state->cutoff = buffer->get_record_count() - 1; - state->max_weight = buffer->get_max_weight(); - state->total_weight = buffer->get_total_weight(); - state->buffer = buffer; - return state; - } + static LocalQuery *local_preproc(S *shard, Parameters *parms) { + auto query = new LocalQuery(); - std::vector weights; + query->global_parms = *parms; + query->total_weight = shard->get_total_weight(); + query->sample_size = 0; - double total_weight = 0.0; - state->buffer = buffer; + return query; + } - for (size_t i = 0; i <= buffer->get_record_count(); i++) { - auto rec = buffer->get_data(i); - weights.push_back(rec->rec.weight); - total_weight += rec->rec.weight; - } + static LocalQueryBuffer *local_preproc_buffer(BufferView *buffer, + Parameters *parms) { + auto query = new LocalQueryBuffer(); - for (size_t i = 0; i < weights.size(); i++) { - weights[i] = weights[i] / total_weight; - } + query->cutoff = buffer->get_record_count() - 1; - state->alias = new psudb::Alias(weights); - state->total_weight = total_weight; + query->max_weight = 0; + query->total_weight = 0; - return state; - } + for (size_t i = 0; i < buffer->get_record_count(); i++) { + auto weight = buffer->get(i)->rec.weight; + query->total_weight += weight; - static void process_query_states(void *query_parms, std::vector &shard_states, std::vector &buffer_states) { - auto p = (Parms *) query_parms; - - std::vector shard_sample_sizes(shard_states.size()+buffer_states.size(), 0); - size_t buffer_sz = 0; - - std::vector weights; - - decltype(R::weight) total_weight = 0; - for (auto &s : buffer_states) { - auto bs = (BufferState *) s; - total_weight += bs->total_weight; - weights.push_back(bs->total_weight); - } - - for (auto &s : shard_states) { - auto state = (State *) s; - total_weight += state->total_weight; - weights.push_back(state->total_weight); - } - - std::vector normalized_weights; - for (auto w : weights) { - normalized_weights.push_back((double) w / (double) 
total_weight); - } - - auto shard_alias = psudb::Alias(normalized_weights); - for (size_t i=0; isample_size; i++) { - auto idx = shard_alias.get(p->rng); - - if (idx < buffer_states.size()) { - auto state = (BufferState *) buffer_states[idx]; - state->sample_size++; - } else { - auto state = (State *) shard_states[idx - buffer_states.size()]; - state->sample_size++; - } - } + if (weight > query->max_weight) { + query->max_weight = weight; + } } - static std::vector> query(S *shard, void *q_state, void *parms) { - auto rng = ((Parms *) parms)->rng; + query->buffer = buffer; + query->global_parms = *parms; - auto state = (State *) q_state; - auto sample_size = state->sample_size; + query->alias = nullptr; - std::vector> result_set; + return query; + } - if (sample_size == 0) { - return result_set; - } - size_t attempts = 0; - do { - attempts++; - size_t idx = shard->get_weighted_sample(rng); - result_set.emplace_back(*shard->get_record_at(idx)); - } while (attempts < sample_size); + static void distribute_query(Parameters *parms, + std::vector const &local_queries, + LocalQueryBuffer *buffer_query) { - return result_set; + if (!buffer_query) { + assert(local_queries.size() == 1); + local_queries[0]->sample_size = + local_queries[0]->global_parms.sample_size; + return; } - static std::vector> buffer_query(void *state, void *parms) { - auto st = (BufferState *) state; - auto p = (Parms *) parms; - auto buffer = st->buffer; + if (!buffer_query->alias) { + std::vector weights; - std::vector> result; - result.reserve(st->sample_size); + decltype(R::weight) total_weight = buffer_query->total_weight; + weights.push_back(total_weight); - if constexpr (Rejection) { - for (size_t i=0; isample_size; i++) { - auto idx = gsl_rng_uniform_int(p->rng, st->cutoff); - auto rec = buffer->get(idx); + for (auto &q : local_queries) { + total_weight += q->total_weight; + weights.push_back(q->total_weight); + q->sample_size = 0; + } - auto test = gsl_rng_uniform(p->rng) * st->max_weight; + std::vector normalized_weights; + for (auto w : weights) { + normalized_weights.push_back((double)w / (double)total_weight); + } - if (test <= rec->rec.weight) { - result.emplace_back(*rec); - } - } - return result; - } + buffer_query->alias = std::make_unique(normalized_weights); + } - for (size_t i=0; isample_size; i++) { - auto idx = st->alias->get(p->rng); - result.emplace_back(*(buffer->get_data() + idx)); - } + for (size_t i = 0; i < parms->sample_size; i++) { + auto idx = buffer_query->alias->get(parms->rng); - return result; + if (idx == 0) { + buffer_query->sample_size++; + } else { + local_queries[idx - 1]->sample_size++; + } } + } - static std::vector merge(std::vector>> &results, void *parms, std::vector &output) { - for (size_t i=0; i local_query(S *shard, LocalQuery *query) { + std::vector result; - return output; + if (query->sample_size == 0) { + return result; } - static void delete_query_state(void *state) { - auto s = (State *) state; - delete s; + for (size_t i = 0; i < query->sample_size; i++) { + size_t idx = shard->get_weighted_sample(query->global_parms.rng); + if (!shard->get_record_at(idx)->is_deleted()) { + result.emplace_back(*shard->get_record_at(idx)); + } } - static void delete_buffer_query_state(void *state) { - auto s = (BufferState *) state; - delete s; + return result; + } + + static std::vector + local_query_buffer(LocalQueryBuffer *query) { + std::vector result; + + for (size_t i = 0; i < query->sample_size; i++) { + auto idx = gsl_rng_uniform_int(query->global_parms.rng, query->cutoff); + 
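+      /*
+       * Rejection sampling over the buffer: fetch the record at the
+       * sampled index and accept it with probability rec.weight /
+       * max_weight, by comparing a uniform draw on [0, max_weight)
+       * against its weight. Rejected candidates (and deleted records)
+       * simply yield fewer than sample_size results; the repeat() hook
+       * below makes up the difference on a subsequent pass.
+       */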
auto rec = query->buffer->get(idx); + + auto test = gsl_rng_uniform(query->global_parms.rng) * query->max_weight; + if (test <= rec->rec.weight && !rec->is_deleted()) { + result.emplace_back(*rec); + } } - static bool repeat(void *parms, std::vector &results, std::vector states, void* buffer_state) { - auto p = (Parms *) parms; + return result; + } - if (results.size() < p->sample_size) { - return true; - } - return false; + static void + combine(std::vector> const &local_results, + Parameters *parms, std::vector &output) { + for (size_t i = 0; i < local_results.size(); i++) { + for (size_t j = 0; j < local_results[i].size(); j++) { + output.emplace_back(local_results[i][j].rec); + } + } + } + + static bool repeat(Parameters *parms, std::vector &output, + std::vector const &local_queries, + LocalQueryBuffer *buffer_query) { + if (output.size() < parms->sample_size) { + parms->sample_size -= output.size(); + distribute_query(parms, local_queries, buffer_query); + return true; } -}; -}} + return false; + } +}; +} // namespace wss +} // namespace de diff --git a/include/shard/Alias.h b/include/shard/Alias.h index 72147d7..8fe70a5 100644 --- a/include/shard/Alias.h +++ b/include/shard/Alias.h @@ -25,21 +25,20 @@ using psudb::CACHELINE_SIZE; using psudb::BloomFilter; -using psudb::PriorityQueue; -using psudb::queue_record; using psudb::byte; namespace de { -static thread_local size_t wss_cancelations = 0; - template class Alias { +public: + typedef R RECORD; private: typedef decltype(R::key) K; typedef decltype(R::value) V; typedef decltype(R::weight) W; + public: Alias(BufferView buffer) : m_data(nullptr) @@ -71,7 +70,7 @@ public: } } - Alias(std::vector &shards) + Alias(std::vector const &shards) : m_data(nullptr) , m_alias(nullptr) , m_total_weight(0) @@ -167,7 +166,6 @@ public: size_t min = 0; size_t max = m_reccnt - 1; - const char * record_key; while (min < max) { size_t mid = (min + max) / 2; diff --git a/include/shard/AugBTree.h b/include/shard/AugBTree.h deleted file mode 100644 index c60cbcd..0000000 --- a/include/shard/AugBTree.h +++ /dev/null @@ -1,311 +0,0 @@ -/* - * include/shard/AugBTree.h - * - * Copyright (C) 2023 Dong Xie - * Douglas B. Rumbaugh - * - * Distributed under the Modified BSD License. - * - * A shard shim around the alias augmented B-tree. Designed to be - * used along side the WIRS query in include/query/wirs.h, but - * also supports the necessary methods for other common query - * types. - * - * TODO: The code in this file is very poorly commented. 
- */ -#pragma once - - -#include -#include - -#include "framework/ShardRequirements.h" - -#include "psu-ds/Alias.h" -#include "psu-ds/BloomFilter.h" -#include "util/bf_config.h" -#include "util/SortedMerge.h" - -using psudb::CACHELINE_SIZE; -using psudb::BloomFilter; -using psudb::Alias; -using psudb::byte; - -namespace de { - -template -struct AugBTreeNode { - struct AugBTreeNode *left, *right; - decltype(R::key) low, high; - decltype(R::weight) weight; - Alias* alias; -}; - -template -class AugBTree { -private: - typedef decltype(R::key) K; - typedef decltype(R::value) V; - typedef decltype(R::weight) W; - -public: - AugBTree(BufferView buffer) - : m_data(nullptr) - , m_root(nullptr) - , m_reccnt(0) - , m_tombstone_cnt(0) - , m_group_size(0) - , m_alloc_size(0) - , m_node_cnt(0) - , m_bf(new BloomFilter(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) - { - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - buffer.get_record_count() * - sizeof(Wrapped), - (byte**) &m_data); - - auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; - - if (m_reccnt > 0) { - build_wirs_structure(); - } - } - - AugBTree(std::vector shards) - : m_data(nullptr) - , m_root(nullptr) - , m_reccnt(0) - , m_tombstone_cnt(0) - , m_group_size(0) - , m_alloc_size(0) - , m_node_cnt(0) - , m_bf(nullptr) - { - size_t attemp_reccnt = 0; - size_t tombstone_count = 0; - auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - - m_bf = new BloomFilter(BF_FPR, tombstone_count, BF_HASH_FUNCS); - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - attemp_reccnt * sizeof(Wrapped), - (byte **) &m_data); - - auto res = sorted_array_merge(cursors, m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; - - if (m_reccnt > 0) { - build_wirs_structure(); - } - } - - ~AugBTree() { - free(m_data); - for (size_t i=0; i *point_lookup(const R &rec, bool filter=false) { - if (filter && !m_bf->lookup(rec)) { - return nullptr; - } - - size_t idx = get_lower_bound(rec.key); - if (idx >= m_reccnt) { - return nullptr; - } - - while (idx < (m_reccnt-1) && m_data[idx].rec < rec) ++idx; - - if (m_data[idx].rec == rec) { - return m_data + idx; - } - - return nullptr; - } - - Wrapped* get_data() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; - } - - size_t get_tombstone_count() const { - return m_tombstone_cnt; - } - - const Wrapped* get_record_at(size_t idx) const { - if (idx >= m_reccnt) return nullptr; - return m_data + idx; - } - - size_t get_memory_usage() { - return m_node_cnt * sizeof(AugBTreeNode>); - } - - size_t get_aux_memory_usage() { - return (m_bf) ? m_bf->memory_usage() : 0; - } - - size_t get_lower_bound(const K& key) const { - size_t min = 0; - size_t max = m_reccnt - 1; - - const char * record_key; - while (min < max) { - size_t mid = (min + max) / 2; - - if (key > m_data[mid].rec.key) { - min = mid + 1; - } else { - max = mid; - } - } - - return min; - } - - W find_covering_nodes(K lower_key, K upper_key, std::vector &nodes, std::vector &weights) { - W total_weight = 0; - - /* Simulate a stack to unfold recursion. 
*/ - struct AugBTreeNode* st[64] = {0}; - st[0] = m_root; - size_t top = 1; - while(top > 0) { - auto now = st[--top]; - if (covered_by(now, lower_key, upper_key) || - (now->left == nullptr && now->right == nullptr && intersects(now, lower_key, upper_key))) { - nodes.emplace_back(now); - weights.emplace_back(now->weight); - total_weight += now->weight; - } else { - if (now->left && intersects(now->left, lower_key, upper_key)) st[top++] = now->left; - if (now->right && intersects(now->right, lower_key, upper_key)) st[top++] = now->right; - } - } - - - return total_weight; - } - - Wrapped *get_weighted_sample(K lower_key, K upper_key, void *internal_node, gsl_rng *rng) { - /* k -> sampling: three levels. 1. select a node -> select a fat point -> select a record. */ - - /* first level */ - auto node = (AugBTreeNode*) internal_node; - - /* second level */ - auto fat_point = node->low + node->alias->get(rng); - - /* third level */ - size_t rec_offset = fat_point * m_group_size + m_alias[fat_point]->get(rng); - auto record = m_data + rec_offset; - - /* bounds rejection */ - if (lower_key > record->rec.key || upper_key < record->rec.key) { - return nullptr; - } - - return record; - } - -private: - - bool covered_by(struct AugBTreeNode* node, const K& lower_key, const K& upper_key) { - auto low_index = node->low * m_group_size; - auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1); - return lower_key < m_data[low_index].rec.key && m_data[high_index].rec.key < upper_key; - } - - bool intersects(struct AugBTreeNode* node, const K& lower_key, const K& upper_key) { - auto low_index = node->low * m_group_size; - auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1); - return lower_key < m_data[high_index].rec.key && m_data[low_index].rec.key < upper_key; - } - - void build_wirs_structure() { - m_group_size = std::ceil(std::log(m_reccnt)); - size_t n_groups = std::ceil((double) m_reccnt / (double) m_group_size); - - // Fat point construction + low level alias.... 
- double sum_weight = 0.0; - std::vector weights; - std::vector group_norm_weight; - size_t i = 0; - size_t group_no = 0; - while (i < m_reccnt) { - double group_weight = 0.0; - group_norm_weight.clear(); - for (size_t k = 0; k < m_group_size && i < m_reccnt; ++k, ++i) { - auto w = m_data[i].rec.weight; - group_norm_weight.emplace_back(w); - group_weight += w; - sum_weight += w; - } - - for (auto& w: group_norm_weight) - if (group_weight) w /= group_weight; - else w = 1.0 / group_norm_weight.size(); - m_alias.emplace_back(new Alias(group_norm_weight)); - - - weights.emplace_back(group_weight); - } - - assert(weights.size() == n_groups); - - m_root = construct_AugBTreeNode(weights, 0, n_groups-1); - } - - struct AugBTreeNode* construct_AugBTreeNode(const std::vector& weights, size_t low, size_t high) { - if (low == high) { - return new AugBTreeNode{nullptr, nullptr, low, high, weights[low], new Alias({1.0})}; - } else if (low > high) return nullptr; - - std::vector node_weights; - W sum = 0; - for (size_t i = low; i < high; ++i) { - node_weights.emplace_back(weights[i]); - sum += weights[i]; - } - - for (auto& w: node_weights) - if (sum) w /= sum; - else w = 1.0 / node_weights.size(); - - m_node_cnt += 1; - size_t mid = (low + high) / 2; - return new AugBTreeNode{construct_AugBTreeNode(weights, low, mid), - construct_AugBTreeNode(weights, mid + 1, high), - low, high, sum, new Alias(node_weights)}; - } - - void free_tree(struct AugBTreeNode* node) { - if (node) { - delete node->alias; - free_tree(node->left); - free_tree(node->right); - delete node; - } - } - - Wrapped* m_data; - std::vector m_alias; - AugBTreeNode* m_root; - size_t m_reccnt; - size_t m_tombstone_cnt; - size_t m_group_size; - size_t m_alloc_size; - size_t m_node_cnt; - BloomFilter *m_bf; -}; -} diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index 3783b38..4e51037 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -26,6 +26,8 @@ namespace de { template class FSTrie { +public: + typedef R RECORD; private: typedef decltype(R::key) K; @@ -80,7 +82,7 @@ public: delete[] temp_buffer; } - FSTrie(std::vector &shards) + FSTrie(std::vector const &shards) : m_data(nullptr) , m_reccnt(0) , m_alloc_size(0) diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index 1cca506..64c0b2b 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -1,8 +1,8 @@ /* * include/shard/ISAMTree.h * - * Copyright (C) 2023 Douglas B. Rumbaugh - * Dong Xie + * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * Dong Xie * * Distributed under the Modified BSD License. 
* @@ -12,258 +12,246 @@ */ #pragma once -#include #include +#include #include "framework/ShardRequirements.h" -#include "util/bf_config.h" #include "psu-ds/BloomFilter.h" #include "util/SortedMerge.h" +#include "util/bf_config.h" -using psudb::CACHELINE_SIZE; using psudb::BloomFilter; -using psudb::PriorityQueue; -using psudb::queue_record; using psudb::byte; +using psudb::CACHELINE_SIZE; namespace de { -template -class ISAMTree { +template class ISAMTree { private: + typedef decltype(R::key) K; + typedef decltype(R::value) V; -typedef decltype(R::key) K; -typedef decltype(R::value) V; - -constexpr static size_t NODE_SZ = 256; -constexpr static size_t INTERNAL_FANOUT = NODE_SZ / (sizeof(K) + sizeof(byte*)); + constexpr static size_t NODE_SZ = 256; + constexpr static size_t INTERNAL_FANOUT = + NODE_SZ / (sizeof(K) + sizeof(byte *)); -struct InternalNode { + struct InternalNode { K keys[INTERNAL_FANOUT]; - byte* child[INTERNAL_FANOUT]; -}; - -static_assert(sizeof(InternalNode) == NODE_SZ, "node size does not match"); + byte *child[INTERNAL_FANOUT]; + }; -constexpr static size_t LEAF_FANOUT = NODE_SZ / sizeof(R); + static_assert(sizeof(InternalNode) == NODE_SZ, "node size does not match"); + constexpr static size_t LEAF_FANOUT = NODE_SZ / sizeof(R); public: - ISAMTree(BufferView buffer) - : m_bf(nullptr) - , m_isam_nodes(nullptr) - , m_root(nullptr) - , m_reccnt(0) - , m_tombstone_cnt(0) - , m_internal_node_cnt(0) - , m_deleted_cnt(0) - , m_alloc_size(0) - { - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - buffer.get_record_count() * - sizeof(Wrapped), - (byte**) &m_data); - - auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; - - if (m_reccnt > 0) { - build_internal_levels(); - } + typedef R RECORD; + + ISAMTree(BufferView buffer) + : m_bf(nullptr), m_isam_nodes(nullptr), m_root(nullptr), m_reccnt(0), + m_tombstone_cnt(0), m_internal_node_cnt(0), m_deleted_cnt(0), + m_alloc_size(0) { + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped), + (byte **)&m_data); + + auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; + + if (m_reccnt > 0) { + build_internal_levels(); + } + } + + ISAMTree(std::vector const &shards) + : m_bf(nullptr), m_isam_nodes(nullptr), m_root(nullptr), m_reccnt(0), + m_tombstone_cnt(0), m_internal_node_cnt(0), m_deleted_cnt(0), + m_alloc_size(0) { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = + build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); + + m_bf = nullptr; + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **)&m_data); + + auto res = sorted_array_merge(cursors, m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; + + if (m_reccnt > 0) { + build_internal_levels(); } + } - ISAMTree(std::vector &shards) - : m_bf(nullptr) - , m_isam_nodes(nullptr) - , m_root(nullptr) - , m_reccnt(0) - , m_tombstone_cnt(0) - , m_internal_node_cnt(0) - , m_deleted_cnt(0) - , m_alloc_size(0) - { - size_t attemp_reccnt = 0; - size_t tombstone_count = 0; - auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - - m_bf = nullptr; - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - attemp_reccnt * sizeof(Wrapped), - (byte **) &m_data); - - auto res = sorted_array_merge(cursors, m_data, m_bf); - m_reccnt = 
res.record_count; - m_tombstone_cnt = res.tombstone_count; - - if (m_reccnt > 0) { - build_internal_levels(); - } + ~ISAMTree() { + free(m_data); + free(m_isam_nodes); + delete m_bf; + } + + Wrapped *point_lookup(const R &rec, bool filter = false) { + if (filter && !m_bf->lookup(rec)) { + return nullptr; } - ~ISAMTree() { - free(m_data); - free(m_isam_nodes); - delete m_bf; + size_t idx = get_lower_bound(rec.key); + if (idx >= m_reccnt) { + return nullptr; } - Wrapped *point_lookup(const R &rec, bool filter=false) { - if (filter && !m_bf->lookup(rec)) { - return nullptr; - } + while (idx < m_reccnt && m_data[idx].rec < rec) + ++idx; - size_t idx = get_lower_bound(rec.key); - if (idx >= m_reccnt) { - return nullptr; - } + if (m_data[idx].rec == rec) { + return m_data + idx; + } - while (idx < m_reccnt && m_data[idx].rec < rec) ++idx; + return nullptr; + } - if (m_data[idx].rec == rec) { - return m_data + idx; - } + Wrapped *get_data() const { return m_data; } - return nullptr; - } + size_t get_record_count() const { return m_reccnt; } - Wrapped* get_data() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; - } + size_t get_tombstone_count() const { return m_tombstone_cnt; } - size_t get_tombstone_count() const { - return m_tombstone_cnt; - } + size_t get_memory_usage() const { return m_internal_node_cnt * NODE_SZ; } + size_t get_aux_memory_usage() const { return (m_bf) ? m_bf->memory_usage() : 0; } - size_t get_memory_usage() { - return m_internal_node_cnt * NODE_SZ; - } + /* SortedShardInterface methods */ + size_t get_lower_bound(const K &key) const { + const InternalNode *now = m_root; + while (!is_leaf(reinterpret_cast(now))) { + const InternalNode *next = nullptr; + for (size_t i = 0; i < INTERNAL_FANOUT - 1; ++i) { + if (now->child[i + 1] == nullptr || key <= now->keys[i]) { + next = reinterpret_cast(now->child[i]); + break; + } + } - size_t get_aux_memory_usage() { - return (m_bf) ? m_bf->memory_usage() : 0; + now = next ? next + : reinterpret_cast( + now->child[INTERNAL_FANOUT - 1]); } - /* SortedShardInterface methods */ - size_t get_lower_bound(const K& key) const { - const InternalNode* now = m_root; - while (!is_leaf(reinterpret_cast(now))) { - const InternalNode* next = nullptr; - for (size_t i = 0; i < INTERNAL_FANOUT - 1; ++i) { - if (now->child[i + 1] == nullptr || key <= now->keys[i]) { - next = reinterpret_cast(now->child[i]); - break; - } - } - - now = next ? next : reinterpret_cast(now->child[INTERNAL_FANOUT - 1]); + const Wrapped *pos = reinterpret_cast *>(now); + while (pos < m_data + m_reccnt && pos->rec.key < key) + pos++; + + return pos - m_data; + } + + size_t get_upper_bound(const K &key) const { + const InternalNode *now = m_root; + while (!is_leaf(reinterpret_cast(now))) { + const InternalNode *next = nullptr; + for (size_t i = 0; i < INTERNAL_FANOUT - 1; ++i) { + if (now->child[i + 1] == nullptr || key < now->keys[i]) { + next = reinterpret_cast(now->child[i]); + break; } + } - const Wrapped* pos = reinterpret_cast*>(now); - while (pos < m_data + m_reccnt && pos->rec.key < key) pos++; - - return pos - m_data; + now = next ? 
next + : reinterpret_cast( + now->child[INTERNAL_FANOUT - 1]); } - size_t get_upper_bound(const K& key) const { - const InternalNode* now = m_root; - while (!is_leaf(reinterpret_cast(now))) { - const InternalNode* next = nullptr; - for (size_t i = 0; i < INTERNAL_FANOUT - 1; ++i) { - if (now->child[i + 1] == nullptr || key < now->keys[i]) { - next = reinterpret_cast(now->child[i]); - break; - } - } - - now = next ? next : reinterpret_cast(now->child[INTERNAL_FANOUT - 1]); - } - - const Wrapped* pos = reinterpret_cast*>(now); - while (pos < m_data + m_reccnt && pos->rec.key <= key) pos++; + const Wrapped *pos = reinterpret_cast *>(now); + while (pos < m_data + m_reccnt && pos->rec.key <= key) + pos++; - return pos - m_data; - } + return pos - m_data; + } - const Wrapped* get_record_at(size_t idx) const { - return (idx < m_reccnt) ? m_data + idx : nullptr; - } + const Wrapped *get_record_at(size_t idx) const { + return (idx < m_reccnt) ? m_data + idx : nullptr; + } private: - void build_internal_levels() { - size_t n_leaf_nodes = m_reccnt / LEAF_FANOUT + (m_reccnt % LEAF_FANOUT != 0); - - size_t level_node_cnt = n_leaf_nodes; - size_t node_cnt = 0; - do { - level_node_cnt = level_node_cnt / INTERNAL_FANOUT + (level_node_cnt % INTERNAL_FANOUT != 0); - node_cnt += level_node_cnt; - } while (level_node_cnt > 1); - - m_alloc_size += psudb::sf_aligned_calloc(CACHELINE_SIZE, node_cnt, NODE_SZ, (byte**) &m_isam_nodes); - m_internal_node_cnt = node_cnt; - - InternalNode* current_node = m_isam_nodes; - - const Wrapped* leaf_base = m_data; - const Wrapped* leaf_stop = m_data + m_reccnt; - while (leaf_base < leaf_stop) { - size_t fanout = 0; - for (size_t i = 0; i < INTERNAL_FANOUT; ++i) { - auto rec_ptr = leaf_base + LEAF_FANOUT * i; - if (rec_ptr >= leaf_stop) break; - const Wrapped* sep_key = std::min(rec_ptr + LEAF_FANOUT - 1, leaf_stop - 1); - current_node->keys[i] = sep_key->rec.key; - current_node->child[i] = (byte*)rec_ptr; - ++fanout; - } - current_node++; - leaf_base += fanout * LEAF_FANOUT; - } - - auto level_start = m_isam_nodes; - auto level_stop = current_node; - auto current_level_node_cnt = level_stop - level_start; - while (current_level_node_cnt > 1) { - auto now = level_start; - while (now < level_stop) { - size_t child_cnt = 0; - for (size_t i = 0; i < INTERNAL_FANOUT; ++i) { - auto node_ptr = now + i; - ++child_cnt; - if (node_ptr >= level_stop) break; - current_node->keys[i] = node_ptr->keys[INTERNAL_FANOUT - 1]; - current_node->child[i] = (byte*)node_ptr; - } - now += child_cnt; - current_node++; - } - level_start = level_stop; - level_stop = current_node; - current_level_node_cnt = level_stop - level_start; - } - - assert(current_level_node_cnt == 1); - m_root = level_start; + void build_internal_levels() { + size_t n_leaf_nodes = + m_reccnt / LEAF_FANOUT + (m_reccnt % LEAF_FANOUT != 0); + + size_t level_node_cnt = n_leaf_nodes; + size_t node_cnt = 0; + do { + level_node_cnt = level_node_cnt / INTERNAL_FANOUT + + (level_node_cnt % INTERNAL_FANOUT != 0); + node_cnt += level_node_cnt; + } while (level_node_cnt > 1); + + m_alloc_size += psudb::sf_aligned_calloc(CACHELINE_SIZE, node_cnt, NODE_SZ, + (byte **)&m_isam_nodes); + m_internal_node_cnt = node_cnt; + + InternalNode *current_node = m_isam_nodes; + + const Wrapped *leaf_base = m_data; + const Wrapped *leaf_stop = m_data + m_reccnt; + while (leaf_base < leaf_stop) { + size_t fanout = 0; + for (size_t i = 0; i < INTERNAL_FANOUT; ++i) { + auto rec_ptr = leaf_base + LEAF_FANOUT * i; + if (rec_ptr >= leaf_stop) + break; + const 
Wrapped *sep_key = + std::min(rec_ptr + LEAF_FANOUT - 1, leaf_stop - 1); + current_node->keys[i] = sep_key->rec.key; + current_node->child[i] = (byte *)rec_ptr; + ++fanout; + } + current_node++; + leaf_base += fanout * LEAF_FANOUT; } - bool is_leaf(const byte* ptr) const { - return ptr >= (const byte*)m_data && ptr < (const byte*)(m_data + m_reccnt); + auto level_start = m_isam_nodes; + auto level_stop = current_node; + auto current_level_node_cnt = level_stop - level_start; + while (current_level_node_cnt > 1) { + auto now = level_start; + while (now < level_stop) { + size_t child_cnt = 0; + for (size_t i = 0; i < INTERNAL_FANOUT; ++i) { + auto node_ptr = now + i; + ++child_cnt; + if (node_ptr >= level_stop) + break; + current_node->keys[i] = node_ptr->keys[INTERNAL_FANOUT - 1]; + current_node->child[i] = (byte *)node_ptr; + } + now += child_cnt; + current_node++; + } + level_start = level_stop; + level_stop = current_node; + current_level_node_cnt = level_stop - level_start; } - psudb::BloomFilter *m_bf; - InternalNode* m_isam_nodes; - InternalNode* m_root; - size_t m_reccnt; - size_t m_tombstone_cnt; - size_t m_internal_node_cnt; - size_t m_deleted_cnt; - size_t m_alloc_size; - - Wrapped* m_data; + assert(current_level_node_cnt == 1); + m_root = level_start; + } + + bool is_leaf(const byte *ptr) const { + return ptr >= (const byte *)m_data && + ptr < (const byte *)(m_data + m_reccnt); + } + + psudb::BloomFilter *m_bf; + InternalNode *m_isam_nodes; + InternalNode *m_root; + size_t m_reccnt; + size_t m_tombstone_cnt; + size_t m_internal_node_cnt; + size_t m_deleted_cnt; + size_t m_alloc_size; + + Wrapped *m_data; }; -} +} // namespace de diff --git a/include/shard/PGM.h b/include/shard/PGM.h index 509796b..7d1f492 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -33,6 +33,8 @@ namespace de { template class PGM { +public: + typedef R RECORD; private: typedef decltype(R::key) K; typedef decltype(R::value) V; @@ -109,7 +111,7 @@ public: } } - PGM(std::vector shards) + PGM(std::vector const &shards) : m_data(nullptr) , m_bf(nullptr) , m_reccnt(0) diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 581277e..9d8c3bb 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -30,6 +30,8 @@ namespace de { template class TrieSpline { +public: + typedef R RECORD; private: typedef decltype(R::key) K; typedef decltype(R::value) V; @@ -122,7 +124,7 @@ public: } } - TrieSpline(std::vector &shards) + TrieSpline(std::vector const &shards) : m_reccnt(0) , m_tombstone_cnt(0) , m_alloc_size(0) diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h index d5a2393..477db5c 100644 --- a/include/shard/VPTree.h +++ b/include/shard/VPTree.h @@ -21,13 +21,15 @@ using psudb::CACHELINE_SIZE; using psudb::PriorityQueue; -using psudb::queue_record; using psudb::byte; namespace de { template class VPTree { +public: + typedef R RECORD; + private: struct vpnode { size_t start; @@ -50,7 +52,7 @@ private: public: VPTree(BufferView buffer) - : m_reccnt(0), m_tombstone_cnt(0), m_root(nullptr), m_node_cnt(0) { + : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) { m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, @@ -59,8 +61,6 @@ public: (byte**) &m_data); m_ptrs = new vp_ptr[buffer.get_record_count()]; - - size_t offset = 0; m_reccnt = 0; // FIXME: will eventually need to figure out tombstones @@ -87,7 +87,7 @@ public: } VPTree(std::vector shards) - : m_reccnt(0), m_tombstone_cnt(0), m_root(nullptr), m_node_cnt(0) { + : m_reccnt(0), 
m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) { size_t attemp_reccnt = 0; for (size_t i=0; irec; pq.pop(); } pq.push(m_ptrs[node->start].ptr); diff --git a/include/util/Cursor.h b/include/util/Cursor.h index e8ba53d..e7963b1 100644 --- a/include/util/Cursor.h +++ b/include/util/Cursor.h @@ -1,8 +1,8 @@ /* * include/util/Cursor.h * - * Copyright (C) 2023 Douglas B. Rumbaugh - * Dong Xie + * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * Dong Xie * * Distributed under the Modified BSD License. * @@ -21,16 +21,15 @@ #include namespace de { -template -struct Cursor { - R *ptr; - R *end; - size_t cur_rec_idx; - size_t rec_cnt; +template struct Cursor { + const R *ptr; + const R *end; + size_t cur_rec_idx; + size_t rec_cnt; - friend bool operator==(const Cursor &a, const Cursor &b) { - return a.ptr == b.ptr && a.end == b.end; - } + friend bool operator==(const Cursor &a, const Cursor &b) { + return a.ptr == b.ptr && a.end == b.end; + } }; /* @@ -43,51 +42,55 @@ struct Cursor { * be updated to be equal to end, and false will be returned. Iterators will * not be closed. */ -template -inline static bool advance_cursor(Cursor &cur) { - cur.ptr++; - cur.cur_rec_idx++; +template inline static bool advance_cursor(Cursor &cur) { + cur.ptr++; + cur.cur_rec_idx++; - if (cur.cur_rec_idx >= cur.rec_cnt) return false; + if (cur.cur_rec_idx >= cur.rec_cnt) + return false; - if (cur.ptr >= cur.end) { - return false; - } - return true; + if (cur.ptr >= cur.end) { + return false; + } + return true; } /* * Process the list of cursors to return the cursor containing the next * largest element. Does not advance any of the cursors. If current is - * specified, then skip the current head of that cursor during checking. - * This allows for "peaking" at the next largest element after the current + * specified, then skip the current head of that cursor during checking. + * This allows for "peaking" at the next largest element after the current * largest is processed. */ template -inline static Cursor *get_next(std::vector> &cursors, Cursor *current=nullptr) { - const R *min_rec = nullptr; - Cursor *result = nullptr; - // FIXME: for large cursor vectors, it may be worth it to use a - // PriorityQueue here instead of scanning. - for (size_t i=0; i< cursors.size(); i++) { - if (cursors[i] == (Cursor) {0} ) continue; - - const R *rec = (&cursors[i] == current) ? cursors[i].ptr + 1 : cursors[i].ptr; - if (rec >= cursors[i].end) continue; +inline static Cursor *get_next(std::vector> &cursors, + Cursor *current = nullptr) { + const R *min_rec = nullptr; + Cursor *result = nullptr; + // FIXME: for large cursor vectors, it may be worth it to use a + // PriorityQueue here instead of scanning. + for (size_t i = 0; i < cursors.size(); i++) { + if (cursors[i] == (Cursor){0}) + continue; - if (min_rec == nullptr) { - result = &cursors[i]; - min_rec = rec; - continue; - } + const R *rec = + (&cursors[i] == current) ? cursors[i].ptr + 1 : cursors[i].ptr; + if (rec >= cursors[i].end) + continue; - if (*rec < *min_rec) { - result = &cursors[i]; - min_rec = rec; - } + if (min_rec == nullptr) { + result = &cursors[i]; + min_rec = rec; + continue; } - return result; -} + if (*rec < *min_rec) { + result = &cursors[i]; + min_rec = rec; + } + } + return result; } + +} // namespace de diff --git a/include/util/SortedMerge.h b/include/util/SortedMerge.h index c149189..b0a3215 100644 --- a/include/util/SortedMerge.h +++ b/include/util/SortedMerge.h @@ -1,72 +1,78 @@ /* * include/util/SortedMerge.h * - * Copyright (C) 2023 Douglas B. 
Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * * A sorted array merge routine for use in Shard construction, as many - * shards will use a sorted array to represent their data. Also encapsulates + * shards will use a sorted array to represent their data. Also encapsulates * the necessary tombstone-cancellation logic. * - * FIXME: include generic per-record processing functionality for Shards that + * FIXME: include generic per-record processing functionality for Shards that * need it, to avoid needing to reprocess the array in the shard after * creation. */ #pragma once -#include "util/Cursor.h" +#include + #include "framework/interface/Shard.h" #include "psu-ds/PriorityQueue.h" +#include "util/Cursor.h" namespace de { -using psudb::PriorityQueue; using psudb::BloomFilter; -using psudb::queue_record; using psudb::byte; using psudb::CACHELINE_SIZE; +using psudb::PriorityQueue; +using psudb::queue_record; /* - * A simple struct to return record_count and tombstone_count information - * back to the caller. Could've been an std::pair, but I like the more + * A simple struct to return record_count and tombstone_count information + * back to the caller. Could've been an std::pair, but I like the more * explicit names. */ struct merge_info { - size_t record_count; - size_t tombstone_count; + size_t record_count; + size_t tombstone_count; }; /* * Build a vector of cursors corresponding to the records contained within * a vector of shards. The cursor at index i in the output will correspond - * to the shard at index i in the input. + * to the shard at index i in the input. * * The values of reccnt and tscnt will be updated with the sum of the * records contained within the shards. Note that these counts include deleted * records that may be removed during shard construction, and so constitute * upper bounds only. */ -template S> -static std::vector>> build_cursor_vec(std::vector &shards, size_t *reccnt, size_t *tscnt) { - std::vector>> cursors; - cursors.reserve(shards.size()); - - *reccnt = 0; - *tscnt = 0; - - for (size_t i = 0; i < shards.size(); ++i) { - if (shards[i]) { - auto base = shards[i]->get_data(); - cursors.emplace_back(Cursor>{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); - *reccnt += shards[i]->get_record_count(); - *tscnt += shards[i]->get_tombstone_count(); - } else { - cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); - } +template +static std::vector>> +build_cursor_vec(std::vector const &shards, size_t *reccnt, + size_t *tscnt) { + std::vector>> cursors; + cursors.reserve(shards.size()); + + *reccnt = 0; + *tscnt = 0; + + for (size_t i = 0; i < shards.size(); ++i) { + if (shards[i]) { + auto base = shards[i]->get_data(); + cursors.emplace_back( + Cursor>{base, base + shards[i]->get_record_count(), 0, + shards[i]->get_record_count()}); + *reccnt += shards[i]->get_record_count(); + *tscnt += shards[i]->get_tombstone_count(); + } else { + cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); } + } - return cursors; + return cursors; } /* @@ -80,126 +86,128 @@ static std::vector>> build_cursor_vec(std::vector &shards, * program will be aborted if the allocation fails. */ template -static merge_info sorted_array_from_bufferview(BufferView bv, - Wrapped *buffer, - psudb::BloomFilter *bf=nullptr) { - /* - * Copy the contents of the buffer view into a temporary buffer, and - * sort them. 
We still need to iterate over these temporary records to - * apply tombstone/deleted record filtering, as well as any possible - * per-record processing that is required by the shard being built. - */ - auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, - bv.get_record_count(), - sizeof(Wrapped)); - bv.copy_to_buffer((byte *) temp_buffer); - - auto base = temp_buffer; - auto stop = base + bv.get_record_count(); - std::sort(base, stop, std::less>()); - - merge_info info = {0, 0}; - - /* - * Iterate over the temporary buffer to process the records, copying - * them into buffer as needed - */ - while (base < stop) { - if (!base->is_tombstone() && (base + 1 < stop) - && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - continue; - } else if (base->is_deleted()) { - base += 1; - continue; - } - - // FIXME: this shouldn't be necessary, but the tagged record - // bypass doesn't seem to be working on this code-path, so this - // ensures that tagged records from the buffer are able to be - // dropped, eventually. It should only need to be &= 1 - base->header &= 3; - buffer[info.record_count++] = *base; - - if (base->is_tombstone()) { - info.tombstone_count++; - if (bf){ - bf->insert(base->rec); - } - } +static merge_info +sorted_array_from_bufferview(BufferView bv, Wrapped *buffer, + psudb::BloomFilter *bf = nullptr) { + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped *)psudb::sf_aligned_calloc( + CACHELINE_SIZE, bv.get_record_count(), sizeof(Wrapped)); + bv.copy_to_buffer((byte *)temp_buffer); + + auto base = temp_buffer; + auto stop = base + bv.get_record_count(); + std::sort(base, stop, std::less>()); + + merge_info info = {0, 0}; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) && + base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; + } - base++; + // FIXME: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. It should only need to be &= 1 + base->header &= 3; + buffer[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (bf) { + bf->insert(base->rec); + } } - free(temp_buffer); - return info; + base++; + } + + free(temp_buffer); + return info; } /* * Perform a sorted merge of the records within cursors into the provided * buffer. Includes tombstone and tagged delete cancellation logic, and - * will insert tombstones into a bloom filter, if one is provided. + * will insert tombstones into a bloom filter, if one is provided. * * The behavior of this function is undefined if the provided buffer does * not have space to contain all of the records within the input cursors. */ template -static merge_info sorted_array_merge(std::vector>> &cursors, - Wrapped *buffer, - psudb::BloomFilter *bf=nullptr) { - - // FIXME: For smaller cursor arrays, it may be more efficient to skip - // the priority queue and just do a scan. 
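(As an aside, the filtering that sorted_array_from_bufferview applies can be illustrated with a small, self-contained sketch. The ToyRec type and cancel_tombstones function below are invented for this example and are not part of the framework; the skip-both/skip-one logic mirrors the loop above, under the assumption that a tombstone sorts immediately after the record it deletes.)

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    /* illustrative stand-in for Wrapped<R>: a key plus tombstone/tagged-delete flags */
    struct ToyRec {
        uint64_t key;
        bool tombstone;
        bool deleted;
        bool operator<(const ToyRec &o) const { return key < o.key; }
        bool operator==(const ToyRec &o) const { return key == o.key; }
    };

    /*
     * Sort the run, then drop (record, tombstone) pairs with matching keys
     * and any records already deleted via tagging -- the same filtering the
     * buffer-to-shard path performs.
     */
    static std::vector<ToyRec> cancel_tombstones(std::vector<ToyRec> recs) {
        std::sort(recs.begin(), recs.end());
        std::vector<ToyRec> out;
        for (size_t i = 0; i < recs.size();) {
            if (!recs[i].tombstone && i + 1 < recs.size() &&
                recs[i] == recs[i + 1] && recs[i + 1].tombstone) {
                i += 2;      /* record cancelled by its tombstone: skip both */
            } else if (recs[i].deleted) {
                i += 1;      /* tagged delete: skip the record */
            } else {
                out.push_back(recs[i]);
                i += 1;
            }
        }
        return out;
    }
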
- PriorityQueue> pq(cursors.size()); - for (size_t i=0; i 1 ? pq.peek(1) : queue_record>{nullptr, 0}; - /* - * if the current record is not a tombstone, and the next record is - * a tombstone that matches the current one, then the current one - * has been deleted, and both it and its tombstone can be skipped - * over. +static merge_info sorted_array_merge(std::vector>> &cursors, + Wrapped *buffer, + psudb::BloomFilter *bf = nullptr) { + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue> pq(cursors.size()); + for (size_t i = 0; i < cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } + + merge_info info = {0, 0}; + while (pq.size()) { + auto now = pq.peek(); + auto next = + pq.size() > 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); + pq.pop(); + auto &cursor1 = cursors[now.version]; + auto &cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) + pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) + pq.push(cursor2.ptr, next.version); + } else { + auto &cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + buffer[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. */ - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - /* skip over records that have been deleted via tagging */ - if (!cursor.ptr->is_deleted()) { - buffer[info.record_count++] = *cursor.ptr; - - /* - * if the record is a tombstone, increment the ts count and - * insert it into the bloom filter if one has been - * provided. - */ - if (cursor.ptr->is_tombstone()) { - info.tombstone_count++; - if (bf) { - bf->insert(cursor.ptr->rec); - } - } - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (bf) { + bf->insert(cursor.ptr->rec); + } } + } + pq.pop(); + + if (advance_cursor(cursor)) + pq.push(cursor.ptr, now.version); } + } - return info; + return info; } - - -} +} // namespace de diff --git a/include/util/bf_config.h b/include/util/bf_config.h index 9f29ed7..836e452 100644 --- a/include/util/bf_config.h +++ b/include/util/bf_config.h @@ -1,8 +1,8 @@ /* * include/util/bf_config.h * - * Copyright (C) 2023 Douglas B. Rumbaugh - * Dong Xie + * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * Dong Xie * * Distributed under the Modified BSD License. * @@ -26,19 +26,15 @@ static double BF_FPR = .01; static size_t BF_HASH_FUNCS = 7; /* - * Adjust the value of BF_FPR. The argument must be on the interval + * Adjust the value of BF_FPR. The argument must be on the interval * (0, 1), or the behavior of bloom filters is undefined. 
*/ -static void BF_SET_FPR(double fpr) { - BF_FPR = fpr; -} +[[maybe_unused]] static void BF_SET_FPR(double fpr) { BF_FPR = fpr; } /* * Adjust the value of BF_HASH_FUNCS. The argument must be on the interval * (0, INT64_MAX], or the behavior of bloom filters is undefined. */ -static void BF_SET_HASHFUNC(size_t func_cnt) { - BF_HASH_FUNCS = func_cnt; -} +[[maybe_unused]] static void BF_SET_HASHFUNC(size_t func_cnt) { BF_HASH_FUNCS = func_cnt; } -} +} // namespace de diff --git a/include/util/types.h b/include/util/types.h index cf61412..b8a1343 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -1,7 +1,7 @@ /* * include/util/types.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023-2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * @@ -17,10 +17,10 @@ */ #pragma once +#include #include #include #include -#include namespace de { @@ -30,14 +30,14 @@ typedef uint32_t PageNum; /* * Byte offset within a page. Also used for lengths of records, etc., * within the codebase. size_t isn't necessary, as the maximum offset - * is only parm::PAGE_SIZE + * is only parm::PAGE_SIZE */ typedef uint16_t PageOffset; /* A unique identifier for a frame within a buffer or cache */ typedef int32_t FrameId; -/* +/* * A unique timestamp for use in MVCC concurrency control. Currently stored in * record headers, but not used by anything. */ @@ -45,7 +45,7 @@ typedef uint32_t Timestamp; const Timestamp TIMESTAMP_MIN = 0; const Timestamp TIMESTAMP_MAX = UINT32_MAX; -/* +/* * Invalid values for various IDs. Used throughout the code base to indicate * uninitialized values and error conditions. */ @@ -60,90 +60,85 @@ const FrameId INVALID_FRID = -1; * as a contiguous index space. */ struct ShardID { - ssize_t level_idx; - ssize_t shard_idx; + ssize_t level_idx; + ssize_t shard_idx; - friend bool operator==(const ShardID &shid1, const ShardID &shid2) { - return shid1.level_idx == shid2.level_idx && shid1.shard_idx == shid2.shard_idx; - } + friend bool operator==(const ShardID &shid1, const ShardID &shid2) { + return shid1.level_idx == shid2.level_idx && + shid1.shard_idx == shid2.shard_idx; + } }; -/* A placeholder for an invalid shard--also used to indicate the mutable buffer */ +/* + * A placeholder for an invalid shard--also used to indicate the + * mutable buffer + */ const ShardID INVALID_SHID = {-1, -1}; typedef ssize_t level_index; typedef struct ReconstructionTask { - std::vector sources; - level_index target; - size_t reccnt; + std::vector sources; + level_index target; + size_t reccnt; - void add_source(level_index source, size_t cnt) { - sources.push_back(source); - reccnt += cnt; - } + void add_source(level_index source, size_t cnt) { + sources.push_back(source); + reccnt += cnt; + } } ReconstructionTask; class ReconstructionVector { public: - ReconstructionVector() - : total_reccnt(0) {} + ReconstructionVector() : total_reccnt(0) {} - ~ReconstructionVector() = default; + ~ReconstructionVector() = default; - ReconstructionTask operator[](size_t idx) { - return m_tasks[idx]; - } + ReconstructionTask operator[](size_t idx) { return m_tasks[idx]; } - void add_reconstruction(level_index source, level_index target, size_t reccnt) { - m_tasks.push_back({{source}, target, reccnt}); - total_reccnt += reccnt; - } + void add_reconstruction(level_index source, level_index target, + size_t reccnt) { + m_tasks.push_back({{source}, target, reccnt}); + total_reccnt += reccnt; + } - void add_reconstruction(ReconstructionTask task) { - m_tasks.push_back(task); - } + void 
add_reconstruction(ReconstructionTask task) { m_tasks.push_back(task); } - ReconstructionTask remove_reconstruction(size_t idx) { - assert(idx < m_tasks.size()); - auto task = m_tasks[idx]; + ReconstructionTask remove_reconstruction(size_t idx) { + assert(idx < m_tasks.size()); + auto task = m_tasks[idx]; - m_tasks.erase(m_tasks.begin() + idx); - total_reccnt -= task.reccnt; + m_tasks.erase(m_tasks.begin() + idx); + total_reccnt -= task.reccnt; - return task; - } + return task; + } - ReconstructionTask remove_smallest_reconstruction() { - size_t min_size = m_tasks[0].reccnt; - size_t idx = 0; - for (size_t i=1; i m_tasks; - size_t total_reccnt; + std::vector m_tasks; + size_t total_reccnt; }; -} +} // namespace de -- cgit v1.2.3
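
The cursor-driven routine in SortedMerge.h is an instance of the standard k-way merge pattern. The sketch below is illustrative only: it uses std::priority_queue in place of the psudb::PriorityQueue the framework relies on, and the names kway_merge and Head are invented for the example.

    #include <cstddef>
    #include <functional>
    #include <queue>
    #include <utility>
    #include <vector>

    /* merge any number of individually sorted runs into one sorted output */
    static std::vector<int> kway_merge(const std::vector<std::vector<int>> &runs) {
        using Head = std::pair<int, size_t>;   /* (current value, run index) */
        std::priority_queue<Head, std::vector<Head>, std::greater<Head>> pq;
        std::vector<size_t> pos(runs.size(), 0);

        for (size_t i = 0; i < runs.size(); i++) {
            if (!runs[i].empty()) {
                pq.push({runs[i][0], i});
            }
        }

        std::vector<int> out;
        while (!pq.empty()) {
            auto [val, run] = pq.top();
            pq.pop();
            out.push_back(val);

            /* advance the cursor of the run we just consumed from */
            if (++pos[run] < runs[run].size()) {
                pq.push({runs[run][pos[run]], run});
            }
        }
        return out;
    }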
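
Along the same lines, remove_smallest_reconstruction() in types.h is a linear scan for the pending task covering the fewest records, followed by an erase from the task vector. A minimal stand-alone sketch of that selection step, with a hypothetical Task type standing in for ReconstructionTask:

    #include <cassert>
    #include <cstddef>
    #include <vector>

    struct Task {          /* hypothetical stand-in for ReconstructionTask */
        size_t reccnt;
    };

    /* remove and return the pending task covering the fewest records */
    static Task remove_smallest(std::vector<Task> &tasks) {
        assert(!tasks.empty());
        size_t idx = 0;
        for (size_t i = 1; i < tasks.size(); i++) {
            if (tasks[i].reccnt < tasks[idx].reccnt) {
                idx = i;
            }
        }
        Task t = tasks[idx];
        tasks.erase(tasks.begin() + idx);   /* O(n) erase, as in the original */
        return t;
    }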