Diffstat (limited to 'include/framework')
21 files changed, 2571 insertions, 830 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 524024b..7ea5370 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -1,302 +1,639 @@ /* * include/framework/DynamicExtension.h * - * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> * Dong Xie <dongx@psu.edu> * - * All rights reserved. Published under the Modified BSD License. + * Distributed under the Modified BSD License. * */ #pragma once #include <atomic> -#include <numeric> #include <cstdio> #include <vector> -#include "framework/MutableBuffer.h" -#include "framework/InternalLevel.h" -#include "framework/ShardInterface.h" -#include "framework/QueryInterface.h" -#include "framework/RecordInterface.h" +#include "framework/interface/Scheduler.h" +#include "framework/scheduling/FIFOScheduler.h" +#include "framework/scheduling/SerialScheduler.h" -#include "shard/WIRS.h" -#include "psu-util/timer.h" -#include "psu-ds/Alias.h" +#include "framework/structure/MutableBuffer.h" +#include "framework/interface/Record.h" +#include "framework/structure/ExtensionStructure.h" -namespace de { - -thread_local size_t sampling_attempts = 0; -thread_local size_t sampling_rejections = 0; -thread_local size_t deletion_rejections = 0; -thread_local size_t bounds_rejections = 0; -thread_local size_t tombstone_rejections = 0; -thread_local size_t buffer_rejections = 0; - -/* - * thread_local size_t various_sampling_times go here. - */ -thread_local size_t sample_range_time = 0; -thread_local size_t alias_time = 0; -thread_local size_t alias_query_time = 0; -thread_local size_t rejection_check_time = 0; -thread_local size_t buffer_sample_time = 0; -thread_local size_t memlevel_sample_time = 0; -thread_local size_t disklevel_sample_time = 0; -thread_local size_t sampling_bailouts = 0; - - -enum class LayoutPolicy { - LEVELING, - TEIRING -}; - -enum class DeletePolicy { - TOMBSTONE, - TAGGING -}; +#include "framework/util/Configuration.h" +#include "framework/scheduling/Epoch.h" -typedef ssize_t level_index; +namespace de { -template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING> +template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L=LayoutPolicy::TEIRING, + DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=FIFOScheduler> class DynamicExtension { - //typedef typename S<R> Shard; typedef S Shard; typedef MutableBuffer<R> Buffer; + typedef ExtensionStructure<R, S, Q, L> Structure; + typedef Epoch<R, S, Q, L> _Epoch; + typedef BufferView<R> BufView; + + static constexpr size_t QUERY = 1; + static constexpr size_t RECONSTRUCTION = 2; + + struct epoch_ptr { + _Epoch *epoch; + size_t refcnt; + }; public: - DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop) - : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop), - m_buffer(new Buffer(buffer_cap, buffer_cap * max_delete_prop)) - { } + DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0, + size_t thread_cnt=16) + : m_scale_factor(scale_factor) + , m_max_delete_prop(1) + , m_sched(memory_budget, thread_cnt) + , m_buffer(new Buffer(buffer_lwm, buffer_hwm)) + , m_core_cnt(thread_cnt) + , m_next_core(0) + , m_epoch_cnt(0) + { + auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); + m_current_epoch.store({new _Epoch(0, vers, 
m_buffer, 0), 0}); + m_previous_epoch.store({nullptr, 0}); + m_next_epoch.store({nullptr, 0}); + } ~DynamicExtension() { - delete m_buffer; - for (size_t i=0; i<m_levels.size(); i++) { - delete m_levels[i]; - } + /* let any in-flight epoch transition finish */ + await_next_epoch(); + + /* shutdown the scheduler */ + m_sched.shutdown(); + + /* delete all held resources */ + delete m_next_epoch.load().epoch; + delete m_current_epoch.load().epoch; + delete m_previous_epoch.load().epoch; + + delete m_buffer; } + /* + * Insert the record `rec` into the index. If the buffer is full and + * the framework is blocking on an epoch transition, this call may fail + * and return 0. In this case, retry the call again later. If + * successful, 1 will be returned. The record will be immediately + * visible in the buffer upon the successful return of this function. + */ int insert(const R &rec) { return internal_append(rec, false); } + /* + * Erase the record `rec` from the index. It is assumed that `rec` + * currently exists--no special checks are made for correctness here. + * The behavior if this function will differ depending on if tombstone + * or tagged deletes are used. + * + * Tombstone deletes - inserts a tombstone record for `rec`. This *may* + * return 0 and fail if the buffer is full and the framework is + * blocking on an epoch transition. In this case, repeat the call + * later. 1 will be returned when the tombstone is successfully + * inserted. + * + * Tagging deletes - Does a point lookup for the record across the + * entire structure, and sets its delete bit when found. Returns 1 if + * the record is found and marked, and 0 if it was not (i.e., if it + * isn't present in the index). + */ int erase(const R &rec) { - Buffer *buffer; - + // FIXME: delete tagging will require a lot of extra work to get + // operating "correctly" in a concurrent environment. + + /* + * Get a view on the buffer *first*. This will ensure a stronger + * ordering than simply accessing the buffer directly, but is + * not *strictly* necessary. + */ if constexpr (D == DeletePolicy::TAGGING) { - auto buffer = get_buffer(); + static_assert(std::same_as<SCHED, SerialScheduler>, "Tagging is only supported in single-threaded operation"); - // Check the levels first. This assumes there aren't - // any undeleted duplicate records. - for (auto level : m_levels) { - if (level && level->delete_record(rec)) { - return 1; - } + auto view = m_buffer->get_buffer_view(); + + auto epoch = get_active_epoch(); + if (epoch->get_structure()->tagged_delete(rec)) { + end_job(epoch); + return 1; } - // the buffer will take the longest amount of time, and - // probably has the lowest probability of having the record, - // so we'll check it last. - return buffer->delete_record(rec); + end_job(epoch); + + /* + * the buffer will take the longest amount of time, and + * probably has the lowest probability of having the record, + * so we'll check it last. + */ + return view.delete_record(rec); } + /* + * If tagging isn't used, then delete using a tombstone + */ return internal_append(rec, true); } - std::vector<R> query(void *parms) { - auto buffer = get_buffer(); + /* + * Execute the query with parameters `parms` and return a future. This + * future can be used to access a vector containing the results of the + * query. + * + * The behavior of this function is undefined if `parms` is not a + * pointer to a valid query parameter object for the query type used as + * a template parameter to construct the framework. 
+ */ + std::future<std::vector<R>> query(void *parms) { + return schedule_query(parms); + } + + /* + * Returns the number of records (included tagged records and + * tombstones) currently within the framework. + */ + size_t get_record_count() { + auto epoch = get_active_epoch(); + auto t = epoch->get_buffer().get_record_count() + epoch->get_structure()->get_record_count(); + end_job(epoch); + + return t; + } + + /* + * Returns the number of tombstone records currently within the + * framework. This function can be called when tagged deletes are used, + * but will always return 0 in that case. + */ + size_t get_tombstone_count() { + auto epoch = get_active_epoch(); + auto t = epoch->get_buffer().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); + end_job(epoch); - // Get the buffer query state - auto buffer_state = Q::get_buffer_query_state(buffer, parms); + return t; + } - // Get the shard query states - std::vector<std::pair<ShardID, Shard*>> shards; - std::vector<void*> states; + /* + * Get the number of levels within the framework. This count will + * include any empty levels, but will not include the buffer. Note that + * this is *not* the same as the number of shards when tiering is used, + * as each level can contain multiple shards in that case. + */ + size_t get_height() { + auto epoch = get_active_epoch(); + auto t = epoch->get_structure()->get_height(); + end_job(epoch); - for (auto &level : m_levels) { - level->get_query_states(shards, states, parms); - } + return t; + } - Q::process_query_states(parms, states, buffer_state); + /* + * Get the number of bytes of memory allocated across the framework for + * storing records and associated index information (i.e., internal + * ISAM tree nodes). This includes memory that is allocated but + * currently unused in the buffer, or in shards themselves + * (overallocation due to delete cancellation, etc.). + */ + size_t get_memory_usage() { + auto epoch = get_active_epoch(); + auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage(); + end_job(epoch); - std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1); + return t; + } - // Execute the query for the buffer - auto buffer_results = Q::buffer_query(buffer, buffer_state, parms); - query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer)); - if constexpr (Q::EARLY_ABORT) { - if (query_results[0].size() > 0) { - auto result = Q::merge(query_results, parms); - for (size_t i=0; i<states.size(); i++) { - Q::delete_query_state(states[i]); - } + /* + * Get the number of bytes of memory allocated across the framework for + * auxiliary structures. This can include bloom filters, aux + * hashtables, etc. + */ + size_t get_aux_memory_usage() { + auto epoch = get_active_epoch(); + auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); + end_job(epoch); - Q::delete_buffer_query_state(buffer_state); - return result; - } + return t; + } + + /* + * Returns the maximum physical capacity of the buffer, measured in + * records. + */ + size_t get_buffer_capacity() { + return m_buffer->get_capacity(); + } + + /* + * Create a new single Shard object containing all of the records + * within the framework (buffer and shards). 
The optional parameter can + * be used to specify whether the Shard should be constructed with the + * currently active state of the framework (false), or if shard + * construction should wait until any ongoing reconstructions have + * finished and use that new version (true). + */ + Shard *create_static_structure(bool await_reconstruction_completion=false) { + if (await_reconstruction_completion) { + await_next_epoch(); } - // Execute the query for each shard - for (size_t i=0; i<shards.size(); i++) { - auto shard_results = Q::query(shards[i].second, states[i], parms); - query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer)); - if constexpr (Q::EARLY_ABORT) { - if (query_results[i].size() > 0) { - auto result = Q::merge(query_results, parms); - for (size_t i=0; i<states.size(); i++) { - Q::delete_query_state(states[i]); - } + auto epoch = get_active_epoch(); + auto vers = epoch->get_structure(); + std::vector<Shard *> shards; - Q::delete_buffer_query_state(buffer_state); - return result; + if (vers->get_levels().size() > 0) { + for (int i=vers->get_levels().size() - 1; i>= 0; i--) { + if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) { + shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); } } } - - // Merge the results together - auto result = Q::merge(query_results, parms); - for (size_t i=0; i<states.size(); i++) { - Q::delete_query_state(states[i]); + /* + * construct a shard from the buffer view. We'll hold the view + * for as short a time as possible: once the records are exfiltrated + * from the buffer, there's no reason to retain a hold on the view's + * head pointer any longer + */ + { + auto bv = epoch->get_buffer(); + if (bv.get_record_count() > 0) { + shards.emplace_back(new S(std::move(bv))); + } } - Q::delete_buffer_query_state(buffer_state); + Shard *flattened = new S(shards); - return result; - } + for (auto shard : shards) { + delete shard; + } - size_t get_record_count() { - size_t cnt = get_buffer()->get_record_count(); + end_job(epoch); + return flattened; + } - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) cnt += m_levels[i]->get_record_count(); + /* + * If the current epoch is *not* the newest one, then wait for + * the newest one to become available. Otherwise, returns immediately. + */ + void await_next_epoch() { + while (m_next_epoch.load().epoch != nullptr) { + std::unique_lock<std::mutex> lk(m_epoch_cv_lk); + m_epoch_cv.wait(lk); } + } - return cnt; + /* + * Mostly exposed for unit-testing purposes. Verifies that the current + * active version of the ExtensionStructure doesn't violate the maximum + * tombstone proportion invariant. 
+ */ + bool validate_tombstone_proportion() { + auto epoch = get_active_epoch(); + auto t = epoch->get_structure()->validate_tombstone_proportion(); + end_job(epoch); + return t; } - size_t get_tombstone_cnt() { - size_t cnt = get_buffer()->get_tombstone_count(); - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count(); - } + void print_scheduler_statistics() { + m_sched.print_statistics(); + } + +private: + SCHED m_sched; - return cnt; + Buffer *m_buffer; + + //std::mutex m_struct_lock; + //std::set<Structure *> m_versions; + + alignas(64) std::atomic<bool> m_reconstruction_scheduled; + + std::atomic<epoch_ptr> m_next_epoch; + std::atomic<epoch_ptr> m_current_epoch; + std::atomic<epoch_ptr> m_previous_epoch; + + std::condition_variable m_epoch_cv; + std::mutex m_epoch_cv_lk; + + std::atomic<size_t> m_epoch_cnt; + + size_t m_scale_factor; + double m_max_delete_prop; + + std::atomic<int> m_next_core; + size_t m_core_cnt; + + void enforce_delete_invariant(_Epoch *epoch) { + auto structure = epoch->get_structure(); + auto compactions = structure->get_compaction_tasks(); + + while (compactions.size() > 0) { + + /* schedule a compaction */ + ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>(); + args->epoch = epoch; + args->merges = compactions; + args->extension = this; + args->compaction = true; + /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ + + auto wait = args->result.get_future(); + + m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); + + /* wait for compaction completion */ + wait.get(); + + /* get a new batch of compactions to perform, if needed */ + compactions = structure->get_compaction_tasks(); + } } - size_t get_height() { - return m_levels.size(); + _Epoch *get_active_epoch() { + epoch_ptr old, new_ptr; + + do { + /* + * during an epoch transition, a nullptr will installed in the + * current_epoch. At this moment, the "new" current epoch will + * soon be installed, but the "current" current epoch has been + * moved back to m_previous_epoch. + */ + if (m_current_epoch.load().epoch == nullptr) { + old = m_previous_epoch; + new_ptr = {old.epoch, old.refcnt+1}; + if (old.epoch != nullptr && m_previous_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } else { + old = m_current_epoch; + new_ptr = {old.epoch, old.refcnt+1}; + if (old.epoch != nullptr && m_current_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } + } while (true); + + assert(new_ptr.refcnt > 0); + + return new_ptr.epoch; } - size_t get_memory_usage() { - size_t cnt = m_buffer->get_memory_usage(); + void advance_epoch(size_t buffer_head) { + + retire_epoch(m_previous_epoch.load().epoch); + + epoch_ptr tmp = {nullptr, 0}; + epoch_ptr cur; + do { + cur = m_current_epoch; + } while(!m_current_epoch.compare_exchange_strong(cur, tmp)); + + m_previous_epoch.store(cur); - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) cnt += m_levels[i]->get_memory_usage(); + // FIXME: this may currently block because there isn't any + // query preemption yet. 
At this point, we'd need to either + // 1) wait for all queries on the old_head to finish + // 2) kill all queries on the old_head + // 3) somehow migrate all queries on the old_head to the new + // version + while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) { + _mm_pause(); } - return cnt; + + m_current_epoch.store(m_next_epoch); + m_next_epoch.store({nullptr, 0}); + + + /* notify any blocking threads that the new epoch is available */ + m_epoch_cv_lk.lock(); + m_epoch_cv.notify_all(); + m_epoch_cv_lk.unlock(); } - size_t get_aux_memory_usage() { - size_t cnt = m_buffer->get_aux_memory_usage(); + /* + * Creates a new epoch by copying the currently active one. The new epoch's + * structure will be a shallow copy of the old one's. + */ + _Epoch *create_new_epoch() { + /* + * This epoch access is _not_ protected under the assumption that + * only one reconstruction will be able to trigger at a time. If that condition + * is violated, it is possible that this code will clone a retired + * epoch. + */ + assert(m_next_epoch.load().epoch == nullptr); + auto current_epoch = get_active_epoch(); + + m_epoch_cnt.fetch_add(1); + m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0}); + + end_job(current_epoch); + + return m_next_epoch.load().epoch; + } - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) { - cnt += m_levels[i]->get_aux_memory_usage(); - } + void retire_epoch(_Epoch *epoch) { + /* + * Epochs with currently active jobs cannot + * be retired. By the time retire_epoch is called, + * it is assumed that a new epoch is active, meaning + * that the epoch to be retired should no longer + * accumulate new active jobs. Eventually, this + * number will hit zero and the function will + * proceed. + */ + + if (epoch == nullptr) { + return; } - return cnt; - } + epoch_ptr old, new_ptr; + new_ptr = {nullptr, 0}; + do { + old = m_previous_epoch.load(); + + /* + * If running in single threaded mode, the failure to retire + * an Epoch will result in the thread of execution blocking + * indefinitely. + */ + if constexpr (std::same_as<SCHED, SerialScheduler>) { + if (old.epoch == epoch) assert(old.refcnt == 0); + } - bool validate_tombstone_proportion() { - long double ts_prop; - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) { - ts_prop = (long double) m_levels[i]->get_tombstone_count() / (long double) calc_level_record_capacity(i); - if (ts_prop > (long double) m_max_delete_prop) { - return false; - } + if (old.epoch == epoch && old.refcnt == 0 && + m_previous_epoch.compare_exchange_strong(old, new_ptr)) { + break; } - } + usleep(1); + + } while(true); - return true; + delete epoch; } - size_t get_buffer_capacity() { - return m_buffer->get_capacity(); + static void reconstruction(void *arguments) { + auto args = (ReconstructionArgs<R, S, Q, L> *) arguments; + + ((DynamicExtension *) args->extension)->SetThreadAffinity(); + Structure *vers = args->epoch->get_structure(); + + for (ssize_t i=0; i<args->merges.size(); i++) { + vers->reconstruction(args->merges[i].second, args->merges[i].first); + } + + /* + * we'll grab the buffer AFTER doing the internal reconstruction, so we + * can flush as many records as possible in one go. The reconstruction + * was done so as to make room for the full buffer anyway, so there's + * no real benefit to doing this first. 
+ */ + auto buffer_view = args->epoch->get_buffer(); + size_t new_head = buffer_view.get_tail(); + + /* + * if performing a compaction, don't flush the buffer, as + * there is no guarantee that any necessary reconstructions + * will free sufficient space in L0 to support a flush + */ + if (!args->compaction) { + vers->flush_buffer(std::move(buffer_view)); + } + + args->result.set_value(true); + + /* + * Compactions occur on an epoch _before_ it becomes active, + * and as a result the active epoch should _not_ be advanced as + * part of a compaction + */ + if (!args->compaction) { + ((DynamicExtension *) args->extension)->advance_epoch(new_head); + } + + ((DynamicExtension *) args->extension)->m_reconstruction_scheduled.store(false); + + delete args; } - Shard *create_static_structure() { - std::vector<Shard *> shards; + static void async_query(void *arguments) { + QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments; - if (m_levels.size() > 0) { - for (int i=m_levels.size() - 1; i>= 0; i--) { - if (m_levels[i]) { - shards.emplace_back(m_levels[i]->get_merged_shard()); - } - } - } + auto epoch = ((DynamicExtension *) args->extension)->get_active_epoch(); + + auto ptr1 = ((DynamicExtension *) args->extension)->m_previous_epoch.load().epoch; + auto ptr2 = ((DynamicExtension *) args->extension)->m_current_epoch.load().epoch; + auto ptr3 = ((DynamicExtension *) args->extension)->m_next_epoch.load().epoch; + + + auto buffer = epoch->get_buffer(); + auto vers = epoch->get_structure(); + void *parms = args->query_parms; + + /* Get the buffer query states */ + void *buffer_state = Q::get_buffer_query_state(&buffer, parms); + + /* Get the shard query states */ + std::vector<std::pair<ShardID, Shard*>> shards; + std::vector<void *> states = vers->get_query_states(shards, parms); + + Q::process_query_states(parms, states, buffer_state); - shards.emplace_back(new S(get_buffer())); + std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1); + for (size_t i=0; i<query_results.size(); i++) { + std::vector<Wrapped<R>> local_results; + ShardID shid; + + if (i == 0) { /* process the buffer first */ + local_results = Q::buffer_query(buffer_state, parms); + shid = INVALID_SHID; + } else { + local_results = Q::query(shards[i - 1].second, states[i - 1], parms); + shid = shards[i - 1].first; + } - Shard *shards_array[shards.size()]; + query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer)); - size_t j = 0; - for (size_t i=0; i<shards.size(); i++) { - if (shards[i]) { - shards_array[j++] = shards[i]; + if constexpr (Q::EARLY_ABORT) { + if (query_results[i].size() > 0) break; } } - Shard *flattened = new S(shards_array, j); + auto result = Q::merge(query_results, parms); + args->result_set.set_value(std::move(result)); - for (auto shard : shards) { - delete shard; + ((DynamicExtension *) args->extension)->end_job(epoch); + + Q::delete_buffer_query_state(buffer_state); + for (size_t i=0; i<states.size(); i++) { + Q::delete_query_state(states[i]); } - return flattened; + delete args; } -private: - Buffer *m_buffer; + void schedule_reconstruction() { + auto epoch = create_new_epoch(); + /* + * the reconstruction process calls end_job(), + * so we must start one before calling it + */ + + ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>(); + args->epoch = epoch; + args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_high_watermark()); + args->extension = this; + args->compaction = false; + /* NOTE: args is deleted by 
the reconstruction job, so shouldn't be freed here */ + + m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); + } - size_t m_scale_factor; - double m_max_delete_prop; + std::future<std::vector<R>> schedule_query(void *query_parms) { + QueryArgs<R, S, Q, L> *args = new QueryArgs<R, S, Q, L>(); + args->extension = this; + args->query_parms = query_parms; + auto result = args->result_set.get_future(); - std::vector<InternalLevel<R, S, Q> *> m_levels; + m_sched.schedule_job(async_query, 0, args, QUERY); - Buffer *get_buffer() { - return m_buffer; + return result; } int internal_append(const R &rec, bool ts) { - Buffer *buffer; - while (!(buffer = get_buffer())) - ; - - if (buffer->is_full()) { - merge_buffer(); + if (m_buffer->is_at_low_watermark()) { + auto old = false; + + if (m_reconstruction_scheduled.compare_exchange_strong(old, true)) { + schedule_reconstruction(); + } } - return buffer->append(rec, ts); + /* this will fail if the HWM is reached and return 0 */ + return m_buffer->append(rec, ts); } - std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer) { - if constexpr (!Q::SKIP_DELETE_FILTER) { + static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) { + if constexpr (Q::SKIP_DELETE_FILTER) { return records; } std::vector<Wrapped<R>> processed_records; processed_records.reserve(records.size()); - // For delete tagging, we just need to check the delete bit on each - // record. + /* + * For delete tagging, we just need to check the delete bit + * on each record. + */ if constexpr (D == DeletePolicy::TAGGING) { for (auto &rec : records) { if (rec.is_deleted()) { @@ -309,25 +646,35 @@ private: return processed_records; } - // For tombstone deletes, we need to search for the corresponding - // tombstone for each record. + /* + * For tombstone deletes, we need to search for the corresponding + * tombstone for each record. + */ for (auto &rec : records) { if (rec.is_tombstone()) { continue; } - if (buffer->check_tombstone(rec.rec)) { - continue; + // FIXME: need to figure out how best to re-enable the buffer tombstone + // check in the correct manner. + //if (buffview.check_tombstone(rec.rec)) { + //continue; + //} + + for (size_t i=0; i<bview->get_record_count(); i++) { + if (bview->get(i)->is_tombstone() && bview->get(i)->rec == rec.rec) { + continue; + } } if (shid != INVALID_SHID) { for (size_t lvl=0; lvl<=shid.level_idx; lvl++) { - if (m_levels[lvl]->check_tombstone(0, rec.rec)) { + if (vers->get_levels()[lvl]->check_tombstone(0, rec.rec)) { continue; } } - if (m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) { + if (vers->get_levels()[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) { continue; } } @@ -338,197 +685,70 @@ private: return processed_records; } - /* - * Add a new level to the LSM Tree and return that level's index. Will - * automatically determine whether the level should be on memory or on disk, - * and act appropriately. - */ - inline level_index grow() { - level_index new_idx; - - size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - new_idx = m_levels.size(); - if (new_idx > 0) { - assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); - } - m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)); - - return new_idx; - } - - - // Merge the memory table down into the tree, completing any required other - // merges to make room for it. 
- inline void merge_buffer() { - auto buffer = get_buffer(); - - if (!can_merge_with(0, buffer->get_record_count())) { - merge_down(0); + void SetThreadAffinity() { + int core = m_next_core.fetch_add(1) % m_core_cnt; + cpu_set_t mask; + CPU_ZERO(&mask); + + switch (core % 2) { + case 0: + // 0 |-> 0 + // 2 |-> 2 + // 4 |-> 4 + core = core; + break; + case 1: + // 1 |-> 28 + // 3 |-> 30 + // 5 |-> 32 + core = (core - 1) + m_core_cnt; + break; } - - merge_buffer_into_l0(buffer); - enforce_delete_maximum(0); - - buffer->truncate(); - return; + CPU_SET(core, &mask); + ::sched_setaffinity(0, sizeof(mask), &mask); } - /* - * Merge the specified level down into the tree. The level index must be - * non-negative (i.e., this function cannot be used to merge the buffer). This - * routine will recursively perform any necessary merges to make room for the - * specified level. - */ - inline void merge_down(level_index idx) { - level_index merge_base_level = find_mergable_level(idx); - if (merge_base_level == -1) { - merge_base_level = grow(); - } - for (level_index i=merge_base_level; i>idx; i--) { - merge_levels(i, i-1); - enforce_delete_maximum(i); - } + void end_job(_Epoch *epoch) { + epoch_ptr old, new_ptr; - return; - } + do { + if (m_previous_epoch.load().epoch == epoch) { + old = m_previous_epoch; + /* + * This could happen if we get into the system during a + * transition. In this case, we can just back out and retry + */ + if (old.epoch == nullptr) { + continue; + } - /* - * Find the first level below the level indicated by idx that - * is capable of sustaining a merge operation and return its - * level index. If no such level exists, returns -1. Also - * returns -1 if idx==0, and no such level exists, to simplify - * the logic of the first merge. - */ - inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) { + assert(old.refcnt > 0); - if (idx == 0 && m_levels.size() == 0) return -1; + new_ptr = {old.epoch, old.refcnt - 1}; + if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } + } else { + old = m_current_epoch; + /* + * This could happen if we get into the system during a + * transition. In this case, we can just back out and retry + */ + if (old.epoch == nullptr) { + continue; + } - bool level_found = false; - bool disk_level; - level_index merge_level_idx; + assert(old.refcnt > 0); - size_t incoming_rec_cnt = get_level_record_count(idx, buffer); - for (level_index i=idx+1; i<m_levels.size(); i++) { - if (can_merge_with(i, incoming_rec_cnt)) { - return i; + new_ptr = {old.epoch, old.refcnt - 1}; + if (m_current_epoch.compare_exchange_strong(old, new_ptr)) { + break; + } } - - incoming_rec_cnt = get_level_record_count(i); - } - - return -1; - } - - /* - * Merge the level specified by incoming level into the level specified - * by base level. The two levels should be sequential--i.e. no levels - * are skipped in the merge process--otherwise the tombstone ordering - * invariant may be violated by the merge operation. 
- */ - inline void merge_levels(level_index base_level, level_index incoming_level) { - // merging two memory levels - if constexpr (L == LayoutPolicy::LEVELING) { - auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); - mark_as_unused(tmp); - } else { - m_levels[base_level]->append_merged_shards(m_levels[incoming_level]); - } - - mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor); - } - - - inline void merge_buffer_into_l0(Buffer *buffer) { - assert(m_levels[0]); - if constexpr (L == LayoutPolicy::LEVELING) { - // FIXME: Kludgey implementation due to interface constraints. - auto old_level = m_levels[0]; - auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); - temp_level->append_buffer(buffer); - auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level); - - m_levels[0] = new_level; - delete temp_level; - mark_as_unused(old_level); - } else { - m_levels[0]->append_buffer(buffer); - } - } - - /* - * Mark a given memory level as no-longer in use by the tree. For now this - * will just free the level. In future, this will be more complex as the - * level may not be able to immediately be deleted, depending upon who - * else is using it. - */ - inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) { - delete level; - } - - /* - * Check the tombstone proportion for the specified level and - * if the limit is exceeded, forcibly merge levels until all - * levels below idx are below the limit. - */ - inline void enforce_delete_maximum(level_index idx) { - long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx); - - if (ts_prop > (long double) m_max_delete_prop) { - merge_down(idx); - } - - return; - } - - /* - * Assume that level "0" should be larger than the buffer. The buffer - * itself is index -1, which should return simply the buffer capacity. - */ - inline size_t calc_level_record_capacity(level_index idx) { - return get_buffer()->get_capacity() * pow(m_scale_factor, idx+1); + } while (true); } - /* - * Returns the actual number of records present on a specified level. An - * index value of -1 indicates the memory table. Can optionally pass in - * a pointer to the memory table to use, if desired. Otherwise, there are - * no guarantees about which buffer will be accessed if level_index is -1. - */ - inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) { - - assert(idx >= -1); - if (idx == -1) { - return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count(); - } - - return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; - } - - /* - * Determines if the specific level can merge with another record containing - * incoming_rec_cnt number of records. The provided level index should be - * non-negative (i.e., not refer to the buffer) and will be automatically - * translated into the appropriate index into either the disk or memory level - * vector. 
- */ - inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) { - if (idx>= m_levels.size() || !m_levels[idx]) { - return false; - } - - if (L == LayoutPolicy::LEVELING) { - return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx); - } else { - return m_levels[idx]->get_shard_count() < m_scale_factor; - } - - // unreachable - assert(true); - } }; - } diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h deleted file mode 100644 index ec8ffc4..0000000 --- a/include/framework/InternalLevel.h +++ /dev/null @@ -1,213 +0,0 @@ -/* - * include/framework/InternalLevel.h - * - * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> - * Dong Xie <dongx@psu.edu> - * - * All rights reserved. Published under the Modified BSD License. - * - */ -#pragma once - -#include <vector> -#include <memory> - -#include "util/types.h" -#include "framework/ShardInterface.h" -#include "framework/QueryInterface.h" -#include "framework/RecordInterface.h" -#include "framework/MutableBuffer.h" - -namespace de { - -template <RecordInterface R, ShardInterface S, QueryInterface Q> -class InternalLevel { - typedef S Shard; - typedef MutableBuffer<R> Buffer; -public: - InternalLevel(ssize_t level_no, size_t shard_cap) - : m_level_no(level_no) - , m_shard_cnt(0) - , m_shards(shard_cap, nullptr) - , m_owns(shard_cap, true) - {} - - // Create a new memory level sharing the shards and repurposing it as previous level_no + 1 - // WARNING: for leveling only. - InternalLevel(InternalLevel* level) - : m_level_no(level->m_level_no + 1) - , m_shard_cnt(level->m_shard_cnt) - , m_shards(level->m_shards.size(), nullptr) - , m_owns(level->m_owns.size(), true) { - assert(m_shard_cnt == 1 && m_shards.size() == 1); - - for (size_t i=0; i<m_shards.size(); i++) { - level->m_owns[i] = false; - m_shards[i] = level->m_shards[i]; - } - } - - ~InternalLevel() { - for (size_t i=0; i<m_shards.size(); i++) { - if (m_owns[i]) delete m_shards[i]; - } - } - - // WARNING: for leveling only. - // assuming the base level is the level new level is merging into. (base_level is larger.) - static InternalLevel* merge_levels(InternalLevel* base_level, InternalLevel* new_level) { - assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); - auto res = new InternalLevel(base_level->m_level_no, 1); - res->m_shard_cnt = 1; - Shard* shards[2]; - shards[0] = base_level->m_shards[0]; - shards[1] = new_level->m_shards[0]; - - res->m_shards[0] = new S(shards, 2); - return res; - } - - void append_buffer(Buffer* buffer) { - assert(m_shard_cnt < m_shards.size()); - m_shards[m_shard_cnt] = new S(buffer); - m_owns[m_shard_cnt] = true; - ++m_shard_cnt; - } - - void append_merged_shards(InternalLevel* level) { - assert(m_shard_cnt < m_shards.size()); - m_shards[m_shard_cnt] = new S(level->m_shards.data(), level->m_shard_cnt); - m_owns[m_shard_cnt] = true; - - ++m_shard_cnt; - } - - Shard *get_merged_shard() { - Shard *shards[m_shard_cnt]; - - for (size_t i=0; i<m_shard_cnt; i++) { - shards[i] = m_shards[i]; - } - - return new S(shards, m_shard_cnt); - } - - // Append the sample range in-order..... 
- void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) { - for (size_t i=0; i<m_shard_cnt; i++) { - if (m_shards[i]) { - auto shard_state = Q::get_query_state(m_shards[i], query_parms); - shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i]}); - shard_states.emplace_back(shard_state); - } - } - } - - bool check_tombstone(size_t shard_stop, const R& rec) { - if (m_shard_cnt == 0) return false; - - for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) { - if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec, true); - if (res && res->is_tombstone()) { - return true; - } - } - } - return false; - } - - bool delete_record(const R &rec) { - if (m_shard_cnt == 0) return false; - - for (size_t i = 0; i < m_shards.size(); ++i) { - if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec); - if (res) { - res->set_delete(); - return true; - } - } - } - - return false; - } - - Shard* get_shard(size_t idx) { - return m_shards[idx]; - } - - size_t get_shard_count() { - return m_shard_cnt; - } - - size_t get_record_count() { - size_t cnt = 0; - for (size_t i=0; i<m_shard_cnt; i++) { - cnt += m_shards[i]->get_record_count(); - } - - return cnt; - } - - size_t get_tombstone_count() { - size_t res = 0; - for (size_t i = 0; i < m_shard_cnt; ++i) { - res += m_shards[i]->get_tombstone_count(); - } - return res; - } - - size_t get_aux_memory_usage() { - size_t cnt = 0; - for (size_t i=0; i<m_shard_cnt; i++) { - cnt += m_shards[i]->get_aux_memory_usage(); - } - - return cnt; - } - - size_t get_memory_usage() { - size_t cnt = 0; - for (size_t i=0; i<m_shard_cnt; i++) { - if (m_shards[i]) { - cnt += m_shards[i]->get_memory_usage(); - } - } - - return cnt; - } - - double get_tombstone_prop() { - size_t tscnt = 0; - size_t reccnt = 0; - for (size_t i=0; i<m_shard_cnt; i++) { - if (m_shards[i]) { - tscnt += m_shards[i]->get_tombstone_count(); - reccnt += (*m_shards[i])->get_record_count(); - } - } - - return (double) tscnt / (double) (tscnt + reccnt); - } - -private: - ssize_t m_level_no; - - size_t m_shard_cnt; - size_t m_shard_size_cap; - - std::vector<Shard*> m_shards; - std::vector<bool> m_owns; - - InternalLevel *clone() { - auto new_level = new InternalLevel(m_level_no, m_shards.size()); - for (size_t i=0; i<m_shard_cnt; i++) { - new_level->m_shards[i] = m_shards[i]; - new_level->m_owns[i] = true; - m_owns[i] = false; - } - } -}; - -} diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h deleted file mode 100644 index b79fc02..0000000 --- a/include/framework/MutableBuffer.h +++ /dev/null @@ -1,180 +0,0 @@ -/* - * include/framework/MutableBuffer.h - * - * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> - * Dong Xie <dongx@psu.edu> - * - * All rights reserved. Published under the Modified BSD License. 
- * - */ -#pragma once - -#include <cstdlib> -#include <atomic> -#include <cassert> -#include <numeric> -#include <algorithm> -#include <type_traits> - -#include "psu-util/alignment.h" -#include "util/bf_config.h" -#include "psu-ds/BloomFilter.h" -#include "psu-ds/Alias.h" -#include "psu-util/timer.h" -#include "framework/RecordInterface.h" - -using psudb::CACHELINE_SIZE; - -namespace de { - -template <RecordInterface R> -class MutableBuffer { -public: - MutableBuffer(size_t capacity, size_t max_tombstone_cap) - : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0) - , m_tombstonecnt(0), m_weight(0), m_max_weight(0) { - auto len = capacity * sizeof(Wrapped<R>); - size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE)); - m_data = (Wrapped<R>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); - m_tombstone_filter = nullptr; - if (max_tombstone_cap > 0) { - m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS); - } - } - - ~MutableBuffer() { - if (m_data) free(m_data); - if (m_tombstone_filter) delete m_tombstone_filter; - } - - template <typename R_ = R> - int append(const R &rec, bool tombstone=false) { - if (tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0; - - int32_t pos = 0; - if ((pos = try_advance_tail()) == -1) return 0; - - Wrapped<R> wrec; - wrec.rec = rec; - wrec.header = 0; - if (tombstone) wrec.set_tombstone(); - - m_data[pos] = wrec; - m_data[pos].header |= (pos << 2); - - if (tombstone) { - m_tombstonecnt.fetch_add(1); - if (m_tombstone_filter) m_tombstone_filter->insert(rec); - } - - if constexpr (WeightedRecordInterface<R_>) { - m_weight.fetch_add(rec.weight); - double old = m_max_weight.load(); - while (old < rec.weight) { - m_max_weight.compare_exchange_strong(old, rec.weight); - old = m_max_weight.load(); - } - } else { - m_weight.fetch_add(1); - } - - return 1; - } - - bool truncate() { - m_tombstonecnt.store(0); - m_reccnt.store(0); - m_weight.store(0); - m_max_weight.store(0); - if (m_tombstone_filter) m_tombstone_filter->clear(); - - return true; - } - - size_t get_record_count() { - return m_reccnt; - } - - size_t get_capacity() { - return m_cap; - } - - bool is_full() { - return m_reccnt == m_cap; - } - - size_t get_tombstone_count() { - return m_tombstonecnt.load(); - } - - bool delete_record(const R& rec) { - auto offset = 0; - while (offset < m_reccnt.load()) { - if (m_data[offset].rec == rec) { - m_data[offset].set_delete(); - return true; - } - offset++; - } - - return false; - } - - bool check_tombstone(const R& rec) { - if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false; - - auto offset = 0; - while (offset < m_reccnt.load()) { - if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) { - return true; - } - offset++;; - } - return false; - } - - size_t get_memory_usage() { - return m_cap * sizeof(R); - } - - size_t get_aux_memory_usage() { - return m_tombstone_filter->get_memory_usage(); - } - - size_t get_tombstone_capacity() { - return m_tombstone_cap; - } - - double get_total_weight() { - return m_weight.load(); - } - - Wrapped<R> *get_data() { - return m_data; - } - - double get_max_weight() { - return m_max_weight; - } - -private: - int32_t try_advance_tail() { - size_t new_tail = m_reccnt.fetch_add(1); - - if (new_tail < m_cap) return new_tail; - else return -1; - } - - size_t m_cap; - size_t m_tombstone_cap; - - Wrapped<R>* m_data; - psudb::BloomFilter<R>* m_tombstone_filter; - - alignas(64) std::atomic<size_t> m_tombstonecnt; - 
alignas(64) std::atomic<uint32_t> m_reccnt; - alignas(64) std::atomic<double> m_weight; - alignas(64) std::atomic<double> m_max_weight; -}; - -} diff --git a/include/framework/QueryInterface.h b/include/framework/QueryInterface.h deleted file mode 100644 index 46a1ce1..0000000 --- a/include/framework/QueryInterface.h +++ /dev/null @@ -1,35 +0,0 @@ -/* - * include/framework/QueryInterface.h - * - * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> - * - * All rights reserved. Published under the Modified BSD License. - * - */ -#pragma once - -#include <vector> -#include <concepts> -#include "util/types.h" - -template <typename Q> -concept QueryInterface = requires(Q q, void *p, std::vector<void*> &s) { - -/* - {q.get_query_state(p, p)} -> std::convertible_to<void*>; - {q.get_buffer_query_state(p, p)}; - {q.query(p, p)}; - {q.buffer_query(p, p)}; - {q.merge()}; - {q.delete_query_state(p)}; -*/ - {Q::EARLY_ABORT} -> std::convertible_to<bool>; - {Q::SKIP_DELETE_FILTER} -> std::convertible_to<bool>; - //{Q::get_query_state(p, p)} -> std::convertible_to<void*>; - //{Q::get_buffer_query_state(p, p)} -> std::convertible_to<void*>; - {Q::process_query_states(p, s, p)}; - - {Q::delete_query_state(std::declval<void*>())} -> std::same_as<void>; - {Q::delete_buffer_query_state(p)}; - -}; diff --git a/include/framework/QueryRequirements.h b/include/framework/QueryRequirements.h new file mode 100644 index 0000000..dcba67e --- /dev/null +++ b/include/framework/QueryRequirements.h @@ -0,0 +1,17 @@ +/* + * include/framework/QueryRequirements.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + * A header file containing the necessary includes for Query + * development. + * + */ +#pragma once + +#include "framework/structure/BufferView.h" +#include "framework/interface/Record.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" diff --git a/include/framework/ShardInterface.h b/include/framework/ShardInterface.h deleted file mode 100644 index 3aa62df..0000000 --- a/include/framework/ShardInterface.h +++ /dev/null @@ -1,26 +0,0 @@ -/* - * include/framework/ShardInterface.h - * - * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> - * - * All rights reserved. Published under the Modified BSD License. - * - */ -#pragma once - -#include <concepts> - -#include "util/types.h" -#include "framework/RecordInterface.h" - -namespace de { - -//template <template<typename> typename S, typename R> -template <typename S> -concept ShardInterface = requires(S s, void *p, bool b) { - //{s.point_lookup(r, b) } -> std::same_as<R*>; - {s.get_record_count()} -> std::convertible_to<size_t>; - {s.get_memory_usage()} -> std::convertible_to<size_t>; -}; - -} diff --git a/include/framework/ShardRequirements.h b/include/framework/ShardRequirements.h new file mode 100644 index 0000000..d054030 --- /dev/null +++ b/include/framework/ShardRequirements.h @@ -0,0 +1,17 @@ +/* + * include/framework/ShardRequirements.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + * A header file containing the necessary includes for Shard + * development. 
+ * + */ +#pragma once + +#include "framework/structure/BufferView.h" +#include "framework/interface/Record.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h new file mode 100644 index 0000000..3d487f0 --- /dev/null +++ b/include/framework/interface/Query.h @@ -0,0 +1,30 @@ +/* + * include/framework/interface/Query.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + */ +#pragma once + +#include "framework/QueryRequirements.h" + +namespace de{ + +template <typename Q, typename R, typename S> +concept QueryInterface = requires(void *p, S *sh, std::vector<void*> &s, std::vector<std::vector<Wrapped<R>>> &rv, BufferView<R> *bv) { + {Q::get_query_state(sh, p)} -> std::convertible_to<void*>; + {Q::get_buffer_query_state(bv, p)} -> std::convertible_to<void *>; + {Q::process_query_states(p, s, p)}; + {Q::query(sh, p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>; + {Q::buffer_query(p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>; + {Q::merge(rv, p)} -> std::convertible_to<std::vector<R>>; + + {Q::delete_query_state(p)} -> std::same_as<void>; + {Q::delete_buffer_query_state(p)} -> std::same_as<void>; + + {Q::EARLY_ABORT} -> std::convertible_to<bool>; + {Q::SKIP_DELETE_FILTER} -> std::convertible_to<bool>; +}; +} diff --git a/include/framework/RecordInterface.h b/include/framework/interface/Record.h index f78918c..5b9f307 100644 --- a/include/framework/RecordInterface.h +++ b/include/framework/interface/Record.h @@ -1,11 +1,12 @@ /* - * include/framework/RecordInterface.h + * include/framework/interface/Record.h * * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> - * Dong Xie <dongx@psu.edu> * - * All rights reserved. Published under the Modified BSD License. + * Distributed under the Modified BSD License. * + * FIXME: the record implementations could probably be broken out into + * different files, leaving only the interface here */ #pragma once @@ -137,7 +138,7 @@ struct CosinePoint{ return true; } - // lexicographic order + /* lexicographic order */ inline bool operator<(const CosinePoint& other) const { for (size_t i=0; i<D; i++) { if (data[i] < other.data[i]) { @@ -181,7 +182,7 @@ struct EuclidPoint{ return true; } - // lexicographic order + /* lexicographic order */ inline bool operator<(const EuclidPoint& other) const { for (size_t i=0; i<D; i++) { if (data[i] < other.data[i]) { @@ -207,8 +208,24 @@ struct EuclidPoint{ template<RecordInterface R> struct RecordHash { size_t operator()(R const &rec) const { - return psudb::hash_bytes((char *) &rec, sizeof(R)); + return psudb::hash_bytes((std::byte *) &rec, sizeof(R)); } }; +template <typename R> +class DistCmpMax { +public: + DistCmpMax(R *baseline) : P(baseline) {} + + inline bool operator()(const R *a, const R *b) requires WrappedInterface<R> { + return a->rec.calc_distance(P->rec) > b->rec.calc_distance(P->rec); + } + + inline bool operator()(const R *a, const R *b) requires (!WrappedInterface<R>){ + return a->calc_distance(*P) > b->calc_distance(*P); + } + +private: + R *P; +}; } diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h new file mode 100644 index 0000000..451ddd2 --- /dev/null +++ b/include/framework/interface/Scheduler.h @@ -0,0 +1,19 @@ +/* + * include/framework/interface/Scheduler.h + * + * Copyright (C) 2023 Douglas B. 
Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + */ +#pragma once + +#include "framework/scheduling/Task.h" + +template <typename S> +concept SchedulerInterface = requires(S s, size_t i, void *vp, de::Job j) { + {S(i, i)}; + {s.schedule_job(j, i, vp, i)} -> std::convertible_to<void>; + {s.shutdown()}; + {s.print_statistics()}; +}; diff --git a/include/framework/interface/Shard.h b/include/framework/interface/Shard.h new file mode 100644 index 0000000..c4a9180 --- /dev/null +++ b/include/framework/interface/Shard.h @@ -0,0 +1,36 @@ +/* + * include/framework/interface/Shard.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + */ +#pragma once + +#include "framework/ShardRequirements.h" + +namespace de { + +template <typename S, typename R> +concept ShardInterface = RecordInterface<R> && requires(S s, std::vector<S*> spp, void *p, bool b, size_t i, BufferView<R> bv, R r) { + {S(spp)}; + {S(std::move(bv))}; + + {s.point_lookup(r, b) } -> std::same_as<Wrapped<R>*>; + {s.get_data()} -> std::same_as<Wrapped<R>*>; + + {s.get_record_count()} -> std::convertible_to<size_t>; + {s.get_tombstone_count()} -> std::convertible_to<size_t>; + {s.get_memory_usage()} -> std::convertible_to<size_t>; + {s.get_aux_memory_usage()} -> std::convertible_to<size_t>; +}; + +template <typename S, typename R> +concept SortedShardInterface = ShardInterface<S, R> && requires(S s, R r, R *rp, size_t i) { + {s.lower_bound(r)} -> std::convertible_to<size_t>; + {s.upper_bound(r)} -> std::convertible_to<size_t>; + {s.get_record_at(i)} -> std::same_as<Wrapped<R>*>; +}; + +} diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h new file mode 100644 index 0000000..9377fb0 --- /dev/null +++ b/include/framework/scheduling/Epoch.h @@ -0,0 +1,143 @@ +/* + * include/framework/scheduling/Epoch.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + */ +#pragma once + +#include <condition_variable> +#include <mutex> + +#include "framework/structure/MutableBuffer.h" +#include "framework/structure/ExtensionStructure.h" +#include "framework/structure/BufferView.h" + +namespace de { + + +template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L> +class Epoch { +private: + typedef MutableBuffer<R> Buffer; + typedef ExtensionStructure<R, S, Q, L> Structure; + typedef BufferView<R> BufView; +public: + Epoch(size_t number=0) + : m_buffer(nullptr) + , m_structure(nullptr) + , m_active_merge(false) + , m_epoch_number(number) + , m_buffer_head(0) + {} + + Epoch(size_t number, Structure *structure, Buffer *buff, size_t head) + : m_buffer(buff) + , m_structure(structure) + , m_active_merge(false) + , m_epoch_number(number) + , m_buffer_head(head) + { + structure->take_reference(); + } + + ~Epoch() { + if (m_structure) { + m_structure->release_reference(); + } + + if (m_structure->get_reference_count() == 0) { + delete m_structure; + } + + } + + /* + * Epochs are *not* copyable or movable. 
Only one can exist, and all users + * of it work with pointers + */ + Epoch(const Epoch&) = delete; + Epoch(Epoch&&) = delete; + Epoch &operator=(const Epoch&) = delete; + Epoch &operator=(Epoch&&) = delete; + + size_t get_epoch_number() { + return m_epoch_number; + } + + Structure *get_structure() { + return m_structure; + } + + BufView get_buffer() { + return m_buffer->get_buffer_view(m_buffer_head); + } + + /* + * Returns a new Epoch object that is a copy of this one. The new object + * will also contain a copy of the m_structure, rather than a reference to + * the same one. The epoch number of the new epoch will be set to the + * provided argument. + */ + Epoch *clone(size_t number) { + std::unique_lock<std::mutex> m_buffer_lock; + auto epoch = new Epoch(number); + epoch->m_buffer = m_buffer; + epoch->m_buffer_head = m_buffer_head; + + if (m_structure) { + epoch->m_structure = m_structure->copy(); + /* the copy routine returns a structure with 0 references */ + epoch->m_structure->take_reference(); + } + + return epoch; + } + + /* + * Check if a merge can be started from this Epoch. At present, without + * concurrent merging, this simply checks if there is currently a scheduled + * merge based on this Epoch. If there is, returns false. If there isn't, + * return true and set a flag indicating that there is an active merge. + */ + bool prepare_reconstruction() { + auto old = m_active_merge.load(); + if (old) { + return false; + } + + // FIXME: this needs cleaned up + while (!m_active_merge.compare_exchange_strong(old, true)) { + old = m_active_merge.load(); + if (old) { + return false; + } + } + + return true; + } + + bool advance_buffer_head(size_t head) { + m_buffer_head = head; + return m_buffer->advance_head(m_buffer_head); + } + +private: + Structure *m_structure; + Buffer *m_buffer; + + std::mutex m_buffer_lock; + std::atomic<bool> m_active_merge; + + /* + * The number of currently active jobs + * (queries/merges) operating on this + * epoch. An epoch can only be retired + * when this number is 0. + */ + size_t m_epoch_number; + size_t m_buffer_head; +}; +} diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h new file mode 100644 index 0000000..3ed4f49 --- /dev/null +++ b/include/framework/scheduling/FIFOScheduler.h @@ -0,0 +1,129 @@ +/* + * include/framework/scheduling/FIFOScheduler.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + * This scheduler runs just concurrently, using a standard FIFO queue to + * determine which jobs to run next. If more jobs are scheduled than there + * are available threads, the excess will stall until a thread becomes + * available and then run in the order they were received by the scheduler. + * + * TODO: We need to set up a custom threadpool based on jthreads to support + * thread preemption for a later phase of this project. That will allow us + * to avoid blocking epoch transitions on long-running queries, or to pause + * reconstructions on demand. 
+ */ +#pragma once + +#include <thread> +#include <condition_variable> +#include <chrono> +#include "framework/scheduling/Task.h" +#include "framework/scheduling/statistics.h" + +#include "ctpl/ctpl.h" +#include "psu-ds/LockedPriorityQueue.h" + +namespace de { + +using namespace std::literals::chrono_literals; + + +class FIFOScheduler { +private: + static const size_t DEFAULT_MAX_THREADS = 8; + +public: + FIFOScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thrd_cnt((thread_cnt) ? thread_cnt: DEFAULT_MAX_THREADS) + , m_used_memory(0) + , m_used_thrds(0) + , m_shutdown(false) + { + m_sched_thrd = std::thread(&FIFOScheduler::run, this); + m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this); + m_thrd_pool.resize(m_thrd_cnt); + } + + ~FIFOScheduler() { + if (!m_shutdown.load()) { + shutdown(); + } + + m_sched_thrd.join(); + m_sched_wakeup_thrd.join(); + } + + void schedule_job(std::function<void(void*)> job, size_t size, void *args, size_t type=0) { + std::unique_lock<std::mutex> lk(m_cv_lock); + size_t ts = m_counter.fetch_add(1); + + m_stats.job_queued(ts, type, size); + m_task_queue.push(Task(size, ts, job, args, type, &m_stats)); + + m_cv.notify_all(); + } + + void shutdown() { + m_shutdown.store(true); + m_thrd_pool.stop(true); + m_cv.notify_all(); + } + + void print_statistics() { + m_stats.print_statistics(); + } + +private: + psudb::LockedPriorityQueue<Task> m_task_queue; + + size_t m_memory_budget; + size_t m_thrd_cnt; + + std::atomic<bool> m_shutdown; + + std::atomic<size_t> m_counter; + std::mutex m_cv_lock; + std::condition_variable m_cv; + + std::thread m_sched_thrd; + std::thread m_sched_wakeup_thrd; + ctpl::thread_pool m_thrd_pool; + + std::atomic<size_t> m_used_thrds; + std::atomic<size_t> m_used_memory; + + SchedulerStatistics m_stats; + + void periodic_wakeup() { + do { + std::this_thread::sleep_for(10us); + m_cv.notify_all(); + } while (!m_shutdown.load()); + } + + void schedule_next() { + assert(m_task_queue.size() > 0); + auto t = m_task_queue.pop(); + m_stats.job_scheduled(t.m_timestamp); + + m_thrd_pool.push(t); + } + + void run() { + do { + std::unique_lock<std::mutex> cv_lock(m_cv_lock); + m_cv.wait(cv_lock); + + while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) { + schedule_next(); + } + } while(!m_shutdown.load()); + } + +}; + +} diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h new file mode 100644 index 0000000..ac59301 --- /dev/null +++ b/include/framework/scheduling/SerialScheduler.h @@ -0,0 +1,62 @@ +/* + * include/framework/scheduling/SerialScheduler.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + * IMPORTANT: This "scheduler" is a shim implementation for allowing + * strictly serial, single-threaded operation of the framework. It should + * never be used in multi-threaded contexts. A call to the schedule_job + * function will immediately run the job and block on its completion before + * returning. + * + */ +#pragma once + +#include "framework/scheduling/Task.h" +#include "framework/scheduling/statistics.h" + +namespace de { + +class SerialScheduler { +public: + SerialScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thrd_cnt((thread_cnt) ? 
thread_cnt: UINT64_MAX) + , m_used_memory(0) + , m_used_thrds(0) + , m_counter(0) + {} + + ~SerialScheduler() = default; + + void schedule_job(std::function<void(void*)> job, size_t size, void *args, size_t type=0) { + size_t ts = m_counter++; + m_stats.job_queued(ts, type, size); + m_stats.job_scheduled(ts); + auto t = Task(size, ts, job, args, type, &m_stats); + t(0); + } + + void shutdown() { + /* intentionally left blank */ + } + + void print_statistics() { + m_stats.print_statistics(); + } + +private: + size_t m_memory_budget; + size_t m_thrd_cnt; + + size_t m_used_thrds; + size_t m_used_memory; + + size_t m_counter; + + SchedulerStatistics m_stats; +}; + +} diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h new file mode 100644 index 0000000..d5d4266 --- /dev/null +++ b/include/framework/scheduling/Task.h @@ -0,0 +1,89 @@ +/* + * include/framework/scheduling/Task.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + * An abstraction to represent a job to be scheduled. Currently the + * supported task types are queries and merges. Based on the current plan, + * simple buffer inserts will likely also be made into a task at some + * point. + * + */ +#pragma once + +#include <future> +#include <functional> +#include <chrono> + +#include "framework/util/Configuration.h" +#include "framework/scheduling/Epoch.h" +#include "framework/scheduling/statistics.h" + +namespace de { + +template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L> +struct ReconstructionArgs { + Epoch<R, S, Q, L> *epoch; + std::vector<ReconstructionTask> merges; + std::promise<bool> result; + bool compaction; + void *extension; +}; + +template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L> +struct QueryArgs { + std::promise<std::vector<R>> result_set; + void *query_parms; + void *extension; +}; + +typedef std::function<void(void*)> Job; + +struct Task { + Task(size_t size, size_t ts, Job job, void *args, size_t type=0, SchedulerStatistics *stats=nullptr) + : m_job(job) + , m_size(size) + , m_timestamp(ts) + , m_args(args) + , m_type(type) + , m_stats(stats) + {} + + Job m_job; + size_t m_size; + size_t m_timestamp; + void *m_args; + size_t m_type; + SchedulerStatistics *m_stats; + + friend bool operator<(const Task &self, const Task &other) { + return self.m_timestamp < other.m_timestamp; + } + + friend bool operator>(const Task &self, const Task &other) { + return self.m_timestamp > other.m_timestamp; + } + + void operator()(size_t thrd_id) { + auto start = std::chrono::high_resolution_clock::now(); + if (m_stats) { + m_stats->job_begin(m_timestamp); + } + + m_job(m_args); + + if (m_stats) { + m_stats->job_complete(m_timestamp); + } + auto stop = std::chrono::high_resolution_clock::now(); + + if (m_stats) { + auto time = std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start).count(); + m_stats->log_time_data(time, m_type); + } + } +}; + +} diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h new file mode 100644 index 0000000..6c479cd --- /dev/null +++ b/include/framework/scheduling/statistics.h @@ -0,0 +1,118 @@ +/* + * include/framework/scheduling/statistics.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + * This is a stub for a statistics tracker to be used in scheduling. 
It + * currently only tracks simple aggregated statistics, but should be + * updated in the future for more fine-grained statistics. These will be + * used for making scheduling decisions and predicting the runtime of a + * given job. + */ +#pragma once + +#include <cstdlib> +#include <cassert> +#include <unordered_map> +#include <vector> +#include <mutex> +#include <chrono> +#include <atomic> + +namespace de { + +class SchedulerStatistics { +private: + enum class EventType { + QUEUED, + SCHEDULED, + STARTED, + FINISHED + }; + + struct Event { + size_t id; + EventType type; + }; + + struct JobInfo { + size_t id; + size_t size; + size_t type; + }; + + +public: + SchedulerStatistics() = default; + ~SchedulerStatistics() = default; + + void job_queued(size_t id, size_t type, size_t size) { + auto time = std::chrono::high_resolution_clock::now(); + } + + void job_scheduled(size_t id) { + std::unique_lock<std::mutex> lk(m_mutex); + + } + + void job_begin(size_t id) { + + } + + void job_complete(size_t id) { + + } + + /* FIXME: This is just a temporary approach */ + void log_time_data(size_t length, size_t type) { + assert(type == 1 || type == 2); + + if (type == 1) { + m_type_1_cnt.fetch_add(1); + m_type_1_total_time.fetch_add(length); + + if (length > m_type_1_largest_time) { + m_type_1_largest_time.store(length); + } + } else { + m_type_2_cnt.fetch_add(1); + m_type_2_total_time.fetch_add(length); + + if (length > m_type_2_largest_time) { + m_type_2_largest_time.store(length); + } + } + } + + void print_statistics() { + if (m_type_1_cnt > 0) { + fprintf(stdout, "Query Count: %ld\tQuery Avg. Latency: %ld\tMax Query Latency: %ld\n", + m_type_1_cnt.load(), + m_type_1_total_time.load() / m_type_1_cnt.load(), + m_type_1_largest_time.load()); + } + if (m_type_2_cnt > 0) { + fprintf(stdout, "Reconstruction Count: %ld\tReconstruction Avg. Latency: %ld\tMax Recon. Latency:%ld\n", + m_type_2_cnt.load(), + m_type_2_total_time.load() / m_type_2_cnt.load(), + m_type_2_largest_time.load()); + } + } + +private: + std::mutex m_mutex; + std::unordered_map<size_t, JobInfo> m_jobs; + std::vector<Event> m_event_log; + + std::atomic<size_t> m_type_1_cnt; + std::atomic<size_t> m_type_1_total_time; + + std::atomic<size_t> m_type_2_cnt; + std::atomic<size_t> m_type_2_total_time; + + std::atomic<size_t> m_type_1_largest_time; + std::atomic<size_t> m_type_2_largest_time; +}; +} diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h new file mode 100644 index 0000000..9e0872b --- /dev/null +++ b/include/framework/structure/BufferView.h @@ -0,0 +1,170 @@ +/* + * include/framework/structure/BufferView.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + * TODO: This file is very poorly commented. + */ +#pragma once + +#include <cstdlib> +#include <cassert> +#include <functional> +#include <utility> + +#include "psu-util/alignment.h" +#include "psu-ds/BloomFilter.h" +#include "framework/interface/Record.h" + +namespace de { + +typedef std::_Bind<void (*(void*, long unsigned int))(void*, long unsigned int)> ReleaseFunction; + +template <RecordInterface R> +class BufferView { +public: + BufferView() = default; + + /* + * the BufferView's lifetime is tightly linked to buffer versioning, and so + * copying and assignment are disabled. 
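+     *
+     * Views are handed out by value from MutableBuffer::get_buffer_view()
+     * and may only be moved; the head reference a view pins is released
+     * when the view is destroyed. A minimal sketch (process() is a
+     * placeholder):
+     *
+     *     auto view = buffer.get_buffer_view();
+     *     process(std::move(view));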
+ */ + BufferView(const BufferView&) = delete; + BufferView &operator=(BufferView &) = delete; + + BufferView(BufferView &&other) + : m_data(std::exchange(other.m_data, nullptr)) + , m_release(std::move(other.m_release)) + , m_head(std::exchange(other.m_head, 0)) + , m_tail(std::exchange(other.m_tail, 0)) + , m_start(std::exchange(other.m_start, 0)) + , m_stop(std::exchange(other.m_stop, 0)) + , m_cap(std::exchange(other.m_cap, 0)) + , m_approx_ts_cnt(std::exchange(other.m_approx_ts_cnt, 0)) + , m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr)) + , m_active(std::exchange(other.m_active, false)) {} + + BufferView &operator=(BufferView &&other) = delete; + + + BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail, size_t tombstone_cnt, psudb::BloomFilter<R> *filter, + ReleaseFunction release) + : m_data(buffer) + , m_release(release) + , m_head(head) + , m_tail(tail) + , m_start(m_head % cap) + , m_stop(m_tail % cap) + , m_cap(cap) + , m_approx_ts_cnt(tombstone_cnt) + , m_tombstone_filter(filter) + , m_active(true) {} + + ~BufferView() { + if (m_active) { + m_release(); + } + } + + bool check_tombstone(const R& rec) { + if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false; + + for (size_t i=0; i<get_record_count(); i++) { + if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) { + return true; + } + } + + return false; + } + + bool delete_record(const R& rec) { + if (m_start < m_stop) { + for (size_t i=m_start; i<m_stop; i++) { + if (m_data[i].rec == rec) { + m_data[i].set_delete(); + return true; + } + } + } else { + for (size_t i=m_start; i<m_cap; i++) { + if (m_data[i].rec == rec) { + m_data[i].set_delete(); + return true; + } + } + + for (size_t i=0; i<m_stop; i++) { + if (m_data[i].rec == rec) { + m_data[i].set_delete(); + return true; + } + + } + + } + + return false; + } + + size_t get_record_count() { + return m_tail - m_head; + } + + /* + * NOTE: This function returns an upper bound on the number + * of tombstones within the view. There may be less than + * this, due to synchronization issues during view creation. + */ + size_t get_tombstone_count() { + return m_approx_ts_cnt; + } + + Wrapped<R> *get(size_t i) { + assert(i < get_record_count()); + return m_data + to_idx(i); + } + + void copy_to_buffer(psudb::byte *buffer) { + /* check if the region to be copied circles back to start. If so, do it in two steps */ + if (m_start > m_stop) { + size_t split_idx = m_cap - m_start; + + memcpy(buffer, (std::byte*) (m_data + m_start), split_idx* sizeof(Wrapped<R>)); + memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte*) m_data, m_stop * sizeof(Wrapped<R>)); + } else { + memcpy(buffer, (std::byte*) (m_data + m_start), get_record_count() * sizeof(Wrapped<R>)); + } + } + + size_t get_tail() { + return m_tail; + } + + size_t get_head() { + return m_head; + } + +private: + Wrapped<R>* m_data; + ReleaseFunction m_release; + size_t m_head; + size_t m_tail; + size_t m_start; + size_t m_stop; + size_t m_cap; + size_t m_approx_ts_cnt; + psudb::BloomFilter<R> *m_tombstone_filter; + bool m_active; + + size_t to_idx(size_t i) { + size_t idx = (m_start + i >= m_cap) ? 
i - (m_cap - m_start)
+                                            : m_start + i;
+        assert(idx < m_cap);
+        return idx;
+    }
+};
+
+}
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
new file mode 100644
index 0000000..4802bc1
--- /dev/null
+++ b/include/framework/structure/ExtensionStructure.h
@@ -0,0 +1,495 @@
+/*
+ * include/framework/structure/ExtensionStructure.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <atomic>
+#include <cstdio>
+#include <vector>
+
+#include "framework/structure/BufferView.h"
+#include "framework/structure/InternalLevel.h"
+
+#include "framework/util/Configuration.h"
+
+#include "psu-util/timer.h"
+
+namespace de {
+
+template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L=LayoutPolicy::TEIRING>
+class ExtensionStructure {
+    typedef S Shard;
+    typedef BufferView<R> BuffView;
+
+public:
+    ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
+        : m_scale_factor(scale_factor)
+        , m_max_delete_prop(max_delete_prop)
+        , m_buffer_size(buffer_size)
+    {}
+
+    ~ExtensionStructure() = default;
+
+    /*
+     * Create a shallow copy of this extension structure. The copy will share
+     * references to the same levels/shards as the original, but will have its
+     * own lists. As all of the shards are immutable (with the exception of
+     * deletes), the copy can be restructured with reconstructions and flushes
+     * without affecting the original. The copied structure will be returned
+     * with a reference count of 0; generally you will want to immediately call
+     * take_reference() on it.
+     *
+     * NOTE: When using tagged deletes, a delete of a record in the original
+     * structure will affect the copy, so long as the copy retains a reference
+     * to the same shard as the original. This could cause synchronization
+     * problems under tagging with concurrency. Any deletes in this context will
+     * need to be forwarded to the appropriate structures manually.
+     */
+    ExtensionStructure<R, S, Q, L> *copy() {
+        auto new_struct = new ExtensionStructure<R, S, Q, L>(m_buffer_size, m_scale_factor,
+                                                             m_max_delete_prop);
+        for (size_t i=0; i<m_levels.size(); i++) {
+            new_struct->m_levels.push_back(m_levels[i]->clone());
+        }
+
+        new_struct->m_refcnt = 0;
+
+        return new_struct;
+    }
+
+    /*
+     * Search for a record matching the argument and mark it deleted by
+     * setting the delete bit in its wrapped header. Returns 1 if a matching
+     * record was found and deleted, and 0 if a matching record was not found.
+     *
+     * This function will stop after finding the first matching record. It is
+     * assumed that no duplicate records exist. In the case of duplicates, this
+     * function will still "work", but in the sense of "delete first match".
+     */
+    int tagged_delete(const R &rec) {
+        for (auto level : m_levels) {
+            if (level && level->delete_record(rec)) {
+                return 1;
+            }
+        }
+
+        /*
+         * If the record to be erased wasn't found, return 0. The
+         * DynamicExtension itself will then search the active
+         * Buffers.
+         */
+        return 0;
+    }
+
+    /*
+     * Flush a buffer into the extension structure. Any reconstructions needed
+     * to make room in L0 must have been performed before calling this.
+     *
+     * FIXME: arguably, this should be a method attached to the buffer that
+     * takes a structure as input.
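+     *
+     * A rough sketch of the intended call sequence (illustrative only; the
+     * actual driver code resides in DynamicExtension.h):
+     *
+     *     for (auto [src, tgt] : structure->get_reconstruction_tasks(view.get_record_count()))
+     *         structure->reconstruction(tgt, src);
+     *     structure->flush_buffer(std::move(view));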
+ */ + inline bool flush_buffer(BuffView buffer) { + assert(can_reconstruct_with(0, buffer.get_record_count())); + + flush_buffer_into_l0(std::move(buffer)); + + return true; + } + + /* + * Return the total number of records (including tombstones) within all + * of the levels of the structure. + */ + size_t get_record_count() { + size_t cnt = 0; + + for (size_t i=0; i<m_levels.size(); i++) { + if (m_levels[i]) cnt += m_levels[i]->get_record_count(); + } + + return cnt; + } + + /* + * Return the total number of tombstones contained within all of the + * levels of the structure. + */ + size_t get_tombstone_count() { + size_t cnt = 0; + + for (size_t i=0; i<m_levels.size(); i++) { + if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count(); + } + + return cnt; + } + + /* + * Return the number of levels within the structure. Note that not + * all of these levels are necessarily populated. + */ + size_t get_height() { + return m_levels.size(); + } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing the primary data structure and raw data. + */ + size_t get_memory_usage() { + size_t cnt = 0; + for (size_t i=0; i<m_levels.size(); i++) { + if (m_levels[i]) cnt += m_levels[i]->get_memory_usage(); + } + + return cnt; + } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing auxiliary data structures. This total does not + * include memory used for the main data structure, or raw data. + */ + size_t get_aux_memory_usage() { + size_t cnt = 0; + for (size_t i=0; i<m_levels.size(); i++) { + if (m_levels[i]) { + cnt += m_levels[i]->get_aux_memory_usage(); + } + } + + return cnt; + } + + /* + * Validate that no level in the structure exceeds its maximum tombstone + * capacity. This is used to trigger preemptive compactions at the end of + * the reconstruction process. + */ + bool validate_tombstone_proportion() { + long double ts_prop; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + ts_prop = (long double)m_levels[i]->get_tombstone_count() / + (long double)calc_level_record_capacity(i); + if (ts_prop > (long double)m_max_delete_prop) { + return false; + } + } + } + + return true; + } + + bool validate_tombstone_proportion(level_index level) { + long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level); + return ts_prop <= (long double) m_max_delete_prop; + } + + /* + * Return a reference to the underlying vector of levels within the + * structure. 
+ */ + std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> &get_levels() { + return m_levels; + } + + std::vector<ReconstructionTask> get_compaction_tasks() { + std::vector<ReconstructionTask> tasks; + + /* if the tombstone/delete invariant is satisfied, no need for compactions */ + if (validate_tombstone_proportion()) { + return tasks; + } + + /* locate the first level to violate the invariant */ + level_index violation_idx = -1; + for (level_index i=0; i<m_levels.size(); i++) { + if (!validate_tombstone_proportion(i)) { + violation_idx = i; + break; + } + } + + assert(violation_idx != -1); + + level_index base_level = find_reconstruction_target(violation_idx); + if (base_level == -1) { + base_level = grow(); + } + + for (level_index i=base_level; i>0; i--) { + ReconstructionTask task = {i-1, i}; + + /* + * The amount of storage required for the reconstruction accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i - 1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_reconstruct_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } + //task.m_size = 2* reccnt * sizeof(R); + + tasks.push_back(task); + } + + return tasks; + } + + /* + * + */ + std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt) { + std::vector<ReconstructionTask> reconstructions; + + /* + * The buffer flush is not included so if that can be done without any + * other change, just return an empty list. + */ + if (can_reconstruct_with(0, buffer_reccnt)) { + return std::move(reconstructions); + } + + level_index base_level = find_reconstruction_target(0); + if (base_level == -1) { + base_level = grow(); + } + + for (level_index i=base_level; i>0; i--) { + ReconstructionTask task = {i-1, i}; + + /* + * The amount of storage required for the reconstruction accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i-1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_reconstruct_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } + //task.m_size = 2* reccnt * sizeof(R); + + reconstructions.push_back(task); + } + + return std::move(reconstructions); + } + + + /* + * + */ + std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level) { + std::vector<ReconstructionTask> reconstructions; + + level_index base_level = find_reconstruction_target(source_level); + if (base_level == -1) { + base_level = grow(); + } + + for (level_index i=base_level; i>source_level; i--) { + ReconstructionTask task = {i - 1, i}; + /* + * The amount of storage required for the reconstruction accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. 
+ */ + size_t reccnt = m_levels[i-1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_reconstruct_with(i, reccnt)) { + reccnt += m_levels[i]->get_record_count(); + } + } +// task.m_size = 2* reccnt * sizeof(R); + + reconstructions.push_back(task); + } + + return reconstructions; + } + + /* + * Combine incoming_level with base_level and reconstruct the shard, + * placing it in base_level. The two levels should be sequential--i.e. no + * levels are skipped in the reconstruction process--otherwise the + * tombstone ordering invariant may be violated. + */ + inline void reconstruction(level_index base_level, level_index incoming_level) { + if constexpr (L == LayoutPolicy::LEVELING) { + /* if the base level has a shard, merge the base and incoming together to make a new one */ + if (m_levels[base_level]->get_shard_count() > 0) { + m_levels[base_level] = InternalLevel<R, Shard, Q>::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get()); + /* otherwise, we can just move the incoming to the base */ + } else { + m_levels[base_level] = m_levels[incoming_level]; + } + } else { + m_levels[base_level]->append_level(m_levels[incoming_level].get()); + m_levels[base_level]->finalize(); + } + + /* place a new, empty level where the incoming level used to be */ + m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); + } + + bool take_reference() { + m_refcnt.fetch_add(1); + return true; + } + + bool release_reference() { + assert(m_refcnt.load() > 0); + m_refcnt.fetch_add(-1); + return true; + } + + size_t get_reference_count() { + return m_refcnt.load(); + } + + std::vector<void *> get_query_states(std::vector<std::pair<ShardID, Shard*>> &shards, void *parms) { + std::vector<void*> states; + + for (auto &level : m_levels) { + level->get_query_states(shards, states, parms); + } + + return states; + } + +private: + size_t m_scale_factor; + double m_max_delete_prop; + size_t m_buffer_size; + + std::atomic<size_t> m_refcnt; + + std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels; + + /* + * Add a new level to the structure and return its index. + */ + inline level_index grow() { + level_index new_idx = m_levels.size(); + size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt))); + return new_idx; + } + + /* + * Find the first level below the level indicated by idx that + * is capable of sustaining a reconstruction and return its + * level index. If no such level exists, returns -1. Also + * returns -1 if idx==0, and no such level exists, to simplify + * the logic of the first buffer flush. + */ + inline level_index find_reconstruction_target(level_index idx) { + + if (idx == 0 && m_levels.size() == 0) return -1; + + size_t incoming_rec_cnt = get_level_record_count(idx); + for (level_index i=idx+1; i<m_levels.size(); i++) { + if (can_reconstruct_with(i, incoming_rec_cnt)) { + return i; + } + + incoming_rec_cnt = get_level_record_count(i); + } + + return -1; + } + + inline void flush_buffer_into_l0(BuffView buffer) { + assert(m_levels[0]); + if constexpr (L == LayoutPolicy::LEVELING) { + // FIXME: Kludgey implementation due to interface constraints. 
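+            // Under leveling, the buffer is first materialized into a temporary
+            // single-shard level. If L0 already holds a shard, that temporary
+            // level is merged with the existing L0 via InternalLevel::reconstruction()
+            // and then discarded; otherwise the temporary level simply becomes
+            // the new L0.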
+ auto old_level = m_levels[0].get(); + auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); + temp_level->append_buffer(std::move(buffer)); + + if (old_level->get_shard_count() > 0) { + m_levels[0] = InternalLevel<R, Shard, Q>::reconstruction(old_level, temp_level); + delete temp_level; + } else { + m_levels[0] = std::shared_ptr<InternalLevel<R, Shard, Q>>(temp_level); + } + } else { + m_levels[0]->append_buffer(std::move(buffer)); + } + } + + /* + * Mark a given memory level as no-longer in use by the tree. For now this + * will just free the level. In future, this will be more complex as the + * level may not be able to immediately be deleted, depending upon who + * else is using it. + */ + inline void mark_as_unused(std::shared_ptr<InternalLevel<R, Shard, Q>> level) { + level.reset(); + } + + /* + * Assume that level "0" should be larger than the buffer. The buffer + * itself is index -1, which should return simply the buffer capacity. + */ + inline size_t calc_level_record_capacity(level_index idx) { + return m_buffer_size * pow(m_scale_factor, idx+1); + } + + /* + * Returns the number of records present on a specified level. + */ + inline size_t get_level_record_count(level_index idx) { + return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; + } + + /* + * Determines if a level can sustain a reconstruction with incoming_rec_cnt + * additional records without exceeding its capacity. + */ + inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) { + if (idx >= m_levels.size() || !m_levels[idx]) { + return false; + } + + if (L == LayoutPolicy::LEVELING) { + return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx); + } else { + return m_levels[idx]->get_shard_count() < m_scale_factor; + } + + /* unreachable */ + assert(true); + } +}; + +} + diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h new file mode 100644 index 0000000..db38946 --- /dev/null +++ b/include/framework/structure/InternalLevel.h @@ -0,0 +1,271 @@ +/* + * include/framework/structure/InternalLevel.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * Distributed under the Modified BSD License. + * + * The word `Internal` in this class's name refers to memory. The current + * model, inherited from the framework in Practical Dynamic Extension for + * Sampling Indexes, would use a different ExternalLevel for shards stored + * on external storage. This is a distinction that can probably be avoided + * with some more thought being put into interface design. + * + */ +#pragma once + +#include <vector> +#include <memory> + +#include "util/types.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" +#include "framework/interface/Record.h" +#include "framework/structure/BufferView.h" + +namespace de { +template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q> +class InternalLevel; + + + +template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q> +class InternalLevel { + typedef S Shard; + typedef BufferView<R> BuffView; +public: + InternalLevel(ssize_t level_no, size_t shard_cap) + : m_level_no(level_no) + , m_shard_cnt(0) + , m_shards(shard_cap, nullptr) + , m_pending_shard(nullptr) + {} + + ~InternalLevel() { + delete m_pending_shard; + } + + /* + * Create a new shard combining the records from base_level and new_level, + * and return a shared_ptr to a new level containing this shard. 
This is used + * for reconstructions under the leveling layout policy. + * + * No changes are made to the levels provided as arguments. + */ + static std::shared_ptr<InternalLevel> reconstruction(InternalLevel* base_level, InternalLevel* new_level) { + assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); + auto res = new InternalLevel(base_level->m_level_no, 1); + res->m_shard_cnt = 1; + std::vector<Shard *> shards = {base_level->m_shards[0].get(), + new_level->m_shards[0].get()}; + + res->m_shards[0] = std::make_shared<S>(shards); + return std::shared_ptr<InternalLevel>(res); + } + + /* + * Create a new shard combining the records from all of + * the shards in level, and append this new shard into + * this level. This is used for reconstructions under + * the tiering layout policy. + * + * No changes are made to the level provided as an argument. + */ + void append_level(InternalLevel* level) { + // FIXME: that this is happening probably means that + // something is going terribly wrong earlier in the + // reconstruction logic. + if (level->get_shard_count() == 0) { + return; + } + + std::vector<S*> shards; + for (auto shard : level->m_shards) { + if (shard) shards.emplace_back(shard.get()); + } + + if (m_shard_cnt == m_shards.size()) { + m_pending_shard = new S(shards); + return; + } + + auto tmp = new S(shards); + m_shards[m_shard_cnt] = std::shared_ptr<S>(tmp); + + ++m_shard_cnt; + } + + /* + * Create a new shard using the records in the + * provided buffer, and append this new shard + * into this level. This is used for buffer + * flushes under the tiering layout policy. + */ + void append_buffer(BuffView buffer) { + if (m_shard_cnt == m_shards.size()) { + assert(m_pending_shard == nullptr); + m_pending_shard = new S(std::move(buffer)); + return; + } + + m_shards[m_shard_cnt] = std::make_shared<S>(std::move(buffer)); + ++m_shard_cnt; + } + + void finalize() { + if (m_pending_shard) { + for (size_t i=0; i<m_shards.size(); i++) { + m_shards[i] = nullptr; + } + + m_shards[0] = std::shared_ptr<S>(m_pending_shard); + m_pending_shard = nullptr; + m_shard_cnt = 1; + } + } + + /* + * Create a new shard containing the combined records + * from all shards on this level and return it. + * + * No changes are made to this level. 
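+     *
+     * The returned shard is heap-allocated and owned by the caller; nullptr
+     * is returned if the level currently contains no shards.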
+ */ + Shard *get_combined_shard() { + if (m_shard_cnt == 0) { + return nullptr; + } + + std::vector<Shard *> shards; + for (auto shard : m_shards) { + if (shard) shards.emplace_back(shard.get()); + } + + return new S(shards); + } + + void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) { + for (size_t i=0; i<m_shard_cnt; i++) { + if (m_shards[i]) { + auto shard_state = Q::get_query_state(m_shards[i].get(), query_parms); + shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i].get()}); + shard_states.emplace_back(shard_state); + } + } + } + + bool check_tombstone(size_t shard_stop, const R& rec) { + if (m_shard_cnt == 0) return false; + + for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) { + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec, true); + if (res && res->is_tombstone()) { + return true; + } + } + } + return false; + } + + bool delete_record(const R &rec) { + if (m_shard_cnt == 0) return false; + + for (size_t i = 0; i < m_shards.size(); ++i) { + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec); + if (res) { + res->set_delete(); + return true; + } + } + } + + return false; + } + + Shard* get_shard(size_t idx) { + return m_shards[idx].get(); + } + + size_t get_shard_count() { + return m_shard_cnt; + } + + size_t get_record_count() { + size_t cnt = 0; + for (size_t i=0; i<m_shard_cnt; i++) { + if (m_shards[i]) { + cnt += m_shards[i]->get_record_count(); + } + } + + return cnt; + } + + size_t get_tombstone_count() { + size_t res = 0; + for (size_t i = 0; i < m_shard_cnt; ++i) { + if (m_shards[i]) { + res += m_shards[i]->get_tombstone_count(); + } + } + return res; + } + + size_t get_aux_memory_usage() { + size_t cnt = 0; + for (size_t i=0; i<m_shard_cnt; i++) { + if (m_shards[i]){ + cnt += m_shards[i]->get_aux_memory_usage(); + } + } + + return cnt; + } + + size_t get_memory_usage() { + size_t cnt = 0; + for (size_t i=0; i<m_shard_cnt; i++) { + if (m_shards[i]) { + cnt += m_shards[i]->get_memory_usage(); + } + } + + return cnt; + } + + double get_tombstone_prop() { + size_t tscnt = 0; + size_t reccnt = 0; + for (size_t i=0; i<m_shard_cnt; i++) { + if (m_shards[i]) { + tscnt += m_shards[i]->get_tombstone_count(); + reccnt += m_shards[i]->get_record_count(); + } + } + + return (double) tscnt / (double) (tscnt + reccnt); + } + + std::shared_ptr<InternalLevel> clone() { + auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size()); + for (size_t i=0; i<m_shard_cnt; i++) { + new_level->m_shards[i] = m_shards[i]; + } + new_level->m_shard_cnt = m_shard_cnt; + + return new_level; + } + +private: + ssize_t m_level_no; + + size_t m_shard_cnt; + size_t m_shard_size_cap; + + std::vector<std::shared_ptr<Shard>> m_shards; + Shard *m_pending_shard; +}; + +} diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h new file mode 100644 index 0000000..415c95a --- /dev/null +++ b/include/framework/structure/MutableBuffer.h @@ -0,0 +1,313 @@ +/* + * include/framework/structure/MutableBuffer.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * Distributed under the Modified BSD License. + * + * NOTE: Concerning the tombstone count. One possible approach + * would be to track the number of tombstones below and above the + * low water mark--this would be straightforward to do. 
Then, if we + * *require* that the head only advance up to the LWM, we can get a + * correct view on the number of tombstones in the active buffer at + * any point in time, and the BufferView will have a pretty good + * approximation as well (potentially with a few extra if new inserts + * happen between when the tail pointer and tombstone count are fetched) + * + */ +#pragma once + +#include <cstdlib> +#include <atomic> +#include <cassert> +#include <immintrin.h> + +#include "psu-util/alignment.h" +#include "util/bf_config.h" +#include "psu-ds/BloomFilter.h" +#include "framework/interface/Record.h" +#include "framework/structure/BufferView.h" + +using psudb::CACHELINE_SIZE; + +namespace de { + +template <RecordInterface R> +class MutableBuffer { + friend class BufferView<R>; + + struct buffer_head { + size_t head_idx; + size_t refcnt; + }; + +public: + MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0) + : m_lwm(low_watermark) + , m_hwm(high_watermark) + , m_cap((capacity == 0) ? 2 * high_watermark : capacity) + , m_tail(0) + , m_head({0, 0}) + , m_old_head({high_watermark, 0}) + , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>))) + , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS)) + , m_tscnt(0) + , m_old_tscnt(0) + , m_active_head_advance(false) + { + assert(m_cap > m_hwm); + assert(m_hwm > m_lwm); + } + + ~MutableBuffer() { + free(m_data); + delete m_tombstone_filter; + } + + int append(const R &rec, bool tombstone=false) { + int32_t tail = 0; + if ((tail = try_advance_tail()) == -1) { + return 0; + } + + Wrapped<R> wrec; + wrec.rec = rec; + wrec.header = 0; + if (tombstone) wrec.set_tombstone(); + + size_t pos = tail % m_cap; + + m_data[pos] = wrec; + m_data[pos].header |= (pos << 2); + + if (tombstone) { + m_tscnt.fetch_add(1); + if (m_tombstone_filter) m_tombstone_filter->insert(rec); + } + + return 1; + } + + bool truncate() { + m_tscnt.store(0); + m_tail.store(0); + if (m_tombstone_filter) m_tombstone_filter->clear(); + + return true; + } + + size_t get_record_count() { + return m_tail.load() - m_head.load().head_idx; + } + + size_t get_capacity() { + return m_cap; + } + + bool is_full() { + return get_record_count() >= m_hwm; + } + + bool is_at_low_watermark() { + return get_record_count() >= m_lwm; + } + + size_t get_tombstone_count() { + return m_tscnt.load(); + } + + bool delete_record(const R& rec) { + return get_buffer_view().delete_record(rec); + } + + bool check_tombstone(const R& rec) { + return get_buffer_view().check_tombstone(rec); + } + + size_t get_memory_usage() { + return m_cap * sizeof(Wrapped<R>); + } + + size_t get_aux_memory_usage() { + return m_tombstone_filter->get_memory_usage(); + } + + BufferView<R> get_buffer_view(size_t target_head) { + size_t head = get_head(target_head); + auto f = std::bind(release_head_reference, (void *) this, head); + + return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); + } + + BufferView<R> get_buffer_view() { + size_t head = get_head(m_head.load().head_idx); + auto f = std::bind(release_head_reference, (void *) this, head); + + return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); + } + + /* + * Advance the buffer following a reconstruction. Move current + * head and head_refcnt into old_head and old_head_refcnt, then + * assign new_head to old_head. 
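+     * Concretely, after this call m_head refers to new_head with a reference
+     * count of zero, while the previous head (with any references still held
+     * on it) is parked in m_old_head until those references are released.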
+ */ + bool advance_head(size_t new_head) { + assert(new_head > m_head.load().head_idx); + assert(new_head <= m_tail.load()); + + /* refuse to advance head while there is an old with one references */ + if (m_old_head.load().refcnt > 0) { + //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n"); + return false; + } + + m_active_head_advance.store(true); + + buffer_head new_hd = {new_head, 0}; + buffer_head cur_hd; + + /* replace current head with new head */ + do { + cur_hd = m_head.load(); + } while(!m_head.compare_exchange_strong(cur_hd, new_hd)); + + /* move the current head into the old head */ + m_old_head.store(cur_hd); + + m_active_head_advance.store(false); + return true; + } + + /* + * FIXME: If target_head does not match *either* the old_head or the + * current_head, this routine will loop infinitely. + */ + size_t get_head(size_t target_head) { + buffer_head cur_hd, new_hd; + bool head_acquired = false; + + do { + if (m_old_head.load().head_idx == target_head) { + cur_hd = m_old_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else if (m_head.load().head_idx == target_head){ + cur_hd = m_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } while(!head_acquired); + + return new_hd.head_idx; + } + + void set_low_watermark(size_t lwm) { + assert(lwm < m_hwm); + m_lwm = lwm; + } + + size_t get_low_watermark() { + return m_lwm; + } + + void set_high_watermark(size_t hwm) { + assert(hwm > m_lwm); + assert(hwm < m_cap); + m_hwm = hwm; + } + + size_t get_high_watermark() { + return m_hwm; + } + + size_t get_tail() { + return m_tail.load(); + } + + /* + * Note: this returns the available physical storage capacity, + * *not* now many more records can be inserted before the + * HWM is reached. It considers the old_head to be "free" + * when it has no remaining references. This should be true, + * but a buggy framework implementation may violate the + * assumption. 
+ */ + size_t get_available_capacity() { + if (m_old_head.load().refcnt == 0) { + return m_cap - (m_tail.load() - m_head.load().head_idx); + } + + return m_cap - (m_tail.load() - m_old_head.load().head_idx); + } + +private: + int64_t try_advance_tail() { + size_t old_value = m_tail.load(); + + /* if full, fail to advance the tail */ + if (old_value - m_head.load().head_idx >= m_hwm) { + return -1; + } + + while (!m_tail.compare_exchange_strong(old_value, old_value+1)) { + /* if full, stop trying and fail to advance the tail */ + if (m_tail.load() >= m_hwm) { + return -1; + } + + _mm_pause(); + } + + return old_value; + } + + size_t to_idx(size_t i, size_t head) { + return (head + i) % m_cap; + } + + static void release_head_reference(void *buff, size_t head) { + MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff; + + buffer_head cur_hd, new_hd; + do { + if (buffer->m_old_head.load().head_idx == head) { + cur_hd = buffer->m_old_head; + if (cur_hd.refcnt == 0) continue; + new_hd = {cur_hd.head_idx, cur_hd.refcnt-1}; + if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { + break; + } + } else { + cur_hd = buffer->m_head; + if (cur_hd.refcnt == 0) continue; + new_hd = {cur_hd.head_idx, cur_hd.refcnt-1}; + + if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { + break; + } + } + _mm_pause(); + } while(true); + } + + size_t m_lwm; + size_t m_hwm; + size_t m_cap; + + alignas(64) std::atomic<size_t> m_tail; + + alignas(64) std::atomic<buffer_head> m_head; + alignas(64) std::atomic<buffer_head> m_old_head; + + Wrapped<R>* m_data; + psudb::BloomFilter<R>* m_tombstone_filter; + alignas(64) std::atomic<size_t> m_tscnt; + size_t m_old_tscnt; + + alignas(64) std::atomic<bool> m_active_head_advance; +}; + +} diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h new file mode 100644 index 0000000..65ca181 --- /dev/null +++ b/include/framework/util/Configuration.h @@ -0,0 +1,49 @@ +/* + * include/framework/util/Configuration.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + */ +#pragma once + +#include <cstdlib> +#include <utility> + +namespace de { + +static thread_local size_t sampling_attempts = 0; +static thread_local size_t sampling_rejections = 0; +static thread_local size_t deletion_rejections = 0; +static thread_local size_t bounds_rejections = 0; +static thread_local size_t tombstone_rejections = 0; +static thread_local size_t buffer_rejections = 0; + +/* + * thread_local size_t various_sampling_times go here. + */ +static thread_local size_t sample_range_time = 0; +static thread_local size_t alias_time = 0; +static thread_local size_t alias_query_time = 0; +static thread_local size_t rejection_check_time = 0; +static thread_local size_t buffer_sample_time = 0; +static thread_local size_t memlevel_sample_time = 0; +static thread_local size_t disklevel_sample_time = 0; +static thread_local size_t sampling_bailouts = 0; + + +enum class LayoutPolicy { + LEVELING, + TEIRING +}; + +enum class DeletePolicy { + TOMBSTONE, + TAGGING +}; + +typedef ssize_t level_index; +typedef std::pair<level_index, level_index> ReconstructionTask; + +} |
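+
+/*
+ * A ReconstructionTask, as emitted by ExtensionStructure's task-generation
+ * routines, pairs the source (incoming) level index with the target (base)
+ * level index of a single reconstruction step; tasks are emitted deepest
+ * target first so that they can be applied in order.
+ */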