Diffstat (limited to 'include/framework/structure')
-rw-r--r--  include/framework/structure/BufferView.h        |  258
-rw-r--r--  include/framework/structure/ExtensionStructure.h | 1080
-rw-r--r--  include/framework/structure/InternalLevel.h      |  439
-rw-r--r--  include/framework/structure/MutableBuffer.h      |  464
4 files changed, 1124 insertions(+), 1117 deletions(-)
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index e95a799..acf1201 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -1,7 +1,7 @@ /* * include/framework/structure/BufferView.h * - * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> * * Distributed under the Modified BSD License. * @@ -9,166 +9,150 @@ */ #pragma once -#include <cstdlib> #include <cassert> +#include <cstdlib> #include <functional> #include <utility> -#include "psu-util/alignment.h" -#include "psu-ds/BloomFilter.h" #include "framework/interface/Record.h" +#include "psu-ds/BloomFilter.h" +#include "psu-util/alignment.h" namespace de { -typedef std::function<void(void)> ReleaseFunction; +typedef std::function<void(void)> ReleaseFunction; -template <RecordInterface R> -class BufferView { +template <RecordInterface R> class BufferView { public: - BufferView() = default; - - /* - * the BufferView's lifetime is tightly linked to buffer versioning, and so - * copying and assignment are disabled. - */ - BufferView(const BufferView&) = delete; - BufferView &operator=(BufferView &) = delete; - - BufferView(BufferView &&other) - : m_data(std::exchange(other.m_data, nullptr)) - , m_release(std::move(other.m_release)) - , m_head(std::exchange(other.m_head, 0)) - , m_tail(std::exchange(other.m_tail, 0)) - , m_start(std::exchange(other.m_start, 0)) - , m_stop(std::exchange(other.m_stop, 0)) - , m_cap(std::exchange(other.m_cap, 0)) - , m_approx_ts_cnt(std::exchange(other.m_approx_ts_cnt, 0)) - , m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr)) - , m_active(std::exchange(other.m_active, false)) {} - - BufferView &operator=(BufferView &&other) = delete; - - - BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail, size_t tombstone_cnt, psudb::BloomFilter<R> *filter, - ReleaseFunction release) - : m_data(buffer) - , m_release(release) - , m_head(head) - , m_tail(tail) - , m_start(m_head % cap) - , m_stop(m_tail % cap) - , m_cap(cap) - , m_approx_ts_cnt(tombstone_cnt) - , m_tombstone_filter(filter) - , m_active(true) {} - - ~BufferView() { - if (m_active) { - m_release(); - } + BufferView() = default; + + /* + * the BufferView's lifetime is tightly linked to buffer versioning, so + * copying and assignment are disabled. 
+ */ + BufferView(const BufferView &) = delete; + BufferView &operator=(BufferView &) = delete; + + BufferView(BufferView &&other) + : m_data(std::exchange(other.m_data, nullptr)), + m_release(std::move(other.m_release)), + m_head(std::exchange(other.m_head, 0)), + m_tail(std::exchange(other.m_tail, 0)), + m_start(std::exchange(other.m_start, 0)), + m_stop(std::exchange(other.m_stop, 0)), + m_cap(std::exchange(other.m_cap, 0)), + m_approx_ts_cnt(std::exchange(other.m_approx_ts_cnt, 0)), + m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr)), + m_active(std::exchange(other.m_active, false)) {} + + BufferView &operator=(BufferView &&other) = delete; + + BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail, + size_t tombstone_cnt, psudb::BloomFilter<R> *filter, + ReleaseFunction release) + : m_data(buffer), m_release(release), m_head(head), m_tail(tail), + m_start(m_head % cap), m_stop(m_tail % cap), m_cap(cap), + m_approx_ts_cnt(tombstone_cnt), m_tombstone_filter(filter), + m_active(true) {} + + ~BufferView() { + if (m_active) { + m_release(); } + } - bool check_tombstone(const R& rec) { - if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false; - - for (size_t i=0; i<get_record_count(); i++) { - if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) { - return true; - } - } + bool check_tombstone(const R &rec) { + if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) + return false; - return false; + for (size_t i = 0; i < get_record_count(); i++) { + if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) { + return true; + } } - bool delete_record(const R& rec) { - if (m_start < m_stop) { - for (size_t i=m_start; i<m_stop; i++) { - if (m_data[i].rec == rec) { - m_data[i].set_delete(); - return true; - } - } - } else { - for (size_t i=m_start; i<m_cap; i++) { - if (m_data[i].rec == rec) { - m_data[i].set_delete(); - return true; - } - } - - for (size_t i=0; i<m_stop; i++) { - if (m_data[i].rec == rec) { - m_data[i].set_delete(); - return true; - } - - } + return false; + } + bool delete_record(const R &rec) { + if (m_start < m_stop) { + for (size_t i = m_start; i < m_stop; i++) { + if (m_data[i].rec == rec) { + m_data[i].set_delete(); + return true; } + } + } else { + for (size_t i = m_start; i < m_cap; i++) { + if (m_data[i].rec == rec) { + m_data[i].set_delete(); + return true; + } + } - return false; - } - - size_t get_record_count() { - return m_tail - m_head; - } - - size_t get_capacity() { - return m_cap; - } - - /* - * NOTE: This function returns an upper bound on the number - * of tombstones within the view. There may be less than - * this, due to synchronization issues during view creation. - */ - size_t get_tombstone_count() { - return m_approx_ts_cnt; - } - - Wrapped<R> *get(size_t i) { - //assert(i < get_record_count()); - return m_data + to_idx(i); - } - - void copy_to_buffer(psudb::byte *buffer) { - /* check if the region to be copied circles back to start. 
If so, do it in two steps */ - if (m_start > m_stop) { - size_t split_idx = m_cap - m_start; - - memcpy(buffer, (std::byte*) (m_data + m_start), split_idx* sizeof(Wrapped<R>)); - memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte*) m_data, m_stop * sizeof(Wrapped<R>)); - } else { - memcpy(buffer, (std::byte*) (m_data + m_start), get_record_count() * sizeof(Wrapped<R>)); + for (size_t i = 0; i < m_stop; i++) { + if (m_data[i].rec == rec) { + m_data[i].set_delete(); + return true; } + } } - size_t get_tail() { - return m_tail; + return false; + } + + size_t get_record_count() { return m_tail - m_head; } + + size_t get_capacity() { return m_cap; } + + /* + * NOTE: This function returns an upper bound on the number + * of tombstones within the view. There may be less than + * this, due to synchronization issues during view creation. + */ + size_t get_tombstone_count() { return m_approx_ts_cnt; } + + Wrapped<R> *get(size_t i) { + return m_data + to_idx(i); + } + + void copy_to_buffer(psudb::byte *buffer) { + /* check if the region to be copied circles back to start. If so, do it in + * two steps */ + if (m_start > m_stop) { + size_t split_idx = m_cap - m_start; + + memcpy(buffer, (std::byte *)(m_data + m_start), + split_idx * sizeof(Wrapped<R>)); + memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte *)m_data, + m_stop * sizeof(Wrapped<R>)); + } else { + memcpy(buffer, (std::byte *)(m_data + m_start), + get_record_count() * sizeof(Wrapped<R>)); } + } - size_t get_head() { - return m_head; - } + size_t get_tail() { return m_tail; } + + size_t get_head() { return m_head; } private: - Wrapped<R>* m_data; - ReleaseFunction m_release; - size_t m_head; - size_t m_tail; - size_t m_start; - size_t m_stop; - size_t m_cap; - size_t m_approx_ts_cnt; - psudb::BloomFilter<R> *m_tombstone_filter; - bool m_active; - - size_t to_idx(size_t i) { - size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start) - : m_start + i; - assert(idx < m_cap); - return idx; - } + Wrapped<R> *m_data; + ReleaseFunction m_release; + size_t m_head; + size_t m_tail; + size_t m_start; + size_t m_stop; + size_t m_cap; + size_t m_approx_ts_cnt; + psudb::BloomFilter<R> *m_tombstone_filter; + bool m_active; + + size_t to_idx(size_t i) { + size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start) : m_start + i; + assert(idx < m_cap); + return idx; + } }; -} +} // namespace de diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index b83674b..2728246 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -1,8 +1,8 @@ /* * include/framework/structure/ExtensionStructure.h * - * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> - * Dong Xie <dongx@psu.edu> + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> * * Distributed under the Modified BSD License. 
* @@ -22,622 +22,660 @@ namespace de { -template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L=LayoutPolicy::TEIRING> +template <ShardInterface ShardType, QueryInterface<ShardType> QueryType, + LayoutPolicy L = LayoutPolicy::TEIRING> class ExtensionStructure { - typedef S Shard; - typedef BufferView<R> BuffView; + typedef typename ShardType::RECORD RecordType; + typedef BufferView<RecordType> BuffView; - typedef struct { - size_t reccnt; - size_t reccap; + typedef struct { + size_t reccnt; + size_t reccap; - size_t shardcnt; - size_t shardcap; - } level_state; + size_t shardcnt; + size_t shardcap; + } level_state; - typedef std::vector<level_state> state_vector; + typedef std::vector<level_state> state_vector; public: - ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop) - : m_scale_factor(scale_factor) - , m_max_delete_prop(max_delete_prop) - , m_buffer_size(buffer_size) - {} - - ~ExtensionStructure() = default; - - /* - * Create a shallow copy of this extension structure. The copy will share - * references to the same levels/shards as the original, but will have its - * own lists. As all of the shards are immutable (with the exception of - * deletes), the copy can be restructured with reconstructions and flushes - * without affecting the original. The copied structure will be returned - * with a reference count of 0; generally you will want to immediately call - * take_reference() on it. - * - * NOTE: When using tagged deletes, a delete of a record in the original - * structure will affect the copy, so long as the copy retains a reference - * to the same shard as the original. This could cause synchronization - * problems under tagging with concurrency. Any deletes in this context will - * need to be forwarded to the appropriate structures manually. - */ - ExtensionStructure<R, S, Q, L> *copy() { - auto new_struct = new ExtensionStructure<R, S, Q, L>(m_buffer_size, m_scale_factor, - m_max_delete_prop); - for (size_t i=0; i<m_levels.size(); i++) { - new_struct->m_levels.push_back(m_levels[i]->clone()); - } - - new_struct->m_refcnt = 0; - new_struct->m_current_state = m_current_state; + ExtensionStructure(size_t buffer_size, size_t scale_factor, + double max_delete_prop) + : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop), + m_buffer_size(buffer_size) {} + + ~ExtensionStructure() = default; + + /* + * Create a shallow copy of this extension structure. The copy will share + * references to the same levels/shards as the original, but will have its + * own lists. As all of the shards are immutable (with the exception of + * deletes), the copy can be restructured with reconstructions and flushes + * without affecting the original. The copied structure will be returned + * with a reference count of 0; generally you will want to immediately call + * take_reference() on it. + * + * NOTE: When using tagged deletes, a delete of a record in the original + * structure will affect the copy, so long as the copy retains a reference + * to the same shard as the original. This could cause synchronization + * problems under tagging with concurrency. Any deletes in this context will + * need to be forwarded to the appropriate structures manually. 
+ */ + ExtensionStructure<ShardType, QueryType, L> *copy() { + auto new_struct = new ExtensionStructure<ShardType, QueryType, L>( + m_buffer_size, m_scale_factor, m_max_delete_prop); + for (size_t i = 0; i < m_levels.size(); i++) { + new_struct->m_levels.push_back(m_levels[i]->clone()); + } - return new_struct; + new_struct->m_refcnt = 0; + new_struct->m_current_state = m_current_state; + + return new_struct; + } + + /* + * Search for a record matching the argument and mark it deleted by + * setting the delete bit in its wrapped header. Returns 1 if a matching + * record was found and deleted, and 0 if a matching record was not found. + * + * This function will stop after finding the first matching record. It is + * assumed that no duplicate records exist. In the case of duplicates, this + * function will still "work", but in the sense of "delete first match". + */ + int tagged_delete(const RecordType &rec) { + for (auto level : m_levels) { + if (level && level->delete_record(rec)) { + return 1; + } } /* - * Search for a record matching the argument and mark it deleted by - * setting the delete bit in its wrapped header. Returns 1 if a matching - * record was found and deleted, and 0 if a matching record was not found. - * - * This function will stop after finding the first matching record. It is - * assumed that no duplicate records exist. In the case of duplicates, this - * function will still "work", but in the sense of "delete first match". + * If the record to be erased wasn't found, return 0. The + * DynamicExtension itself will then search the active + * Buffers. */ - int tagged_delete(const R &rec) { - for (auto level : m_levels) { - if (level && level->delete_record(rec)) { - return 1; - } - } - - /* - * If the record to be erased wasn't found, return 0. The - * DynamicExtension itself will then search the active - * Buffers. - */ - return 0; + return 0; + } + + /* + * Flush a buffer into the extension structure, performing any necessary + * reconstructions to free up room in L0. + * + * FIXME: arguably, this should be a method attached to the buffer that + * takes a structure as input. + */ + inline bool flush_buffer(BuffView buffer) { + state_vector tmp = m_current_state; + + if (tmp.size() == 0) { + grow(tmp); } - /* - * Flush a buffer into the extension structure, performing any necessary - * reconstructions to free up room in L0. - * - * FIXME: arguably, this should be a method attached to the buffer that - * takes a structure as input. - */ - inline bool flush_buffer(BuffView buffer) { - state_vector tmp = m_current_state; + assert(can_reconstruct_with(0, buffer.get_record_count(), tmp)); + flush_buffer_into_l0(std::move(buffer)); - if (tmp.size() == 0) { - grow(tmp); - } + return true; + } - assert(can_reconstruct_with(0, buffer.get_record_count(), tmp)); - flush_buffer_into_l0(std::move(buffer)); + /* + * Return the total number of records (including tombstones) within all + * of the levels of the structure. + */ + size_t get_record_count() { + size_t cnt = 0; - return true; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) + cnt += m_levels[i]->get_record_count(); } - /* - * Return the total number of records (including tombstones) within all - * of the levels of the structure. 
- */ - size_t get_record_count() { - size_t cnt = 0; + return cnt; + } - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) cnt += m_levels[i]->get_record_count(); - } + /* + * Return the total number of tombstones contained within all of the + * levels of the structure. + */ + size_t get_tombstone_count() { + size_t cnt = 0; - return cnt; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) + cnt += m_levels[i]->get_tombstone_count(); } - /* - * Return the total number of tombstones contained within all of the - * levels of the structure. - */ - size_t get_tombstone_count() { - size_t cnt = 0; - - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count(); - } - - return cnt; + return cnt; + } + + /* + * Return the number of levels within the structure. Note that not + * all of these levels are necessarily populated. + */ + size_t get_height() { return m_levels.size(); } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing the primary data structure and raw data. + */ + size_t get_memory_usage() { + size_t cnt = 0; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) + cnt += m_levels[i]->get_memory_usage(); } - /* - * Return the number of levels within the structure. Note that not - * all of these levels are necessarily populated. - */ - size_t get_height() { - return m_levels.size(); + return cnt; + } + + /* + * Return the amount of memory (in bytes) used by the shards within the + * structure for storing auxiliary data structures. This total does not + * include memory used for the main data structure, or raw data. + */ + size_t get_aux_memory_usage() { + size_t cnt = 0; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + cnt += m_levels[i]->get_aux_memory_usage(); + } } - /* - * Return the amount of memory (in bytes) used by the shards within the - * structure for storing the primary data structure and raw data. - */ - size_t get_memory_usage() { - size_t cnt = 0; - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) cnt += m_levels[i]->get_memory_usage(); + return cnt; + } + + /* + * Validate that no level in the structure exceeds its maximum tombstone + * capacity. This is used to trigger preemptive compactions at the end of + * the reconstruction process. + */ + bool validate_tombstone_proportion() { + long double ts_prop; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + ts_prop = (long double)m_levels[i]->get_tombstone_count() / + (long double)calc_level_record_capacity(i); + if (ts_prop > (long double)m_max_delete_prop) { + return false; } - - return cnt; + } } - /* - * Return the amount of memory (in bytes) used by the shards within the - * structure for storing auxiliary data structures. This total does not - * include memory used for the main data structure, or raw data. + return true; + } + + bool validate_tombstone_proportion(level_index level) { + long double ts_prop = (long double)m_levels[level]->get_tombstone_count() / + (long double)calc_level_record_capacity(level); + return ts_prop <= (long double)m_max_delete_prop; + } + + /* + * Return a reference to the underlying vector of levels within the + * structure. + */ + std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> & + get_levels() { + return m_levels; + } + + /* + * NOTE: This cannot be simulated, because tombstone cancellation is not + * cheaply predictable. 
It is possible that the worst case number could + * be used instead, to allow for prediction, but compaction isn't a + * major concern outside of sampling; at least for now. So I'm not + * going to focus too much time on it at the moment. + */ + ReconstructionVector get_compaction_tasks() { + ReconstructionVector tasks; + state_vector scratch_state = m_current_state; + + /* if the tombstone/delete invariant is satisfied, no need for compactions */ - size_t get_aux_memory_usage() { - size_t cnt = 0; - for (size_t i=0; i<m_levels.size(); i++) { - if (m_levels[i]) { - cnt += m_levels[i]->get_aux_memory_usage(); - } - } - - return cnt; + if (validate_tombstone_proportion()) { + return tasks; } - /* - * Validate that no level in the structure exceeds its maximum tombstone - * capacity. This is used to trigger preemptive compactions at the end of - * the reconstruction process. - */ - bool validate_tombstone_proportion() { - long double ts_prop; - for (size_t i = 0; i < m_levels.size(); i++) { - if (m_levels[i]) { - ts_prop = (long double)m_levels[i]->get_tombstone_count() / - (long double)calc_level_record_capacity(i); - if (ts_prop > (long double)m_max_delete_prop) { - return false; - } - } + /* locate the first level to violate the invariant */ + level_index violation_idx = -1; + for (level_index i = 0; i < m_levels.size(); i++) { + if (!validate_tombstone_proportion(i)) { + violation_idx = i; + break; } - - return true; } - bool validate_tombstone_proportion(level_index level) { - long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level); - return ts_prop <= (long double) m_max_delete_prop; - } + assert(violation_idx != -1); - /* - * Return a reference to the underlying vector of levels within the - * structure. - */ - std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> &get_levels() { - return m_levels; + level_index base_level = + find_reconstruction_target(violation_idx, scratch_state); + if (base_level == -1) { + base_level = grow(scratch_state); } - /* - * NOTE: This cannot be simulated, because tombstone cancellation is not - * cheaply predictable. It is possible that the worst case number could - * be used instead, to allow for prediction, but compaction isn't a - * major concern outside of sampling; at least for now. So I'm not - * going to focus too much time on it at the moment. - */ - ReconstructionVector get_compaction_tasks() { - ReconstructionVector tasks; - state_vector scratch_state = m_current_state; - - /* if the tombstone/delete invariant is satisfied, no need for compactions */ - if (validate_tombstone_proportion()) { - return tasks; - } - - /* locate the first level to violate the invariant */ - level_index violation_idx = -1; - for (level_index i=0; i<m_levels.size(); i++) { - if (!validate_tombstone_proportion(i)) { - violation_idx = i; - break; - } - } - - assert(violation_idx != -1); - - level_index base_level = find_reconstruction_target(violation_idx, scratch_state); - if (base_level == -1) { - base_level = grow(scratch_state); - } - - for (level_index i=base_level; i>0; i--) { - /* - * The amount of storage required for the reconstruction accounts - * for the cost of storing the new records, along with the - * cost of retaining the old records during the process - * (hence the 2x multiplier). - * - * FIXME: currently does not account for the *actual* size - * of the shards, only the storage for the records - * themselves. 
- */ - size_t reccnt = m_levels[i - 1]->get_record_count(); - if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, reccnt, scratch_state)) { - reccnt += m_levels[i]->get_record_count(); - } - } - tasks.add_reconstruction(i-i, i, reccnt); + for (level_index i = base_level; i > 0; i--) { + /* + * The amount of storage required for the reconstruction accounts + * for the cost of storing the new records, along with the + * cost of retaining the old records during the process + * (hence the 2x multiplier). + * + * FIXME: currently does not account for the *actual* size + * of the shards, only the storage for the records + * themselves. + */ + size_t reccnt = m_levels[i - 1]->get_record_count(); + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_reconstruct_with(i, reccnt, scratch_state)) { + reccnt += m_levels[i]->get_record_count(); } - - return tasks; + } + tasks.add_reconstruction(i - i, i, reccnt); } + return tasks; + } + + /* + * + */ + ReconstructionVector + get_reconstruction_tasks(size_t buffer_reccnt, + state_vector scratch_state = {}) { /* - * + * If no scratch state vector is provided, use a copy of the + * current one. The only time an empty vector could be used as + * *real* input to this function is when the current state is also + * empty, so this should would even in that case. */ - ReconstructionVector get_reconstruction_tasks(size_t buffer_reccnt, - state_vector scratch_state={}) { - /* - * If no scratch state vector is provided, use a copy of the - * current one. The only time an empty vector could be used as - * *real* input to this function is when the current state is also - * empty, so this should would even in that case. - */ - if (scratch_state.size() == 0) { - scratch_state = m_current_state; - } - - ReconstructionVector reconstructions; - size_t LOOKAHEAD = 1; - for (size_t i=0; i<LOOKAHEAD; i++) { - /* - * If L0 cannot support a direct buffer flush, figure out what - * work must be done to free up space first. Otherwise, the - * reconstruction vector will be initially empty. - */ - if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) { - auto local_recon = get_reconstruction_tasks_from_level(0, scratch_state); - - /* - * for the first iteration, we need to do all of the - * reconstructions, so use these to initially the returned - * reconstruction list - */ - if (i == 0) { - reconstructions = local_recon; - /* - * Quick sanity test of idea: if the next reconstruction - * would be larger than this one, steal the largest - * task from it and run it now instead. - */ - } else if (local_recon.get_total_reccnt() > reconstructions.get_total_reccnt()) { - auto t = local_recon.remove_reconstruction(0); - reconstructions.add_reconstruction(t); - } - } - - /* simulate the buffer flush in the scratch state */ - scratch_state[0].reccnt += buffer_reccnt; - if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) { - scratch_state[0].shardcnt += 1; - } - - } - - return std::move(reconstructions); + if (scratch_state.size() == 0) { + scratch_state = m_current_state; } - - /* - * - */ - ReconstructionVector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) { - ReconstructionVector reconstructions; + ReconstructionVector reconstructions; + size_t LOOKAHEAD = 1; + for (size_t i = 0; i < LOOKAHEAD; i++) { + /* + * If L0 cannot support a direct buffer flush, figure out what + * work must be done to free up space first. Otherwise, the + * reconstruction vector will be initially empty. 
+ */ + if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) { + auto local_recon = + get_reconstruction_tasks_from_level(0, scratch_state); /* - * Find the first level capable of sustaining a reconstruction from - * the level above it. If no such level exists, add a new one at - * the bottom of the structure. + * for the first iteration, we need to do all of the + * reconstructions, so use these to initially the returned + * reconstruction list */ - level_index base_level = find_reconstruction_target(source_level, scratch_state); - if (base_level == -1) { - base_level = grow(scratch_state); + if (i == 0) { + reconstructions = local_recon; + /* + * Quick sanity test of idea: if the next reconstruction + * would be larger than this one, steal the largest + * task from it and run it now instead. + */ + } else if (local_recon.get_total_reccnt() > + reconstructions.get_total_reccnt()) { + auto t = local_recon.remove_reconstruction(0); + reconstructions.add_reconstruction(t); } + } - if constexpr (L == LayoutPolicy::BSM) { - if (base_level == 0) { - return std::move(reconstructions); - } - - ReconstructionTask task; - task.target = base_level; - - size_t base_reccnt = 0; - for (level_index i=base_level; i>source_level; i--) { - auto recon_reccnt = scratch_state[i-1].reccnt; - base_reccnt += recon_reccnt; - scratch_state[i-1].reccnt = 0; - scratch_state[i-1].shardcnt = 0; - task.add_source(i-1, recon_reccnt); - } + /* simulate the buffer flush in the scratch state */ + scratch_state[0].reccnt += buffer_reccnt; + if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) { + scratch_state[0].shardcnt += 1; + } + } - reconstructions.add_reconstruction(task); - scratch_state[base_level].reccnt = base_reccnt; - scratch_state[base_level].shardcnt = 1; + return reconstructions; + } - return std::move(reconstructions); - } + /* + * + */ + ReconstructionVector + get_reconstruction_tasks_from_level(level_index source_level, + state_vector &scratch_state) { + ReconstructionVector reconstructions; - /* - * Determine the full set of reconstructions necessary to open up - * space in the source level. - */ - for (level_index i=base_level; i>source_level; i--) { - size_t recon_reccnt = scratch_state[i-1].reccnt; - size_t base_reccnt = recon_reccnt; - - /* - * If using Leveling, the total reconstruction size will be the - * records in *both* base and target, because they will need to - * be merged (assuming that target isn't empty). - */ - if constexpr (L == LayoutPolicy::LEVELING) { - if (can_reconstruct_with(i, base_reccnt, scratch_state)) { - recon_reccnt += scratch_state[i].reccnt; - } - } - reconstructions.add_reconstruction(i-1, i, recon_reccnt); - - /* - * The base level will be emptied and its records moved to - * the target. - */ - scratch_state[i-1].reccnt = 0; - scratch_state[i-1].shardcnt = 0; - - /* - * The target level will have the records from the base level - * added to it, and potentially gain a shard if the LayoutPolicy - * is tiering or the level currently lacks any shards at all. - */ - scratch_state[i].reccnt += base_reccnt; - if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) { - scratch_state[i].shardcnt += 1; - } - } - - return std::move(reconstructions); + /* + * Find the first level capable of sustaining a reconstruction from + * the level above it. If no such level exists, add a new one at + * the bottom of the structure. 
+ */ + level_index base_level = + find_reconstruction_target(source_level, scratch_state); + if (base_level == -1) { + base_level = grow(scratch_state); } - inline void reconstruction(ReconstructionTask task) { - static_assert(L == LayoutPolicy::BSM); - std::vector<InternalLevel<R, Shard, Q>*> levels(task.sources.size()); - for (size_t i=0; i<task.sources.size(); i++) { - levels[i] = m_levels[task.sources[i]].get(); - } + if constexpr (L == LayoutPolicy::BSM) { + if (base_level == 0) { + return reconstructions; + } - auto new_level = InternalLevel<R, Shard, Q>::reconstruction(levels, task.target); - if (task.target >= m_levels.size()) { - m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target), - 1, 1}); - m_levels.emplace_back(new_level); - } else { - m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target), - 1, 1}; - m_levels[task.target] = new_level; - } + ReconstructionTask task; + task.target = base_level; - /* remove all of the levels that have been flattened */ - for (size_t i=0; i<task.sources.size(); i++) { - m_levels[task.sources[i]] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(task.sources[i], 1)); - m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.target), 0, 1}; - } + size_t base_reccnt = 0; + for (level_index i = base_level; i > source_level; i--) { + auto recon_reccnt = scratch_state[i - 1].reccnt; + base_reccnt += recon_reccnt; + scratch_state[i - 1].reccnt = 0; + scratch_state[i - 1].shardcnt = 0; + task.add_source(i - 1, recon_reccnt); + } + + reconstructions.add_reconstruction(task); + scratch_state[base_level].reccnt = base_reccnt; + scratch_state[base_level].shardcnt = 1; - return; + return reconstructions; } /* - * Combine incoming_level with base_level and reconstruct the shard, - * placing it in base_level. The two levels should be sequential--i.e. no - * levels are skipped in the reconstruction process--otherwise the - * tombstone ordering invariant may be violated. + * Determine the full set of reconstructions necessary to open up + * space in the source level. */ - inline void reconstruction(level_index base_level, level_index incoming_level) { - size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - - if (base_level >= m_levels.size()) { - m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(base_level, shard_capacity))); - m_current_state.push_back({0, calc_level_record_capacity(base_level), - 0, shard_capacity}); - } - - if constexpr (L == LayoutPolicy::LEVELING) { - /* if the base level has a shard, merge the base and incoming together to make a new one */ - if (m_levels[base_level]->get_shard_count() > 0) { - m_levels[base_level] = InternalLevel<R, Shard, Q>::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get()); - /* otherwise, we can just move the incoming to the base */ - } else { - m_levels[base_level] = m_levels[incoming_level]; - } - - } else { - m_levels[base_level]->append_level(m_levels[incoming_level].get()); - m_levels[base_level]->finalize(); + for (level_index i = base_level; i > source_level; i--) { + size_t recon_reccnt = scratch_state[i - 1].reccnt; + size_t base_reccnt = recon_reccnt; + + /* + * If using Leveling, the total reconstruction size will be the + * records in *both* base and target, because they will need to + * be merged (assuming that target isn't empty). 
+ */ + if constexpr (L == LayoutPolicy::LEVELING) { + if (can_reconstruct_with(i, base_reccnt, scratch_state)) { + recon_reccnt += scratch_state[i].reccnt; } + } + reconstructions.add_reconstruction(i - 1, i, recon_reccnt); + + /* + * The base level will be emptied and its records moved to + * the target. + */ + scratch_state[i - 1].reccnt = 0; + scratch_state[i - 1].shardcnt = 0; + + /* + * The target level will have the records from the base level + * added to it, and potentially gain a shard if the LayoutPolicy + * is tiering or the level currently lacks any shards at all. + */ + scratch_state[i].reccnt += base_reccnt; + if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) { + scratch_state[i].shardcnt += 1; + } + } - /* place a new, empty level where the incoming level used to be */ - m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); + return reconstructions; + } - /* - * Update the state vector to match the *real* state following - * the reconstruction - */ - m_current_state[base_level] = {m_levels[base_level]->get_record_count(), - calc_level_record_capacity(base_level), m_levels[base_level]->get_shard_count(), shard_capacity}; - m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity}; + inline void reconstruction(ReconstructionTask task) { + static_assert(L == LayoutPolicy::BSM); + std::vector<InternalLevel<ShardType, QueryType> *> levels( + task.sources.size()); + for (size_t i = 0; i < task.sources.size(); i++) { + levels[i] = m_levels[task.sources[i]].get(); } - bool take_reference() { - m_refcnt.fetch_add(1); - return true; + auto new_level = InternalLevel<ShardType, QueryType>::reconstruction( + levels, task.target); + if (task.target >= m_levels.size()) { + m_current_state.push_back({new_level->get_record_count(), + calc_level_record_capacity(task.target), 1, + 1}); + m_levels.emplace_back(new_level); + } else { + m_current_state[task.target] = {new_level->get_record_count(), + calc_level_record_capacity(task.target), + 1, 1}; + m_levels[task.target] = new_level; } - bool release_reference() { - assert(m_refcnt.load() > 0); - m_refcnt.fetch_add(-1); - return true; + /* remove all of the levels that have been flattened */ + for (size_t i = 0; i < task.sources.size(); i++) { + m_levels[task.sources[i]] = + std::shared_ptr<InternalLevel<ShardType, QueryType>>( + new InternalLevel<ShardType, QueryType>(task.sources[i], 1)); + m_current_state[task.sources[i]] = { + 0, calc_level_record_capacity(task.target), 0, 1}; } - size_t get_reference_count() { - return m_refcnt.load(); + return; + } + + /* + * Combine incoming_level with base_level and reconstruct the shard, + * placing it in base_level. The two levels should be sequential--i.e. no + * levels are skipped in the reconstruction process--otherwise the + * tombstone ordering invariant may be violated. + */ + inline void reconstruction(level_index base_level, + level_index incoming_level) { + size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 
1 : m_scale_factor; + + if (base_level >= m_levels.size()) { + m_levels.emplace_back( + std::shared_ptr<InternalLevel<ShardType, QueryType>>( + new InternalLevel<ShardType, QueryType>(base_level, + shard_capacity))); + m_current_state.push_back( + {0, calc_level_record_capacity(base_level), 0, shard_capacity}); } - std::vector<void *> get_query_states(std::vector<std::pair<ShardID, Shard*>> &shards, void *parms) { - std::vector<void*> states; - - for (auto &level : m_levels) { - level->get_query_states(shards, states, parms); - } + if constexpr (L == LayoutPolicy::LEVELING) { + /* if the base level has a shard, merge the base and incoming together to + * make a new one */ + if (m_levels[base_level]->get_shard_count() > 0) { + m_levels[base_level] = + InternalLevel<ShardType, QueryType>::reconstruction( + m_levels[base_level].get(), m_levels[incoming_level].get()); + /* otherwise, we can just move the incoming to the base */ + } else { + m_levels[base_level] = m_levels[incoming_level]; + } - return states; + } else { + m_levels[base_level]->append_level(m_levels[incoming_level].get()); + m_levels[base_level]->finalize(); } -private: - size_t m_scale_factor; - double m_max_delete_prop; - size_t m_buffer_size; - - std::atomic<size_t> m_refcnt; - - std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels; - - /* - * A pair of <record_count, shard_count> for each level in the - * structure. Record counts may be slightly inaccurate due to - * deletes. - */ - state_vector m_current_state; + /* place a new, empty level where the incoming level used to be */ + m_levels[incoming_level] = + std::shared_ptr<InternalLevel<ShardType, QueryType>>( + new InternalLevel<ShardType, QueryType>( + incoming_level, + (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); /* - * Add a new level to the scratch state and return its index. - * - * IMPORTANT: This does _not_ add a level to the extension structure - * anymore. This is handled by the appropriate reconstruction and flush - * methods as needed. This function is for use in "simulated" - * reconstructions. + * Update the state vector to match the *real* state following + * the reconstruction */ - inline level_index grow(state_vector &scratch_state) { - level_index new_idx = m_levels.size(); - size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - - scratch_state.push_back({0, calc_level_record_capacity(new_idx), - 0, new_shard_cap}); - return new_idx; + m_current_state[base_level] = {m_levels[base_level]->get_record_count(), + calc_level_record_capacity(base_level), + m_levels[base_level]->get_shard_count(), + shard_capacity}; + m_current_state[incoming_level] = { + 0, calc_level_record_capacity(incoming_level), 0, shard_capacity}; + } + + bool take_reference() { + m_refcnt.fetch_add(1); + return true; + } + + bool release_reference() { + assert(m_refcnt.load() > 0); + m_refcnt.fetch_add(-1); + return true; + } + + size_t get_reference_count() { return m_refcnt.load(); } + + std::vector<typename QueryType::LocalQuery *> + get_local_queries(std::vector<std::pair<ShardID, ShardType *>> &shards, + typename QueryType::Parameters *parms) { + + std::vector<typename QueryType::LocalQuery *> queries; + + for (auto &level : m_levels) { + level->get_local_queries(shards, queries, parms); } - /* - * Find the first level below the level indicated by idx that - * is capable of sustaining a reconstruction and return its - * level index. If no such level exists, returns -1. 
Also - * returns -1 if idx==0, and no such level exists, to simplify - * the logic of the first buffer flush. - */ - inline level_index find_reconstruction_target(level_index idx, state_vector &state) { + return queries; + } - /* - * this handles the very first buffer flush, when the state vector - * is empty. - */ - if (idx == 0 && state.size() == 0) return -1; +private: + size_t m_scale_factor; + double m_max_delete_prop; + size_t m_buffer_size; + + std::atomic<size_t> m_refcnt; + + std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> m_levels; + + /* + * A pair of <record_count, shard_count> for each level in the + * structure. Record counts may be slightly inaccurate due to + * deletes. + */ + state_vector m_current_state; + + /* + * Add a new level to the scratch state and return its index. + * + * IMPORTANT: This does _not_ add a level to the extension structure + * anymore. This is handled by the appropriate reconstruction and flush + * methods as needed. This function is for use in "simulated" + * reconstructions. + */ + inline level_index grow(state_vector &scratch_state) { + level_index new_idx = m_levels.size(); + size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; + + scratch_state.push_back( + {0, calc_level_record_capacity(new_idx), 0, new_shard_cap}); + return new_idx; + } + + /* + * Find the first level below the level indicated by idx that + * is capable of sustaining a reconstruction and return its + * level index. If no such level exists, returns -1. Also + * returns -1 if idx==0, and no such level exists, to simplify + * the logic of the first buffer flush. + */ + inline level_index find_reconstruction_target(level_index idx, + state_vector &state) { - size_t incoming_rec_cnt = state[idx].reccnt; - for (level_index i=idx+1; i<state.size(); i++) { - if (can_reconstruct_with(i, incoming_rec_cnt, state)) { - return i; - } + /* + * this handles the very first buffer flush, when the state vector + * is empty. + */ + if (idx == 0 && state.size() == 0) + return -1; - incoming_rec_cnt = state[idx].reccnt; - } + size_t incoming_rec_cnt = state[idx].reccnt; + for (level_index i = idx + 1; i < state.size(); i++) { + if (can_reconstruct_with(i, incoming_rec_cnt, state)) { + return i; + } - return -1; + incoming_rec_cnt = state[idx].reccnt; } - inline void flush_buffer_into_l0(BuffView buffer) { - size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - - if (m_levels.size() == 0) { - m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(0, shard_capacity))); + return -1; + } - m_current_state.push_back({0, calc_level_record_capacity(0), - 0, shard_capacity}); - } + inline void flush_buffer_into_l0(BuffView buffer) { + size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor; - if constexpr (L == LayoutPolicy::LEVELING) { - // FIXME: Kludgey implementation due to interface constraints. 
- auto old_level = m_levels[0].get(); - auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); - temp_level->append_buffer(std::move(buffer)); - - if (old_level->get_shard_count() > 0) { - m_levels[0] = InternalLevel<R, Shard, Q>::reconstruction(old_level, temp_level); - delete temp_level; - } else { - m_levels[0] = std::shared_ptr<InternalLevel<R, Shard, Q>>(temp_level); - } - } else { - m_levels[0]->append_buffer(std::move(buffer)); - } + if (m_levels.size() == 0) { + m_levels.emplace_back( + std::shared_ptr<InternalLevel<ShardType, QueryType>>( + new InternalLevel<ShardType, QueryType>(0, shard_capacity))); - /* update the state vector */ - m_current_state[0].reccnt = m_levels[0]->get_record_count(); - m_current_state[0].shardcnt = m_levels[0]->get_shard_count(); + m_current_state.push_back( + {0, calc_level_record_capacity(0), 0, shard_capacity}); } - /* - * Mark a given memory level as no-longer in use by the tree. For now this - * will just free the level. In future, this will be more complex as the - * level may not be able to immediately be deleted, depending upon who - * else is using it. - */ - inline void mark_as_unused(std::shared_ptr<InternalLevel<R, Shard, Q>> level) { - level.reset(); + if constexpr (L == LayoutPolicy::LEVELING) { + // FIXME: Kludgey implementation due to interface constraints. + auto old_level = m_levels[0].get(); + auto temp_level = new InternalLevel<ShardType, QueryType>(0, 1); + temp_level->append_buffer(std::move(buffer)); + + if (old_level->get_shard_count() > 0) { + m_levels[0] = InternalLevel<ShardType, QueryType>::reconstruction( + old_level, temp_level); + delete temp_level; + } else { + m_levels[0] = + std::shared_ptr<InternalLevel<ShardType, QueryType>>(temp_level); + } + } else { + m_levels[0]->append_buffer(std::move(buffer)); } - /* - * Assume that level "0" should be larger than the buffer. The buffer - * itself is index -1, which should return simply the buffer capacity. - */ - inline size_t calc_level_record_capacity(level_index idx) { - return m_buffer_size * pow(m_scale_factor, idx+1); + /* update the state vector */ + m_current_state[0].reccnt = m_levels[0]->get_record_count(); + m_current_state[0].shardcnt = m_levels[0]->get_shard_count(); + } + + /* + * Mark a given memory level as no-longer in use by the tree. For now this + * will just free the level. In future, this will be more complex as the + * level may not be able to immediately be deleted, depending upon who + * else is using it. + */ + inline void + mark_as_unused(std::shared_ptr<InternalLevel<ShardType, QueryType>> level) { + level.reset(); + } + + /* + * Assume that level "0" should be larger than the buffer. The buffer + * itself is index -1, which should return simply the buffer capacity. + */ + inline size_t calc_level_record_capacity(level_index idx) { + return m_buffer_size * pow(m_scale_factor, idx + 1); + } + + /* + * Returns the number of records present on a specified level. + */ + inline size_t get_level_record_count(level_index idx) { + return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; + } + + /* + * Determines if a level can sustain a reconstruction with incoming_rec_cnt + * additional records without exceeding its capacity. + */ + inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, + state_vector &state) { + if (idx >= state.size()) { + return false; } - /* - * Returns the number of records present on a specified level. - */ - inline size_t get_level_record_count(level_index idx) { - return (m_levels[idx]) ? 
m_levels[idx]->get_record_count() : 0; + if constexpr (L == LayoutPolicy::LEVELING) { + return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap; + } else if constexpr (L == LayoutPolicy::BSM) { + return state[idx].reccnt == 0; + } else { + return state[idx].shardcnt < state[idx].shardcap; } - /* - * Determines if a level can sustain a reconstruction with incoming_rec_cnt - * additional records without exceeding its capacity. - */ - inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) { - if (idx >= state.size()) { - return false; - } - - if constexpr (L == LayoutPolicy::LEVELING) { - return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap; - } else if constexpr (L == LayoutPolicy::BSM) { - return state[idx].reccnt == 0; - } else { - return state[idx].shardcnt < state[idx].shardcap; - } - - /* unreachable */ - assert(true); - } + /* unreachable */ + assert(true); + } }; -} - +} // namespace de diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index b962dcc..a4cf94d 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -1,8 +1,8 @@ /* * include/framework/structure/InternalLevel.h * - * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> - * Dong Xie <dongx@psu.edu> + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> * * Distributed under the Modified BSD License. * @@ -15,276 +15,281 @@ */ #pragma once -#include <vector> #include <memory> +#include <vector> -#include "util/types.h" -#include "framework/interface/Shard.h" #include "framework/interface/Query.h" #include "framework/interface/Record.h" +#include "framework/interface/Shard.h" #include "framework/structure/BufferView.h" +#include "util/types.h" namespace de { -template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q> +template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> class InternalLevel; - - -template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q> +template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> class InternalLevel { - typedef S Shard; - typedef BufferView<R> BuffView; -public: - InternalLevel(ssize_t level_no, size_t shard_cap) - : m_level_no(level_no) - , m_shard_cnt(0) - , m_shards(shard_cap, nullptr) - , m_pending_shard(nullptr) - {} - - ~InternalLevel() { - delete m_pending_shard; - } + typedef typename ShardType::RECORD RecordType; + typedef BufferView<RecordType> BuffView; - /* - * Create a new shard combining the records from base_level and new_level, - * and return a shared_ptr to a new level containing this shard. This is used - * for reconstructions under the leveling layout policy. - * - * No changes are made to the levels provided as arguments. 
- */ - static std::shared_ptr<InternalLevel> reconstruction(InternalLevel* base_level, InternalLevel* new_level) { - assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); - auto res = new InternalLevel(base_level->m_level_no, 1); - res->m_shard_cnt = 1; - std::vector<Shard *> shards = {base_level->m_shards[0].get(), +public: + InternalLevel(ssize_t level_no, size_t shard_cap) + : m_level_no(level_no), m_shard_cnt(0), m_shards(shard_cap, nullptr), + m_pending_shard(nullptr) {} + + ~InternalLevel() { delete m_pending_shard; } + + /* + * Create a new shard combining the records from base_level and new_level, + * and return a shared_ptr to a new level containing this shard. This is used + * for reconstructions under the leveling layout policy. + * + * No changes are made to the levels provided as arguments. + */ + static std::shared_ptr<InternalLevel> + reconstruction(InternalLevel *base_level, InternalLevel *new_level) { + assert(base_level->m_level_no > new_level->m_level_no || + (base_level->m_level_no == 0 && new_level->m_level_no == 0)); + auto res = new InternalLevel(base_level->m_level_no, 1); + res->m_shard_cnt = 1; + std::vector<ShardType *> shards = {base_level->m_shards[0].get(), new_level->m_shards[0].get()}; - res->m_shards[0] = std::make_shared<S>(shards); - return std::shared_ptr<InternalLevel>(res); + res->m_shards[0] = std::make_shared<ShardType>(shards); + return std::shared_ptr<InternalLevel>(res); + } + + static std::shared_ptr<InternalLevel> + reconstruction(std::vector<InternalLevel *> levels, size_t level_idx) { + std::vector<ShardType *> shards; + for (auto level : levels) { + for (auto shard : level->m_shards) { + if (shard) + shards.emplace_back(shard.get()); + } } - static std::shared_ptr<InternalLevel> reconstruction(std::vector<InternalLevel*> levels, size_t level_idx) { - std::vector<Shard *> shards; - for (auto level : levels) { - for (auto shard : level->m_shards) { - if (shard) shards.emplace_back(shard.get()); - } - } - - auto res = new InternalLevel(level_idx, 1); - res->m_shard_cnt = 1; - res->m_shards[0] = std::make_shared<S>(shards); + auto res = new InternalLevel(level_idx, 1); + res->m_shard_cnt = 1; + res->m_shards[0] = std::make_shared<ShardType>(shards); + + return std::shared_ptr<InternalLevel>(res); + } + + /* + * Create a new shard combining the records from all of + * the shards in level, and append this new shard into + * this level. This is used for reconstructions under + * the tiering layout policy. + * + * No changes are made to the level provided as an argument. + */ + void append_level(InternalLevel *level) { + // FIXME: that this is happening probably means that + // something is going terribly wrong earlier in the + // reconstruction logic. + if (level->get_shard_count() == 0) { + return; + } - return std::shared_ptr<InternalLevel>(res); + std::vector<ShardType *> shards; + for (auto shard : level->m_shards) { + if (shard) + shards.emplace_back(shard.get()); } - /* - * Create a new shard combining the records from all of - * the shards in level, and append this new shard into - * this level. This is used for reconstructions under - * the tiering layout policy. - * - * No changes are made to the level provided as an argument. - */ - void append_level(InternalLevel* level) { - // FIXME: that this is happening probably means that - // something is going terribly wrong earlier in the - // reconstruction logic. 
- if (level->get_shard_count() == 0) { - return; - } + if (m_shard_cnt == m_shards.size()) { + m_pending_shard = new ShardType(shards); + return; + } - std::vector<S*> shards; - for (auto shard : level->m_shards) { - if (shard) shards.emplace_back(shard.get()); - } + auto tmp = new ShardType(shards); + m_shards[m_shard_cnt] = std::shared_ptr<ShardType>(tmp); + + ++m_shard_cnt; + } + + /* + * Create a new shard using the records in the + * provided buffer, and append this new shard + * into this level. This is used for buffer + * flushes under the tiering layout policy. + */ + void append_buffer(BuffView buffer) { + if (m_shard_cnt == m_shards.size()) { + assert(m_pending_shard == nullptr); + m_pending_shard = new ShardType(std::move(buffer)); + return; + } - if (m_shard_cnt == m_shards.size()) { - m_pending_shard = new S(shards); - return; - } + m_shards[m_shard_cnt] = std::make_shared<ShardType>(std::move(buffer)); + ++m_shard_cnt; + } - auto tmp = new S(shards); - m_shards[m_shard_cnt] = std::shared_ptr<S>(tmp); + void finalize() { + if (m_pending_shard) { + for (size_t i = 0; i < m_shards.size(); i++) { + m_shards[i] = nullptr; + } - ++m_shard_cnt; + m_shards[0] = std::shared_ptr<ShardType>(m_pending_shard); + m_pending_shard = nullptr; + m_shard_cnt = 1; } - - /* - * Create a new shard using the records in the - * provided buffer, and append this new shard - * into this level. This is used for buffer - * flushes under the tiering layout policy. - */ - void append_buffer(BuffView buffer) { - if (m_shard_cnt == m_shards.size()) { - assert(m_pending_shard == nullptr); - m_pending_shard = new S(std::move(buffer)); - return; - } - - m_shards[m_shard_cnt] = std::make_shared<S>(std::move(buffer)); - ++m_shard_cnt; + } + + /* + * Create a new shard containing the combined records + * from all shards on this level and return it. + * + * No changes are made to this level. + */ + ShardType *get_combined_shard() { + if (m_shard_cnt == 0) { + return nullptr; } - void finalize() { - if (m_pending_shard) { - for (size_t i=0; i<m_shards.size(); i++) { - m_shards[i] = nullptr; - } - - m_shards[0] = std::shared_ptr<S>(m_pending_shard); - m_pending_shard = nullptr; - m_shard_cnt = 1; - } + std::vector<ShardType *> shards; + for (auto shard : m_shards) { + if (shard) + shards.emplace_back(shard.get()); } - /* - * Create a new shard containing the combined records - * from all shards on this level and return it. - * - * No changes are made to this level. 
- */ - Shard *get_combined_shard() { - if (m_shard_cnt == 0) { - return nullptr; - } - - std::vector<Shard *> shards; - for (auto shard : m_shards) { - if (shard) shards.emplace_back(shard.get()); - } - - return new S(shards); + return new ShardType(shards); + } + + void get_local_queries( + std::vector<std::pair<ShardID, ShardType *>> &shards, + std::vector<typename QueryType::LocalQuery *> &local_queries, + typename QueryType::Parameters *query_parms) { + for (size_t i = 0; i < m_shard_cnt; i++) { + if (m_shards[i]) { + auto local_query = + QueryType::local_preproc(m_shards[i].get(), query_parms); + shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].get()}); + local_queries.emplace_back(local_query); + } } + } - void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) { - for (size_t i=0; i<m_shard_cnt; i++) { - if (m_shards[i]) { - auto shard_state = Q::get_query_state(m_shards[i].get(), query_parms); - shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i].get()}); - shard_states.emplace_back(shard_state); - } + bool check_tombstone(size_t shard_stop, const RecordType &rec) { + if (m_shard_cnt == 0) + return false; + + for (int i = m_shard_cnt - 1; i >= (ssize_t)shard_stop; i--) { + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec, true); + if (res && res->is_tombstone()) { + return true; } + } } - - bool check_tombstone(size_t shard_stop, const R& rec) { - if (m_shard_cnt == 0) return false; - - for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) { - if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec, true); - if (res && res->is_tombstone()) { - return true; - } - } + return false; + } + + bool delete_record(const RecordType &rec) { + if (m_shard_cnt == 0) + return false; + + for (size_t i = 0; i < m_shards.size(); ++i) { + if (m_shards[i]) { + auto res = m_shards[i]->point_lookup(rec); + if (res) { + res->set_delete(); + return true; } - return false; + } } - bool delete_record(const R &rec) { - if (m_shard_cnt == 0) return false; - - for (size_t i = 0; i < m_shards.size(); ++i) { - if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec); - if (res) { - res->set_delete(); - return true; - } - } - } + return false; + } - return false; + ShardType *get_shard(size_t idx) { + if (idx >= m_shard_cnt) { + return nullptr; } - Shard* get_shard(size_t idx) { - if (idx >= m_shard_cnt) { - return nullptr; - } + return m_shards[idx].get(); + } - return m_shards[idx].get(); - } + size_t get_shard_count() { return m_shard_cnt; } - size_t get_shard_count() { - return m_shard_cnt; + size_t get_record_count() { + size_t cnt = 0; + for (size_t i = 0; i < m_shard_cnt; i++) { + if (m_shards[i]) { + cnt += m_shards[i]->get_record_count(); + } } - size_t get_record_count() { - size_t cnt = 0; - for (size_t i=0; i<m_shard_cnt; i++) { - if (m_shards[i]) { - cnt += m_shards[i]->get_record_count(); - } - } + return cnt; + } - return cnt; + size_t get_tombstone_count() { + size_t res = 0; + for (size_t i = 0; i < m_shard_cnt; ++i) { + if (m_shards[i]) { + res += m_shards[i]->get_tombstone_count(); + } } - - size_t get_tombstone_count() { - size_t res = 0; - for (size_t i = 0; i < m_shard_cnt; ++i) { - if (m_shards[i]) { - res += m_shards[i]->get_tombstone_count(); - } - } - return res; + return res; + } + + size_t get_aux_memory_usage() { + size_t cnt = 0; + for (size_t i = 0; i < m_shard_cnt; i++) { + if (m_shards[i]) { + cnt += m_shards[i]->get_aux_memory_usage(); + } } - size_t 
get_aux_memory_usage() { - size_t cnt = 0; - for (size_t i=0; i<m_shard_cnt; i++) { - if (m_shards[i]){ - cnt += m_shards[i]->get_aux_memory_usage(); - } - } + return cnt; + } - return cnt; + size_t get_memory_usage() { + size_t cnt = 0; + for (size_t i = 0; i < m_shard_cnt; i++) { + if (m_shards[i]) { + cnt += m_shards[i]->get_memory_usage(); + } } - size_t get_memory_usage() { - size_t cnt = 0; - for (size_t i=0; i<m_shard_cnt; i++) { - if (m_shards[i]) { - cnt += m_shards[i]->get_memory_usage(); - } - } - - return cnt; + return cnt; + } + + double get_tombstone_prop() { + size_t tscnt = 0; + size_t reccnt = 0; + for (size_t i = 0; i < m_shard_cnt; i++) { + if (m_shards[i]) { + tscnt += m_shards[i]->get_tombstone_count(); + reccnt += m_shards[i]->get_record_count(); + } } - double get_tombstone_prop() { - size_t tscnt = 0; - size_t reccnt = 0; - for (size_t i=0; i<m_shard_cnt; i++) { - if (m_shards[i]) { - tscnt += m_shards[i]->get_tombstone_count(); - reccnt += m_shards[i]->get_record_count(); - } - } + return (double)tscnt / (double)(tscnt + reccnt); + } - return (double) tscnt / (double) (tscnt + reccnt); + std::shared_ptr<InternalLevel> clone() { + auto new_level = + std::make_shared<InternalLevel>(m_level_no, m_shards.size()); + for (size_t i = 0; i < m_shard_cnt; i++) { + new_level->m_shards[i] = m_shards[i]; } + new_level->m_shard_cnt = m_shard_cnt; - std::shared_ptr<InternalLevel> clone() { - auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size()); - for (size_t i=0; i<m_shard_cnt; i++) { - new_level->m_shards[i] = m_shards[i]; - } - new_level->m_shard_cnt = m_shard_cnt; - - return new_level; - } + return new_level; + } private: - ssize_t m_level_no; - - size_t m_shard_cnt; - size_t m_shard_size_cap; + ssize_t m_level_no; + + size_t m_shard_cnt; + size_t m_shard_size_cap; - std::vector<std::shared_ptr<Shard>> m_shards; - Shard *m_pending_shard; + std::vector<std::shared_ptr<ShardType>> m_shards; + ShardType *m_pending_shard; }; -} +} // namespace de diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 7db3980..625b04b 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -1,8 +1,8 @@ /* * include/framework/structure/MutableBuffer.h * - * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> - * Dong Xie <dongx@psu.edu> + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> * * Distributed under the Modified BSD License. * @@ -18,301 +18,281 @@ */ #pragma once -#include <cstdlib> #include <atomic> #include <cassert> +#include <cstdlib> #include <immintrin.h> -#include "psu-util/alignment.h" -#include "util/bf_config.h" -#include "psu-ds/BloomFilter.h" #include "framework/interface/Record.h" #include "framework/structure/BufferView.h" - -using psudb::CACHELINE_SIZE; +#include "psu-ds/BloomFilter.h" +#include "psu-util/alignment.h" +#include "util/bf_config.h" namespace de { -template <RecordInterface R> -class MutableBuffer { - friend class BufferView<R>; +template <RecordInterface R> class MutableBuffer { + friend class BufferView<R>; - struct buffer_head { - size_t head_idx; - size_t refcnt; - }; - -public: - MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0) - : m_lwm(low_watermark) - , m_hwm(high_watermark) - , m_cap((capacity == 0) ? 
2 * high_watermark : capacity) - , m_tail(0) - , m_head({0, 0}) - , m_old_head({high_watermark, 0}) - //, m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>))) - , m_data(new Wrapped<R>[m_cap]()) - , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS)) - , m_tscnt(0) - , m_old_tscnt(0) - , m_active_head_advance(false) - { - assert(m_cap > m_hwm); - assert(m_hwm >= m_lwm); - } + struct buffer_head { + size_t head_idx; + size_t refcnt; + }; - ~MutableBuffer() { - delete[] m_data; - delete m_tombstone_filter; +public: + MutableBuffer(size_t low_watermark, size_t high_watermark, + size_t capacity = 0) + : m_lwm(low_watermark), m_hwm(high_watermark), + m_cap((capacity == 0) ? 2 * high_watermark : capacity), m_tail(0), + m_head({0, 0}), m_old_head({high_watermark, 0}), + m_data(new Wrapped<R>[m_cap]()), + m_tombstone_filter( + new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS)), + m_tscnt(0), m_old_tscnt(0), m_active_head_advance(false) { + assert(m_cap > m_hwm); + assert(m_hwm >= m_lwm); + } + + ~MutableBuffer() { + delete[] m_data; + delete m_tombstone_filter; + } + + int append(const R &rec, bool tombstone = false) { + int32_t tail = 0; + if ((tail = try_advance_tail()) == -1) { + return 0; } - int append(const R &rec, bool tombstone=false) { - int32_t tail = 0; - if ((tail = try_advance_tail()) == -1) { - return 0; - } - - Wrapped<R> wrec; - wrec.rec = rec; - wrec.header = 0; - if (tombstone) wrec.set_tombstone(); + Wrapped<R> wrec; + wrec.rec = rec; + wrec.header = 0; + if (tombstone) + wrec.set_tombstone(); - // FIXME: because of the mod, it isn't correct to use `pos` - // as the ordering timestamp in the header anymore. - size_t pos = tail % m_cap; - - m_data[pos] = wrec; - m_data[pos].set_timestamp(pos); - - if (tombstone) { - m_tscnt.fetch_add(1); - if (m_tombstone_filter) m_tombstone_filter->insert(rec); - } + // FIXME: because of the mod, it isn't correct to use `pos` + // as the ordering timestamp in the header anymore. 
+ size_t pos = tail % m_cap; - m_data[pos].set_visible(); + m_data[pos] = wrec; + m_data[pos].set_timestamp(pos); - return 1; + if (tombstone) { + m_tscnt.fetch_add(1); + if (m_tombstone_filter) + m_tombstone_filter->insert(rec); } - bool truncate() { - m_tscnt.store(0); - m_tail.store(0); - if (m_tombstone_filter) m_tombstone_filter->clear(); + m_data[pos].set_visible(); - return true; - } + return 1; + } - size_t get_record_count() { - return m_tail.load() - m_head.load().head_idx; - } - - size_t get_capacity() { - return m_cap; - } + bool truncate() { + m_tscnt.store(0); + m_tail.store(0); + if (m_tombstone_filter) + m_tombstone_filter->clear(); - bool is_full() { - return get_record_count() >= m_hwm; - } + return true; + } - bool is_at_low_watermark() { - return get_record_count() >= m_lwm; - } + size_t get_record_count() { return m_tail.load() - m_head.load().head_idx; } - size_t get_tombstone_count() { - return m_tscnt.load(); - } + size_t get_capacity() { return m_cap; } - bool delete_record(const R& rec) { - return get_buffer_view().delete_record(rec); - } + bool is_full() { return get_record_count() >= m_hwm; } - bool check_tombstone(const R& rec) { - return get_buffer_view().check_tombstone(rec); - } + bool is_at_low_watermark() { return get_record_count() >= m_lwm; } - size_t get_memory_usage() { - return m_cap * sizeof(Wrapped<R>); - } + size_t get_tombstone_count() { return m_tscnt.load(); } - size_t get_aux_memory_usage() { - return m_tombstone_filter->get_memory_usage(); - } + bool delete_record(const R &rec) { + return get_buffer_view().delete_record(rec); + } - BufferView<R> get_buffer_view(size_t target_head) { - size_t head = get_head(target_head); - auto f = std::bind(release_head_reference, (void *) this, head); + bool check_tombstone(const R &rec) { + return get_buffer_view().check_tombstone(rec); + } - return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); - } + size_t get_memory_usage() { return m_cap * sizeof(Wrapped<R>); } - BufferView<R> get_buffer_view() { - size_t head = get_head(m_head.load().head_idx); - auto f = std::bind(release_head_reference, (void *) this, head); + size_t get_aux_memory_usage() { + return m_tombstone_filter->get_memory_usage(); + } - return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); - } + BufferView<R> get_buffer_view(size_t target_head) { + size_t head = get_head(target_head); + auto f = std::bind(release_head_reference, (void *)this, head); - /* - * Advance the buffer following a reconstruction. Move current - * head and head_refcnt into old_head and old_head_refcnt, then - * assign new_head to old_head. 
- */ - bool advance_head(size_t new_head) { - assert(new_head > m_head.load().head_idx); - assert(new_head <= m_tail.load()); - - /* refuse to advance head while there is an old with one references */ - if (m_old_head.load().refcnt > 0) { - //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n"); - return false; - } - - m_active_head_advance.store(true); + return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), + m_tombstone_filter, f); + } - buffer_head new_hd = {new_head, 0}; - buffer_head cur_hd; + BufferView<R> get_buffer_view() { + size_t head = get_head(m_head.load().head_idx); + auto f = std::bind(release_head_reference, (void *)this, head); - /* replace current head with new head */ - do { - cur_hd = m_head.load(); - } while(!m_head.compare_exchange_strong(cur_hd, new_hd)); + return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), + m_tombstone_filter, f); + } - /* move the current head into the old head */ - m_old_head.store(cur_hd); + /* + * Advance the buffer following a reconstruction. Move current + * head and head_refcnt into old_head and old_head_refcnt, then + * assign new_head to old_head. + */ + bool advance_head(size_t new_head) { + assert(new_head > m_head.load().head_idx); + assert(new_head <= m_tail.load()); - m_active_head_advance.store(false); - return true; + /* refuse to advance head while there is an old with one references */ + if (m_old_head.load().refcnt > 0) { + // fprintf(stderr, "[W]: Refusing to advance head due to remaining + // reference counts\n"); + return false; } - /* - * FIXME: If target_head does not match *either* the old_head or the - * current_head, this routine will loop infinitely. - */ - size_t get_head(size_t target_head) { - buffer_head cur_hd, new_hd; - bool head_acquired = false; - - do { - if (m_old_head.load().head_idx == target_head) { - cur_hd = m_old_head.load(); - cur_hd.head_idx = target_head; - new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; - head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); - } else if (m_head.load().head_idx == target_head){ - cur_hd = m_head.load(); - cur_hd.head_idx = target_head; - new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; - head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); - } - } while(!head_acquired); - - return new_hd.head_idx; + m_active_head_advance.store(true); + + buffer_head new_hd = {new_head, 0}; + buffer_head cur_hd; + + /* replace current head with new head */ + do { + cur_hd = m_head.load(); + } while (!m_head.compare_exchange_strong(cur_hd, new_hd)); + + /* move the current head into the old head */ + m_old_head.store(cur_hd); + + m_active_head_advance.store(false); + return true; + } + + /* + * FIXME: If target_head does not match *either* the old_head or the + * current_head, this routine will loop infinitely. 
+ */ + size_t get_head(size_t target_head) { + buffer_head cur_hd, new_hd; + bool head_acquired = false; + + do { + if (m_old_head.load().head_idx == target_head) { + cur_hd = m_old_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else if (m_head.load().head_idx == target_head) { + cur_hd = m_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } while (!head_acquired); + + return new_hd.head_idx; + } + + void set_low_watermark(size_t lwm) { + assert(lwm < m_hwm); + m_lwm = lwm; + } + + size_t get_low_watermark() { return m_lwm; } + + void set_high_watermark(size_t hwm) { + assert(hwm > m_lwm); + assert(hwm < m_cap); + m_hwm = hwm; + } + + size_t get_high_watermark() { return m_hwm; } + + size_t get_tail() { return m_tail.load(); } + + /* + * Note: this returns the available physical storage capacity, + * *not* now many more records can be inserted before the + * HWM is reached. It considers the old_head to be "free" + * when it has no remaining references. This should be true, + * but a buggy framework implementation may violate the + * assumption. + */ + size_t get_available_capacity() { + if (m_old_head.load().refcnt == 0) { + return m_cap - (m_tail.load() - m_head.load().head_idx); } - void set_low_watermark(size_t lwm) { - assert(lwm < m_hwm); - m_lwm = lwm; - } + return m_cap - (m_tail.load() - m_old_head.load().head_idx); + } - size_t get_low_watermark() { - return m_lwm; - } +private: + int64_t try_advance_tail() { + size_t old_value = m_tail.load(); - void set_high_watermark(size_t hwm) { - assert(hwm > m_lwm); - assert(hwm < m_cap); - m_hwm = hwm; + /* if full, fail to advance the tail */ + if (old_value - m_head.load().head_idx >= m_hwm) { + return -1; } - size_t get_high_watermark() { - return m_hwm; - } + while (!m_tail.compare_exchange_strong(old_value, old_value + 1)) { + /* if full, stop trying and fail to advance the tail */ + if (m_tail.load() >= m_hwm) { + return -1; + } - size_t get_tail() { - return m_tail.load(); + _mm_pause(); } - /* - * Note: this returns the available physical storage capacity, - * *not* now many more records can be inserted before the - * HWM is reached. It considers the old_head to be "free" - * when it has no remaining references. This should be true, - * but a buggy framework implementation may violate the - * assumption. 
- */ - size_t get_available_capacity() { - if (m_old_head.load().refcnt == 0) { - return m_cap - (m_tail.load() - m_head.load().head_idx); - } + return old_value; + } - return m_cap - (m_tail.load() - m_old_head.load().head_idx); - } + size_t to_idx(size_t i, size_t head) { return (head + i) % m_cap; } -private: - int64_t try_advance_tail() { - size_t old_value = m_tail.load(); + static void release_head_reference(void *buff, size_t head) { + MutableBuffer<R> *buffer = (MutableBuffer<R> *)buff; - /* if full, fail to advance the tail */ - if (old_value - m_head.load().head_idx >= m_hwm) { - return -1; + buffer_head cur_hd, new_hd; + do { + if (buffer->m_old_head.load().head_idx == head) { + cur_hd = buffer->m_old_head; + if (cur_hd.refcnt == 0) + continue; + new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; + if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { + break; } - - while (!m_tail.compare_exchange_strong(old_value, old_value+1)) { - /* if full, stop trying and fail to advance the tail */ - if (m_tail.load() >= m_hwm) { - return -1; - } - - _mm_pause(); + } else { + cur_hd = buffer->m_head; + if (cur_hd.refcnt == 0) + continue; + new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1}; + + if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { + break; } + } + _mm_pause(); + } while (true); + } - return old_value; - } + size_t m_lwm; + size_t m_hwm; + size_t m_cap; - size_t to_idx(size_t i, size_t head) { - return (head + i) % m_cap; - } + alignas(64) std::atomic<size_t> m_tail; - static void release_head_reference(void *buff, size_t head) { - MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff; - - buffer_head cur_hd, new_hd; - do { - if (buffer->m_old_head.load().head_idx == head) { - cur_hd = buffer->m_old_head; - if (cur_hd.refcnt == 0) continue; - new_hd = {cur_hd.head_idx, cur_hd.refcnt-1}; - if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { - break; - } - } else { - cur_hd = buffer->m_head; - if (cur_hd.refcnt == 0) continue; - new_hd = {cur_hd.head_idx, cur_hd.refcnt-1}; - - if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { - break; - } - } - _mm_pause(); - } while(true); - } + alignas(64) std::atomic<buffer_head> m_head; + alignas(64) std::atomic<buffer_head> m_old_head; + + Wrapped<R> *m_data; + psudb::BloomFilter<R> *m_tombstone_filter; + alignas(64) std::atomic<size_t> m_tscnt; + size_t m_old_tscnt; - size_t m_lwm; - size_t m_hwm; - size_t m_cap; - - alignas(64) std::atomic<size_t> m_tail; - - alignas(64) std::atomic<buffer_head> m_head; - alignas(64) std::atomic<buffer_head> m_old_head; - - Wrapped<R>* m_data; - psudb::BloomFilter<R>* m_tombstone_filter; - alignas(64) std::atomic<size_t> m_tscnt; - size_t m_old_tscnt; - - alignas(64) std::atomic<bool> m_active_head_advance; + alignas(64) std::atomic<bool> m_active_head_advance; }; -} +} // namespace de |
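
Note on the scheme above: the reformatted MutableBuffer retires a head position only once its reference count drops to zero, which is what allows a BufferView to scan a stable [head, tail) range while concurrent appends keep advancing the tail. The standalone sketch below illustrates that reference-counted head/tail idea in miniature. It is not the framework's API: RingBuffer, acquire_view, and release_view are hypothetical names introduced only for this example, error handling is elided, and the logic is a simplified approximation of what the diff shows.

#include <atomic>
#include <cstddef>
#include <cstdio>
#include <vector>

/* a {position, refcount} pair stored in a single atomic, as in buffer_head */
struct BufferHead {
  size_t head_idx;
  size_t refcnt;
};

class RingBuffer {
public:
  explicit RingBuffer(size_t high_watermark)
      : m_hwm(high_watermark), m_cap(2 * high_watermark), m_data(m_cap, 0),
        m_tail(0), m_head(BufferHead{0, 0}),
        m_old_head(BufferHead{high_watermark, 0}) {}

  /* returns false when the buffer is at its high watermark */
  bool append(int value) {
    size_t tail = m_tail.load();
    do {
      if (tail - m_head.load().head_idx >= m_hwm)
        return false;
    } while (!m_tail.compare_exchange_strong(tail, tail + 1));

    m_data[tail % m_cap] = value;
    return true;
  }

  /* pin the current head so a reader can safely scan [head, tail) */
  size_t acquire_view() {
    BufferHead cur, next;
    do {
      cur = m_head.load();
      next = {cur.head_idx, cur.refcnt + 1};
    } while (!m_head.compare_exchange_strong(cur, next));
    return next.head_idx;
  }

  /* unpin a previously acquired head (it may have moved into old_head) */
  void release_view(size_t head) {
    auto &slot = (m_old_head.load().head_idx == head) ? m_old_head : m_head;
    BufferHead cur, next;
    do {
      cur = slot.load();
      next = {cur.head_idx, cur.refcnt - 1};
    } while (!slot.compare_exchange_strong(cur, next));
  }

  /* refuse to advance while the previously retired head is still pinned */
  bool advance_head(size_t new_head) {
    if (m_old_head.load().refcnt > 0)
      return false;

    BufferHead cur = m_head.load();
    while (!m_head.compare_exchange_strong(cur, BufferHead{new_head, 0}))
      ;
    m_old_head.store(cur); /* current head (and its refcount) becomes old */
    return true;
  }

private:
  size_t m_hwm;
  size_t m_cap;
  std::vector<int> m_data;
  std::atomic<size_t> m_tail;
  std::atomic<BufferHead> m_head;
  std::atomic<BufferHead> m_old_head;
};

int main() {
  RingBuffer buf(4);
  while (buf.append(42))
    ;                                  /* fill to the high watermark */
  size_t head = buf.acquire_view();    /* pin the current head */
  bool ok = buf.advance_head(2);       /* succeeds: old-head slot is unreferenced */
  bool blocked = !buf.advance_head(4); /* fails: pinned head now lives in old_head */
  buf.release_view(head);              /* unpin; a later advance would succeed */
  std::printf("ok=%d blocked=%d\n", ok, blocked);
  return 0;
}

As in the diff, the second advance is refused while the previously active head still has an outstanding reference, which is how the framework can recycle ring-buffer slots after a flush without locking out readers that are still working from an older BufferView.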