Diffstat (limited to 'include/framework/structure')
-rw-r--r--  include/framework/structure/BufferView.h          258
-rw-r--r--  include/framework/structure/ExtensionStructure.h  1080
-rw-r--r--  include/framework/structure/InternalLevel.h       439
-rw-r--r--  include/framework/structure/MutableBuffer.h       464
4 files changed, 1124 insertions(+), 1117 deletions(-)
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index e95a799..acf1201 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -1,7 +1,7 @@
/*
* include/framework/structure/BufferView.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -9,166 +9,150 @@
*/
#pragma once
-#include <cstdlib>
#include <cassert>
+#include <cstdlib>
#include <functional>
#include <utility>
-#include "psu-util/alignment.h"
-#include "psu-ds/BloomFilter.h"
#include "framework/interface/Record.h"
+#include "psu-ds/BloomFilter.h"
+#include "psu-util/alignment.h"
namespace de {
-typedef std::function<void(void)> ReleaseFunction;
+typedef std::function<void(void)> ReleaseFunction;
-template <RecordInterface R>
-class BufferView {
+template <RecordInterface R> class BufferView {
public:
- BufferView() = default;
-
- /*
- * the BufferView's lifetime is tightly linked to buffer versioning, and so
- * copying and assignment are disabled.
- */
- BufferView(const BufferView&) = delete;
- BufferView &operator=(BufferView &) = delete;
-
- BufferView(BufferView &&other)
- : m_data(std::exchange(other.m_data, nullptr))
- , m_release(std::move(other.m_release))
- , m_head(std::exchange(other.m_head, 0))
- , m_tail(std::exchange(other.m_tail, 0))
- , m_start(std::exchange(other.m_start, 0))
- , m_stop(std::exchange(other.m_stop, 0))
- , m_cap(std::exchange(other.m_cap, 0))
- , m_approx_ts_cnt(std::exchange(other.m_approx_ts_cnt, 0))
- , m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr))
- , m_active(std::exchange(other.m_active, false)) {}
-
- BufferView &operator=(BufferView &&other) = delete;
-
-
- BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail, size_t tombstone_cnt, psudb::BloomFilter<R> *filter,
- ReleaseFunction release)
- : m_data(buffer)
- , m_release(release)
- , m_head(head)
- , m_tail(tail)
- , m_start(m_head % cap)
- , m_stop(m_tail % cap)
- , m_cap(cap)
- , m_approx_ts_cnt(tombstone_cnt)
- , m_tombstone_filter(filter)
- , m_active(true) {}
-
- ~BufferView() {
- if (m_active) {
- m_release();
- }
+ BufferView() = default;
+
+ /*
+ * the BufferView's lifetime is tightly linked to buffer versioning, so
+ * copying and assignment are disabled.
+ */
+ BufferView(const BufferView &) = delete;
+ BufferView &operator=(BufferView &) = delete;
+
+ BufferView(BufferView &&other)
+ : m_data(std::exchange(other.m_data, nullptr)),
+ m_release(std::move(other.m_release)),
+ m_head(std::exchange(other.m_head, 0)),
+ m_tail(std::exchange(other.m_tail, 0)),
+ m_start(std::exchange(other.m_start, 0)),
+ m_stop(std::exchange(other.m_stop, 0)),
+ m_cap(std::exchange(other.m_cap, 0)),
+ m_approx_ts_cnt(std::exchange(other.m_approx_ts_cnt, 0)),
+ m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr)),
+ m_active(std::exchange(other.m_active, false)) {}
+
+ BufferView &operator=(BufferView &&other) = delete;
+
+ BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail,
+ size_t tombstone_cnt, psudb::BloomFilter<R> *filter,
+ ReleaseFunction release)
+ : m_data(buffer), m_release(release), m_head(head), m_tail(tail),
+ m_start(m_head % cap), m_stop(m_tail % cap), m_cap(cap),
+ m_approx_ts_cnt(tombstone_cnt), m_tombstone_filter(filter),
+ m_active(true) {}
+
+ ~BufferView() {
+ if (m_active) {
+ m_release();
}
+ }
- bool check_tombstone(const R& rec) {
- if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
-
- for (size_t i=0; i<get_record_count(); i++) {
- if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) {
- return true;
- }
- }
+ bool check_tombstone(const R &rec) {
+ if (m_tombstone_filter && !m_tombstone_filter->lookup(rec))
+ return false;
- return false;
+ for (size_t i = 0; i < get_record_count(); i++) {
+ if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) {
+ return true;
+ }
}
- bool delete_record(const R& rec) {
- if (m_start < m_stop) {
- for (size_t i=m_start; i<m_stop; i++) {
- if (m_data[i].rec == rec) {
- m_data[i].set_delete();
- return true;
- }
- }
- } else {
- for (size_t i=m_start; i<m_cap; i++) {
- if (m_data[i].rec == rec) {
- m_data[i].set_delete();
- return true;
- }
- }
-
- for (size_t i=0; i<m_stop; i++) {
- if (m_data[i].rec == rec) {
- m_data[i].set_delete();
- return true;
- }
-
- }
+ return false;
+ }
+ bool delete_record(const R &rec) {
+ if (m_start < m_stop) {
+ for (size_t i = m_start; i < m_stop; i++) {
+ if (m_data[i].rec == rec) {
+ m_data[i].set_delete();
+ return true;
}
+ }
+ } else {
+ for (size_t i = m_start; i < m_cap; i++) {
+ if (m_data[i].rec == rec) {
+ m_data[i].set_delete();
+ return true;
+ }
+ }
- return false;
- }
-
- size_t get_record_count() {
- return m_tail - m_head;
- }
-
- size_t get_capacity() {
- return m_cap;
- }
-
- /*
- * NOTE: This function returns an upper bound on the number
- * of tombstones within the view. There may be less than
- * this, due to synchronization issues during view creation.
- */
- size_t get_tombstone_count() {
- return m_approx_ts_cnt;
- }
-
- Wrapped<R> *get(size_t i) {
- //assert(i < get_record_count());
- return m_data + to_idx(i);
- }
-
- void copy_to_buffer(psudb::byte *buffer) {
- /* check if the region to be copied circles back to start. If so, do it in two steps */
- if (m_start > m_stop) {
- size_t split_idx = m_cap - m_start;
-
- memcpy(buffer, (std::byte*) (m_data + m_start), split_idx* sizeof(Wrapped<R>));
- memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte*) m_data, m_stop * sizeof(Wrapped<R>));
- } else {
- memcpy(buffer, (std::byte*) (m_data + m_start), get_record_count() * sizeof(Wrapped<R>));
+ for (size_t i = 0; i < m_stop; i++) {
+ if (m_data[i].rec == rec) {
+ m_data[i].set_delete();
+ return true;
}
+ }
}
- size_t get_tail() {
- return m_tail;
+ return false;
+ }
+
+ size_t get_record_count() { return m_tail - m_head; }
+
+ size_t get_capacity() { return m_cap; }
+
+ /*
+ * NOTE: This function returns an upper bound on the number
+ * of tombstones within the view. There may be less than
+ * this, due to synchronization issues during view creation.
+ */
+ size_t get_tombstone_count() { return m_approx_ts_cnt; }
+
+ Wrapped<R> *get(size_t i) {
+ return m_data + to_idx(i);
+ }
+
+ void copy_to_buffer(psudb::byte *buffer) {
+ /* check if the region to be copied circles back to start. If so, do it in
+ * two steps */
+ if (m_start > m_stop) {
+ size_t split_idx = m_cap - m_start;
+
+ memcpy(buffer, (std::byte *)(m_data + m_start),
+ split_idx * sizeof(Wrapped<R>));
+ memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte *)m_data,
+ m_stop * sizeof(Wrapped<R>));
+ } else {
+ memcpy(buffer, (std::byte *)(m_data + m_start),
+ get_record_count() * sizeof(Wrapped<R>));
}
+ }
- size_t get_head() {
- return m_head;
- }
+ size_t get_tail() { return m_tail; }
+
+ size_t get_head() { return m_head; }
private:
- Wrapped<R>* m_data;
- ReleaseFunction m_release;
- size_t m_head;
- size_t m_tail;
- size_t m_start;
- size_t m_stop;
- size_t m_cap;
- size_t m_approx_ts_cnt;
- psudb::BloomFilter<R> *m_tombstone_filter;
- bool m_active;
-
- size_t to_idx(size_t i) {
- size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start)
- : m_start + i;
- assert(idx < m_cap);
- return idx;
- }
+ Wrapped<R> *m_data;
+ ReleaseFunction m_release;
+ size_t m_head;
+ size_t m_tail;
+ size_t m_start;
+ size_t m_stop;
+ size_t m_cap;
+ size_t m_approx_ts_cnt;
+ psudb::BloomFilter<R> *m_tombstone_filter;
+ bool m_active;
+
+ size_t to_idx(size_t i) {
+ size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start) : m_start + i;
+ assert(idx < m_cap);
+ return idx;
+ }
};
-}
+} // namespace de
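
A note for readers skimming the BufferView changes above: the head/tail bookkeeping in to_idx() and the two-step memcpy in copy_to_buffer() are standard ring-buffer arithmetic. The standalone sketch below restates that logic in isolation; the names (RingView, demo values) and the use of plain ints in place of Wrapped<R> are illustrative assumptions, not part of the framework.

// Standalone sketch of the ring-buffer arithmetic used by BufferView::to_idx()
// and copy_to_buffer(). Names and values here are illustrative only.
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

struct RingView {
  const int *data;   // backing array of `cap` slots
  size_t cap;        // capacity of the backing array
  size_t head, tail; // monotonically increasing logical positions

  size_t count() const { return tail - head; }

  // Map the i-th logical record in [head, tail) onto a physical slot,
  // wrapping around the end of the backing array when necessary.
  size_t to_idx(size_t i) const {
    size_t start = head % cap;
    size_t idx = (start + i >= cap) ? i - (cap - start) : start + i;
    assert(idx < cap);
    return idx;
  }

  // Copy the live region into a flat buffer; two memcpy calls are needed
  // when the region wraps past the end of the backing array.
  void copy_to(int *out) const {
    size_t start = head % cap, stop = tail % cap;
    if (start > stop) {
      size_t split = cap - start;
      std::memcpy(out, data + start, split * sizeof(int));
      std::memcpy(out + split, data, stop * sizeof(int));
    } else {
      std::memcpy(out, data + start, count() * sizeof(int));
    }
  }
};

int main() {
  std::vector<int> slots = {40, 50, 0, 0, 10, 20, 30}; // cap = 7
  RingView v{slots.data(), slots.size(), /*head=*/11, /*tail=*/16};
  assert(v.count() == 5);
  assert(v.to_idx(0) == 4 && v.to_idx(3) == 0); // wraps from slot 6 to slot 0

  std::vector<int> flat(v.count());
  v.copy_to(flat.data());
  assert(flat == std::vector<int>({10, 20, 30, 40, 50}));
  return 0;
}
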
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index b83674b..2728246 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -1,8 +1,8 @@
/*
* include/framework/structure/ExtensionStructure.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -22,622 +22,660 @@
namespace de {
-template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L=LayoutPolicy::TEIRING>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
+ LayoutPolicy L = LayoutPolicy::TEIRING>
class ExtensionStructure {
- typedef S Shard;
- typedef BufferView<R> BuffView;
+ typedef typename ShardType::RECORD RecordType;
+ typedef BufferView<RecordType> BuffView;
- typedef struct {
- size_t reccnt;
- size_t reccap;
+ typedef struct {
+ size_t reccnt;
+ size_t reccap;
- size_t shardcnt;
- size_t shardcap;
- } level_state;
+ size_t shardcnt;
+ size_t shardcap;
+ } level_state;
- typedef std::vector<level_state> state_vector;
+ typedef std::vector<level_state> state_vector;
public:
- ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
- : m_scale_factor(scale_factor)
- , m_max_delete_prop(max_delete_prop)
- , m_buffer_size(buffer_size)
- {}
-
- ~ExtensionStructure() = default;
-
- /*
- * Create a shallow copy of this extension structure. The copy will share
- * references to the same levels/shards as the original, but will have its
- * own lists. As all of the shards are immutable (with the exception of
- * deletes), the copy can be restructured with reconstructions and flushes
- * without affecting the original. The copied structure will be returned
- * with a reference count of 0; generally you will want to immediately call
- * take_reference() on it.
- *
- * NOTE: When using tagged deletes, a delete of a record in the original
- * structure will affect the copy, so long as the copy retains a reference
- * to the same shard as the original. This could cause synchronization
- * problems under tagging with concurrency. Any deletes in this context will
- * need to be forwarded to the appropriate structures manually.
- */
- ExtensionStructure<R, S, Q, L> *copy() {
- auto new_struct = new ExtensionStructure<R, S, Q, L>(m_buffer_size, m_scale_factor,
- m_max_delete_prop);
- for (size_t i=0; i<m_levels.size(); i++) {
- new_struct->m_levels.push_back(m_levels[i]->clone());
- }
-
- new_struct->m_refcnt = 0;
- new_struct->m_current_state = m_current_state;
+ ExtensionStructure(size_t buffer_size, size_t scale_factor,
+ double max_delete_prop)
+ : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop),
+ m_buffer_size(buffer_size) {}
+
+ ~ExtensionStructure() = default;
+
+ /*
+ * Create a shallow copy of this extension structure. The copy will share
+ * references to the same levels/shards as the original, but will have its
+ * own lists. As all of the shards are immutable (with the exception of
+ * deletes), the copy can be restructured with reconstructions and flushes
+ * without affecting the original. The copied structure will be returned
+ * with a reference count of 0; generally you will want to immediately call
+ * take_reference() on it.
+ *
+ * NOTE: When using tagged deletes, a delete of a record in the original
+ * structure will affect the copy, so long as the copy retains a reference
+ * to the same shard as the original. This could cause synchronization
+ * problems under tagging with concurrency. Any deletes in this context will
+ * need to be forwarded to the appropriate structures manually.
+ */
+ ExtensionStructure<ShardType, QueryType, L> *copy() {
+ auto new_struct = new ExtensionStructure<ShardType, QueryType, L>(
+ m_buffer_size, m_scale_factor, m_max_delete_prop);
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ new_struct->m_levels.push_back(m_levels[i]->clone());
+ }
- return new_struct;
+ new_struct->m_refcnt = 0;
+ new_struct->m_current_state = m_current_state;
+
+ return new_struct;
+ }
+
+ /*
+ * Search for a record matching the argument and mark it deleted by
+ * setting the delete bit in its wrapped header. Returns 1 if a matching
+ * record was found and deleted, and 0 if a matching record was not found.
+ *
+ * This function will stop after finding the first matching record. It is
+ * assumed that no duplicate records exist. In the case of duplicates, this
+ * function will still "work", but in the sense of "delete first match".
+ */
+ int tagged_delete(const RecordType &rec) {
+ for (auto level : m_levels) {
+ if (level && level->delete_record(rec)) {
+ return 1;
+ }
}
/*
- * Search for a record matching the argument and mark it deleted by
- * setting the delete bit in its wrapped header. Returns 1 if a matching
- * record was found and deleted, and 0 if a matching record was not found.
- *
- * This function will stop after finding the first matching record. It is
- * assumed that no duplicate records exist. In the case of duplicates, this
- * function will still "work", but in the sense of "delete first match".
+ * If the record to be erased wasn't found, return 0. The
+ * DynamicExtension itself will then search the active
+ * Buffers.
*/
- int tagged_delete(const R &rec) {
- for (auto level : m_levels) {
- if (level && level->delete_record(rec)) {
- return 1;
- }
- }
-
- /*
- * If the record to be erased wasn't found, return 0. The
- * DynamicExtension itself will then search the active
- * Buffers.
- */
- return 0;
+ return 0;
+ }
+
+ /*
+ * Flush a buffer into the extension structure, performing any necessary
+ * reconstructions to free up room in L0.
+ *
+ * FIXME: arguably, this should be a method attached to the buffer that
+ * takes a structure as input.
+ */
+ inline bool flush_buffer(BuffView buffer) {
+ state_vector tmp = m_current_state;
+
+ if (tmp.size() == 0) {
+ grow(tmp);
}
- /*
- * Flush a buffer into the extension structure, performing any necessary
- * reconstructions to free up room in L0.
- *
- * FIXME: arguably, this should be a method attached to the buffer that
- * takes a structure as input.
- */
- inline bool flush_buffer(BuffView buffer) {
- state_vector tmp = m_current_state;
+ assert(can_reconstruct_with(0, buffer.get_record_count(), tmp));
+ flush_buffer_into_l0(std::move(buffer));
- if (tmp.size() == 0) {
- grow(tmp);
- }
+ return true;
+ }
- assert(can_reconstruct_with(0, buffer.get_record_count(), tmp));
- flush_buffer_into_l0(std::move(buffer));
+ /*
+ * Return the total number of records (including tombstones) within all
+ * of the levels of the structure.
+ */
+ size_t get_record_count() {
+ size_t cnt = 0;
- return true;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i])
+ cnt += m_levels[i]->get_record_count();
}
- /*
- * Return the total number of records (including tombstones) within all
- * of the levels of the structure.
- */
- size_t get_record_count() {
- size_t cnt = 0;
+ return cnt;
+ }
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_record_count();
- }
+ /*
+ * Return the total number of tombstones contained within all of the
+ * levels of the structure.
+ */
+ size_t get_tombstone_count() {
+ size_t cnt = 0;
- return cnt;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i])
+ cnt += m_levels[i]->get_tombstone_count();
}
- /*
- * Return the total number of tombstones contained within all of the
- * levels of the structure.
- */
- size_t get_tombstone_count() {
- size_t cnt = 0;
-
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count();
- }
-
- return cnt;
+ return cnt;
+ }
+
+ /*
+ * Return the number of levels within the structure. Note that not
+ * all of these levels are necessarily populated.
+ */
+ size_t get_height() { return m_levels.size(); }
+
+ /*
+ * Return the amount of memory (in bytes) used by the shards within the
+ * structure for storing the primary data structure and raw data.
+ */
+ size_t get_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i])
+ cnt += m_levels[i]->get_memory_usage();
}
- /*
- * Return the number of levels within the structure. Note that not
- * all of these levels are necessarily populated.
- */
- size_t get_height() {
- return m_levels.size();
+ return cnt;
+ }
+
+ /*
+ * Return the amount of memory (in bytes) used by the shards within the
+ * structure for storing auxiliary data structures. This total does not
+ * include memory used for the main data structure, or raw data.
+ */
+ size_t get_aux_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i]) {
+ cnt += m_levels[i]->get_aux_memory_usage();
+ }
}
- /*
- * Return the amount of memory (in bytes) used by the shards within the
- * structure for storing the primary data structure and raw data.
- */
- size_t get_memory_usage() {
- size_t cnt = 0;
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_memory_usage();
+ return cnt;
+ }
+
+ /*
+ * Validate that no level in the structure exceeds its maximum tombstone
+ * capacity. This is used to trigger preemptive compactions at the end of
+ * the reconstruction process.
+ */
+ bool validate_tombstone_proportion() {
+ long double ts_prop;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i]) {
+ ts_prop = (long double)m_levels[i]->get_tombstone_count() /
+ (long double)calc_level_record_capacity(i);
+ if (ts_prop > (long double)m_max_delete_prop) {
+ return false;
}
-
- return cnt;
+ }
}
- /*
- * Return the amount of memory (in bytes) used by the shards within the
- * structure for storing auxiliary data structures. This total does not
- * include memory used for the main data structure, or raw data.
+ return true;
+ }
+
+ bool validate_tombstone_proportion(level_index level) {
+ long double ts_prop = (long double)m_levels[level]->get_tombstone_count() /
+ (long double)calc_level_record_capacity(level);
+ return ts_prop <= (long double)m_max_delete_prop;
+ }
+
+ /*
+ * Return a reference to the underlying vector of levels within the
+ * structure.
+ */
+ std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> &
+ get_levels() {
+ return m_levels;
+ }
+
+ /*
+ * NOTE: This cannot be simulated, because tombstone cancellation is not
+ * cheaply predictable. It is possible that the worst case number could
+ * be used instead, to allow for prediction, but compaction isn't a
+   * major concern outside of sampling, at least for now, so I'm not
+   * going to spend too much time on it at the moment.
+ */
+ ReconstructionVector get_compaction_tasks() {
+ ReconstructionVector tasks;
+ state_vector scratch_state = m_current_state;
+
+ /* if the tombstone/delete invariant is satisfied, no need for compactions
*/
- size_t get_aux_memory_usage() {
- size_t cnt = 0;
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) {
- cnt += m_levels[i]->get_aux_memory_usage();
- }
- }
-
- return cnt;
+ if (validate_tombstone_proportion()) {
+ return tasks;
}
- /*
- * Validate that no level in the structure exceeds its maximum tombstone
- * capacity. This is used to trigger preemptive compactions at the end of
- * the reconstruction process.
- */
- bool validate_tombstone_proportion() {
- long double ts_prop;
- for (size_t i = 0; i < m_levels.size(); i++) {
- if (m_levels[i]) {
- ts_prop = (long double)m_levels[i]->get_tombstone_count() /
- (long double)calc_level_record_capacity(i);
- if (ts_prop > (long double)m_max_delete_prop) {
- return false;
- }
- }
+ /* locate the first level to violate the invariant */
+ level_index violation_idx = -1;
+ for (level_index i = 0; i < m_levels.size(); i++) {
+ if (!validate_tombstone_proportion(i)) {
+ violation_idx = i;
+ break;
}
-
- return true;
}
- bool validate_tombstone_proportion(level_index level) {
- long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level);
- return ts_prop <= (long double) m_max_delete_prop;
- }
+ assert(violation_idx != -1);
- /*
- * Return a reference to the underlying vector of levels within the
- * structure.
- */
- std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> &get_levels() {
- return m_levels;
+ level_index base_level =
+ find_reconstruction_target(violation_idx, scratch_state);
+ if (base_level == -1) {
+ base_level = grow(scratch_state);
}
- /*
- * NOTE: This cannot be simulated, because tombstone cancellation is not
- * cheaply predictable. It is possible that the worst case number could
- * be used instead, to allow for prediction, but compaction isn't a
- * major concern outside of sampling; at least for now. So I'm not
- * going to focus too much time on it at the moment.
- */
- ReconstructionVector get_compaction_tasks() {
- ReconstructionVector tasks;
- state_vector scratch_state = m_current_state;
-
- /* if the tombstone/delete invariant is satisfied, no need for compactions */
- if (validate_tombstone_proportion()) {
- return tasks;
- }
-
- /* locate the first level to violate the invariant */
- level_index violation_idx = -1;
- for (level_index i=0; i<m_levels.size(); i++) {
- if (!validate_tombstone_proportion(i)) {
- violation_idx = i;
- break;
- }
- }
-
- assert(violation_idx != -1);
-
- level_index base_level = find_reconstruction_target(violation_idx, scratch_state);
- if (base_level == -1) {
- base_level = grow(scratch_state);
- }
-
- for (level_index i=base_level; i>0; i--) {
- /*
- * The amount of storage required for the reconstruction accounts
- * for the cost of storing the new records, along with the
- * cost of retaining the old records during the process
- * (hence the 2x multiplier).
- *
- * FIXME: currently does not account for the *actual* size
- * of the shards, only the storage for the records
- * themselves.
- */
- size_t reccnt = m_levels[i - 1]->get_record_count();
- if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, reccnt, scratch_state)) {
- reccnt += m_levels[i]->get_record_count();
- }
- }
- tasks.add_reconstruction(i-i, i, reccnt);
+ for (level_index i = base_level; i > 0; i--) {
+ /*
+ * The amount of storage required for the reconstruction accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i - 1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_reconstruct_with(i, reccnt, scratch_state)) {
+ reccnt += m_levels[i]->get_record_count();
}
-
- return tasks;
+ }
+ tasks.add_reconstruction(i - i, i, reccnt);
}
+ return tasks;
+ }
+
+ /*
+ *
+ */
+ ReconstructionVector
+ get_reconstruction_tasks(size_t buffer_reccnt,
+ state_vector scratch_state = {}) {
/*
- *
+ * If no scratch state vector is provided, use a copy of the
+ * current one. The only time an empty vector could be used as
+ * *real* input to this function is when the current state is also
+     * empty, so this should work even in that case.
*/
- ReconstructionVector get_reconstruction_tasks(size_t buffer_reccnt,
- state_vector scratch_state={}) {
- /*
- * If no scratch state vector is provided, use a copy of the
- * current one. The only time an empty vector could be used as
- * *real* input to this function is when the current state is also
- * empty, so this should would even in that case.
- */
- if (scratch_state.size() == 0) {
- scratch_state = m_current_state;
- }
-
- ReconstructionVector reconstructions;
- size_t LOOKAHEAD = 1;
- for (size_t i=0; i<LOOKAHEAD; i++) {
- /*
- * If L0 cannot support a direct buffer flush, figure out what
- * work must be done to free up space first. Otherwise, the
- * reconstruction vector will be initially empty.
- */
- if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) {
- auto local_recon = get_reconstruction_tasks_from_level(0, scratch_state);
-
- /*
- * for the first iteration, we need to do all of the
- * reconstructions, so use these to initially the returned
- * reconstruction list
- */
- if (i == 0) {
- reconstructions = local_recon;
- /*
- * Quick sanity test of idea: if the next reconstruction
- * would be larger than this one, steal the largest
- * task from it and run it now instead.
- */
- } else if (local_recon.get_total_reccnt() > reconstructions.get_total_reccnt()) {
- auto t = local_recon.remove_reconstruction(0);
- reconstructions.add_reconstruction(t);
- }
- }
-
- /* simulate the buffer flush in the scratch state */
- scratch_state[0].reccnt += buffer_reccnt;
- if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) {
- scratch_state[0].shardcnt += 1;
- }
-
- }
-
- return std::move(reconstructions);
+ if (scratch_state.size() == 0) {
+ scratch_state = m_current_state;
}
-
- /*
- *
- */
- ReconstructionVector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) {
- ReconstructionVector reconstructions;
+ ReconstructionVector reconstructions;
+ size_t LOOKAHEAD = 1;
+ for (size_t i = 0; i < LOOKAHEAD; i++) {
+ /*
+ * If L0 cannot support a direct buffer flush, figure out what
+ * work must be done to free up space first. Otherwise, the
+ * reconstruction vector will be initially empty.
+ */
+ if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) {
+ auto local_recon =
+ get_reconstruction_tasks_from_level(0, scratch_state);
/*
- * Find the first level capable of sustaining a reconstruction from
- * the level above it. If no such level exists, add a new one at
- * the bottom of the structure.
+ * for the first iteration, we need to do all of the
+       * reconstructions, so use these to initialize the returned
+ * reconstruction list
*/
- level_index base_level = find_reconstruction_target(source_level, scratch_state);
- if (base_level == -1) {
- base_level = grow(scratch_state);
+ if (i == 0) {
+ reconstructions = local_recon;
+ /*
+ * Quick sanity test of idea: if the next reconstruction
+ * would be larger than this one, steal the largest
+ * task from it and run it now instead.
+ */
+ } else if (local_recon.get_total_reccnt() >
+ reconstructions.get_total_reccnt()) {
+ auto t = local_recon.remove_reconstruction(0);
+ reconstructions.add_reconstruction(t);
}
+ }
- if constexpr (L == LayoutPolicy::BSM) {
- if (base_level == 0) {
- return std::move(reconstructions);
- }
-
- ReconstructionTask task;
- task.target = base_level;
-
- size_t base_reccnt = 0;
- for (level_index i=base_level; i>source_level; i--) {
- auto recon_reccnt = scratch_state[i-1].reccnt;
- base_reccnt += recon_reccnt;
- scratch_state[i-1].reccnt = 0;
- scratch_state[i-1].shardcnt = 0;
- task.add_source(i-1, recon_reccnt);
- }
+ /* simulate the buffer flush in the scratch state */
+ scratch_state[0].reccnt += buffer_reccnt;
+ if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) {
+ scratch_state[0].shardcnt += 1;
+ }
+ }
- reconstructions.add_reconstruction(task);
- scratch_state[base_level].reccnt = base_reccnt;
- scratch_state[base_level].shardcnt = 1;
+ return reconstructions;
+ }
- return std::move(reconstructions);
- }
+ /*
+ *
+ */
+ ReconstructionVector
+ get_reconstruction_tasks_from_level(level_index source_level,
+ state_vector &scratch_state) {
+ ReconstructionVector reconstructions;
- /*
- * Determine the full set of reconstructions necessary to open up
- * space in the source level.
- */
- for (level_index i=base_level; i>source_level; i--) {
- size_t recon_reccnt = scratch_state[i-1].reccnt;
- size_t base_reccnt = recon_reccnt;
-
- /*
- * If using Leveling, the total reconstruction size will be the
- * records in *both* base and target, because they will need to
- * be merged (assuming that target isn't empty).
- */
- if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, base_reccnt, scratch_state)) {
- recon_reccnt += scratch_state[i].reccnt;
- }
- }
- reconstructions.add_reconstruction(i-1, i, recon_reccnt);
-
- /*
- * The base level will be emptied and its records moved to
- * the target.
- */
- scratch_state[i-1].reccnt = 0;
- scratch_state[i-1].shardcnt = 0;
-
- /*
- * The target level will have the records from the base level
- * added to it, and potentially gain a shard if the LayoutPolicy
- * is tiering or the level currently lacks any shards at all.
- */
- scratch_state[i].reccnt += base_reccnt;
- if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) {
- scratch_state[i].shardcnt += 1;
- }
- }
-
- return std::move(reconstructions);
+ /*
+ * Find the first level capable of sustaining a reconstruction from
+ * the level above it. If no such level exists, add a new one at
+ * the bottom of the structure.
+ */
+ level_index base_level =
+ find_reconstruction_target(source_level, scratch_state);
+ if (base_level == -1) {
+ base_level = grow(scratch_state);
}
- inline void reconstruction(ReconstructionTask task) {
- static_assert(L == LayoutPolicy::BSM);
- std::vector<InternalLevel<R, Shard, Q>*> levels(task.sources.size());
- for (size_t i=0; i<task.sources.size(); i++) {
- levels[i] = m_levels[task.sources[i]].get();
- }
+ if constexpr (L == LayoutPolicy::BSM) {
+ if (base_level == 0) {
+ return reconstructions;
+ }
- auto new_level = InternalLevel<R, Shard, Q>::reconstruction(levels, task.target);
- if (task.target >= m_levels.size()) {
- m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target),
- 1, 1});
- m_levels.emplace_back(new_level);
- } else {
- m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target),
- 1, 1};
- m_levels[task.target] = new_level;
- }
+ ReconstructionTask task;
+ task.target = base_level;
- /* remove all of the levels that have been flattened */
- for (size_t i=0; i<task.sources.size(); i++) {
- m_levels[task.sources[i]] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(task.sources[i], 1));
- m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.target), 0, 1};
- }
+ size_t base_reccnt = 0;
+ for (level_index i = base_level; i > source_level; i--) {
+ auto recon_reccnt = scratch_state[i - 1].reccnt;
+ base_reccnt += recon_reccnt;
+ scratch_state[i - 1].reccnt = 0;
+ scratch_state[i - 1].shardcnt = 0;
+ task.add_source(i - 1, recon_reccnt);
+ }
+
+ reconstructions.add_reconstruction(task);
+ scratch_state[base_level].reccnt = base_reccnt;
+ scratch_state[base_level].shardcnt = 1;
- return;
+ return reconstructions;
}
/*
- * Combine incoming_level with base_level and reconstruct the shard,
- * placing it in base_level. The two levels should be sequential--i.e. no
- * levels are skipped in the reconstruction process--otherwise the
- * tombstone ordering invariant may be violated.
+ * Determine the full set of reconstructions necessary to open up
+ * space in the source level.
*/
- inline void reconstruction(level_index base_level, level_index incoming_level) {
- size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-
- if (base_level >= m_levels.size()) {
- m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(base_level, shard_capacity)));
- m_current_state.push_back({0, calc_level_record_capacity(base_level),
- 0, shard_capacity});
- }
-
- if constexpr (L == LayoutPolicy::LEVELING) {
- /* if the base level has a shard, merge the base and incoming together to make a new one */
- if (m_levels[base_level]->get_shard_count() > 0) {
- m_levels[base_level] = InternalLevel<R, Shard, Q>::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get());
- /* otherwise, we can just move the incoming to the base */
- } else {
- m_levels[base_level] = m_levels[incoming_level];
- }
-
- } else {
- m_levels[base_level]->append_level(m_levels[incoming_level].get());
- m_levels[base_level]->finalize();
+ for (level_index i = base_level; i > source_level; i--) {
+ size_t recon_reccnt = scratch_state[i - 1].reccnt;
+ size_t base_reccnt = recon_reccnt;
+
+ /*
+ * If using Leveling, the total reconstruction size will be the
+ * records in *both* base and target, because they will need to
+ * be merged (assuming that target isn't empty).
+ */
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_reconstruct_with(i, base_reccnt, scratch_state)) {
+ recon_reccnt += scratch_state[i].reccnt;
}
+ }
+ reconstructions.add_reconstruction(i - 1, i, recon_reccnt);
+
+ /*
+ * The base level will be emptied and its records moved to
+ * the target.
+ */
+ scratch_state[i - 1].reccnt = 0;
+ scratch_state[i - 1].shardcnt = 0;
+
+ /*
+ * The target level will have the records from the base level
+ * added to it, and potentially gain a shard if the LayoutPolicy
+ * is tiering or the level currently lacks any shards at all.
+ */
+ scratch_state[i].reccnt += base_reccnt;
+ if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) {
+ scratch_state[i].shardcnt += 1;
+ }
+ }
- /* place a new, empty level where the incoming level used to be */
- m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
+ return reconstructions;
+ }
- /*
- * Update the state vector to match the *real* state following
- * the reconstruction
- */
- m_current_state[base_level] = {m_levels[base_level]->get_record_count(),
- calc_level_record_capacity(base_level), m_levels[base_level]->get_shard_count(), shard_capacity};
- m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity};
+ inline void reconstruction(ReconstructionTask task) {
+ static_assert(L == LayoutPolicy::BSM);
+ std::vector<InternalLevel<ShardType, QueryType> *> levels(
+ task.sources.size());
+ for (size_t i = 0; i < task.sources.size(); i++) {
+ levels[i] = m_levels[task.sources[i]].get();
}
- bool take_reference() {
- m_refcnt.fetch_add(1);
- return true;
+ auto new_level = InternalLevel<ShardType, QueryType>::reconstruction(
+ levels, task.target);
+ if (task.target >= m_levels.size()) {
+ m_current_state.push_back({new_level->get_record_count(),
+ calc_level_record_capacity(task.target), 1,
+ 1});
+ m_levels.emplace_back(new_level);
+ } else {
+ m_current_state[task.target] = {new_level->get_record_count(),
+ calc_level_record_capacity(task.target),
+ 1, 1};
+ m_levels[task.target] = new_level;
}
- bool release_reference() {
- assert(m_refcnt.load() > 0);
- m_refcnt.fetch_add(-1);
- return true;
+ /* remove all of the levels that have been flattened */
+ for (size_t i = 0; i < task.sources.size(); i++) {
+ m_levels[task.sources[i]] =
+ std::shared_ptr<InternalLevel<ShardType, QueryType>>(
+ new InternalLevel<ShardType, QueryType>(task.sources[i], 1));
+ m_current_state[task.sources[i]] = {
+ 0, calc_level_record_capacity(task.target), 0, 1};
}
- size_t get_reference_count() {
- return m_refcnt.load();
+ return;
+ }
+
+ /*
+ * Combine incoming_level with base_level and reconstruct the shard,
+ * placing it in base_level. The two levels should be sequential--i.e. no
+ * levels are skipped in the reconstruction process--otherwise the
+ * tombstone ordering invariant may be violated.
+ */
+ inline void reconstruction(level_index base_level,
+ level_index incoming_level) {
+ size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+ if (base_level >= m_levels.size()) {
+ m_levels.emplace_back(
+ std::shared_ptr<InternalLevel<ShardType, QueryType>>(
+ new InternalLevel<ShardType, QueryType>(base_level,
+ shard_capacity)));
+ m_current_state.push_back(
+ {0, calc_level_record_capacity(base_level), 0, shard_capacity});
}
- std::vector<void *> get_query_states(std::vector<std::pair<ShardID, Shard*>> &shards, void *parms) {
- std::vector<void*> states;
-
- for (auto &level : m_levels) {
- level->get_query_states(shards, states, parms);
- }
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ /* if the base level has a shard, merge the base and incoming together to
+ * make a new one */
+ if (m_levels[base_level]->get_shard_count() > 0) {
+ m_levels[base_level] =
+ InternalLevel<ShardType, QueryType>::reconstruction(
+ m_levels[base_level].get(), m_levels[incoming_level].get());
+ /* otherwise, we can just move the incoming to the base */
+ } else {
+ m_levels[base_level] = m_levels[incoming_level];
+ }
- return states;
+ } else {
+ m_levels[base_level]->append_level(m_levels[incoming_level].get());
+ m_levels[base_level]->finalize();
}
-private:
- size_t m_scale_factor;
- double m_max_delete_prop;
- size_t m_buffer_size;
-
- std::atomic<size_t> m_refcnt;
-
- std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;
-
- /*
- * A pair of <record_count, shard_count> for each level in the
- * structure. Record counts may be slightly inaccurate due to
- * deletes.
- */
- state_vector m_current_state;
+ /* place a new, empty level where the incoming level used to be */
+ m_levels[incoming_level] =
+ std::shared_ptr<InternalLevel<ShardType, QueryType>>(
+ new InternalLevel<ShardType, QueryType>(
+ incoming_level,
+ (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
/*
- * Add a new level to the scratch state and return its index.
- *
- * IMPORTANT: This does _not_ add a level to the extension structure
- * anymore. This is handled by the appropriate reconstruction and flush
- * methods as needed. This function is for use in "simulated"
- * reconstructions.
+ * Update the state vector to match the *real* state following
+ * the reconstruction
*/
- inline level_index grow(state_vector &scratch_state) {
- level_index new_idx = m_levels.size();
- size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-
- scratch_state.push_back({0, calc_level_record_capacity(new_idx),
- 0, new_shard_cap});
- return new_idx;
+ m_current_state[base_level] = {m_levels[base_level]->get_record_count(),
+ calc_level_record_capacity(base_level),
+ m_levels[base_level]->get_shard_count(),
+ shard_capacity};
+ m_current_state[incoming_level] = {
+ 0, calc_level_record_capacity(incoming_level), 0, shard_capacity};
+ }
+
+ bool take_reference() {
+ m_refcnt.fetch_add(1);
+ return true;
+ }
+
+ bool release_reference() {
+ assert(m_refcnt.load() > 0);
+ m_refcnt.fetch_add(-1);
+ return true;
+ }
+
+ size_t get_reference_count() { return m_refcnt.load(); }
+
+ std::vector<typename QueryType::LocalQuery *>
+ get_local_queries(std::vector<std::pair<ShardID, ShardType *>> &shards,
+ typename QueryType::Parameters *parms) {
+
+ std::vector<typename QueryType::LocalQuery *> queries;
+
+ for (auto &level : m_levels) {
+ level->get_local_queries(shards, queries, parms);
}
- /*
- * Find the first level below the level indicated by idx that
- * is capable of sustaining a reconstruction and return its
- * level index. If no such level exists, returns -1. Also
- * returns -1 if idx==0, and no such level exists, to simplify
- * the logic of the first buffer flush.
- */
- inline level_index find_reconstruction_target(level_index idx, state_vector &state) {
+ return queries;
+ }
- /*
- * this handles the very first buffer flush, when the state vector
- * is empty.
- */
- if (idx == 0 && state.size() == 0) return -1;
+private:
+ size_t m_scale_factor;
+ double m_max_delete_prop;
+ size_t m_buffer_size;
+
+ std::atomic<size_t> m_refcnt;
+
+ std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> m_levels;
+
+ /*
+   * Per-level state: a record count/capacity and shard count/capacity
+   * for each level in the structure. Record counts may be slightly
+   * inaccurate due to deletes.
+ */
+ state_vector m_current_state;
+
+ /*
+ * Add a new level to the scratch state and return its index.
+ *
+ * IMPORTANT: This does _not_ add a level to the extension structure
+ * anymore. This is handled by the appropriate reconstruction and flush
+ * methods as needed. This function is for use in "simulated"
+ * reconstructions.
+ */
+ inline level_index grow(state_vector &scratch_state) {
+ level_index new_idx = m_levels.size();
+ size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+ scratch_state.push_back(
+ {0, calc_level_record_capacity(new_idx), 0, new_shard_cap});
+ return new_idx;
+ }
+
+ /*
+ * Find the first level below the level indicated by idx that
+ * is capable of sustaining a reconstruction and return its
+ * level index. If no such level exists, returns -1. Also
+   * returns -1 if idx==0 and the state vector is empty, to simplify
+   * the logic of the first buffer flush.
+ */
+ inline level_index find_reconstruction_target(level_index idx,
+ state_vector &state) {
- size_t incoming_rec_cnt = state[idx].reccnt;
- for (level_index i=idx+1; i<state.size(); i++) {
- if (can_reconstruct_with(i, incoming_rec_cnt, state)) {
- return i;
- }
+ /*
+ * this handles the very first buffer flush, when the state vector
+ * is empty.
+ */
+ if (idx == 0 && state.size() == 0)
+ return -1;
- incoming_rec_cnt = state[idx].reccnt;
- }
+ size_t incoming_rec_cnt = state[idx].reccnt;
+ for (level_index i = idx + 1; i < state.size(); i++) {
+ if (can_reconstruct_with(i, incoming_rec_cnt, state)) {
+ return i;
+ }
- return -1;
+ incoming_rec_cnt = state[idx].reccnt;
}
- inline void flush_buffer_into_l0(BuffView buffer) {
- size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-
- if (m_levels.size() == 0) {
- m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(0, shard_capacity)));
+ return -1;
+ }
- m_current_state.push_back({0, calc_level_record_capacity(0),
- 0, shard_capacity});
- }
+ inline void flush_buffer_into_l0(BuffView buffer) {
+ size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
- if constexpr (L == LayoutPolicy::LEVELING) {
- // FIXME: Kludgey implementation due to interface constraints.
- auto old_level = m_levels[0].get();
- auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
- temp_level->append_buffer(std::move(buffer));
-
- if (old_level->get_shard_count() > 0) {
- m_levels[0] = InternalLevel<R, Shard, Q>::reconstruction(old_level, temp_level);
- delete temp_level;
- } else {
- m_levels[0] = std::shared_ptr<InternalLevel<R, Shard, Q>>(temp_level);
- }
- } else {
- m_levels[0]->append_buffer(std::move(buffer));
- }
+ if (m_levels.size() == 0) {
+ m_levels.emplace_back(
+ std::shared_ptr<InternalLevel<ShardType, QueryType>>(
+ new InternalLevel<ShardType, QueryType>(0, shard_capacity)));
- /* update the state vector */
- m_current_state[0].reccnt = m_levels[0]->get_record_count();
- m_current_state[0].shardcnt = m_levels[0]->get_shard_count();
+ m_current_state.push_back(
+ {0, calc_level_record_capacity(0), 0, shard_capacity});
}
- /*
- * Mark a given memory level as no-longer in use by the tree. For now this
- * will just free the level. In future, this will be more complex as the
- * level may not be able to immediately be deleted, depending upon who
- * else is using it.
- */
- inline void mark_as_unused(std::shared_ptr<InternalLevel<R, Shard, Q>> level) {
- level.reset();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ // FIXME: Kludgey implementation due to interface constraints.
+ auto old_level = m_levels[0].get();
+ auto temp_level = new InternalLevel<ShardType, QueryType>(0, 1);
+ temp_level->append_buffer(std::move(buffer));
+
+ if (old_level->get_shard_count() > 0) {
+ m_levels[0] = InternalLevel<ShardType, QueryType>::reconstruction(
+ old_level, temp_level);
+ delete temp_level;
+ } else {
+ m_levels[0] =
+ std::shared_ptr<InternalLevel<ShardType, QueryType>>(temp_level);
+ }
+ } else {
+ m_levels[0]->append_buffer(std::move(buffer));
}
- /*
- * Assume that level "0" should be larger than the buffer. The buffer
- * itself is index -1, which should return simply the buffer capacity.
- */
- inline size_t calc_level_record_capacity(level_index idx) {
- return m_buffer_size * pow(m_scale_factor, idx+1);
+ /* update the state vector */
+ m_current_state[0].reccnt = m_levels[0]->get_record_count();
+ m_current_state[0].shardcnt = m_levels[0]->get_shard_count();
+ }
+
+ /*
+   * Mark a given memory level as no longer in use by the tree. For now this
+   * will just free the level. In the future, this will be more complex, as
+   * the level may not be able to be deleted immediately, depending upon who
+   * else is using it.
+ */
+ inline void
+ mark_as_unused(std::shared_ptr<InternalLevel<ShardType, QueryType>> level) {
+ level.reset();
+ }
+
+ /*
+ * Assume that level "0" should be larger than the buffer. The buffer
+   * itself is index -1, which should simply return the buffer capacity.
+ */
+ inline size_t calc_level_record_capacity(level_index idx) {
+ return m_buffer_size * pow(m_scale_factor, idx + 1);
+ }
+
+ /*
+ * Returns the number of records present on a specified level.
+ */
+ inline size_t get_level_record_count(level_index idx) {
+ return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
+ }
+
+ /*
+ * Determines if a level can sustain a reconstruction with incoming_rec_cnt
+ * additional records without exceeding its capacity.
+ */
+ inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt,
+ state_vector &state) {
+ if (idx >= state.size()) {
+ return false;
}
- /*
- * Returns the number of records present on a specified level.
- */
- inline size_t get_level_record_count(level_index idx) {
- return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap;
+ } else if constexpr (L == LayoutPolicy::BSM) {
+ return state[idx].reccnt == 0;
+ } else {
+ return state[idx].shardcnt < state[idx].shardcap;
}
- /*
- * Determines if a level can sustain a reconstruction with incoming_rec_cnt
- * additional records without exceeding its capacity.
- */
- inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) {
- if (idx >= state.size()) {
- return false;
- }
-
- if constexpr (L == LayoutPolicy::LEVELING) {
- return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap;
- } else if constexpr (L == LayoutPolicy::BSM) {
- return state[idx].reccnt == 0;
- } else {
- return state[idx].shardcnt < state[idx].shardcap;
- }
-
- /* unreachable */
- assert(true);
- }
+ /* unreachable */
+ assert(true);
+ }
};
-}
-
+} // namespace de
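
A quick note on the capacity logic carried through the reformat above: calc_level_record_capacity() sizes level i at buffer_size * scale_factor^(i+1), and can_reconstruct_with() gates a reconstruction differently per layout policy (record capacity under leveling, free shard slots under tiering, emptiness under BSM). The sketch below restates that math in isolation; the local enum and struct definitions are illustrative assumptions that only mirror the framework's names, not its actual headers.

// Minimal sketch of the capacity math behind calc_level_record_capacity()
// and can_reconstruct_with() in the diff above.
#include <cmath>
#include <cstddef>
#include <cstdio>
#include <vector>

enum class LayoutPolicy { LEVELING, TEIRING, BSM }; // spelling follows the framework

struct LevelState {
  size_t reccnt, reccap, shardcnt, shardcap;
};

// Level i holds buffer_size * scale_factor^(i+1) records; the buffer itself
// is conceptually "level -1" with capacity buffer_size.
size_t level_record_capacity(size_t buffer_size, size_t scale_factor, long idx) {
  return buffer_size * static_cast<size_t>(std::pow(scale_factor, idx + 1));
}

bool can_reconstruct_with(LayoutPolicy L, const std::vector<LevelState> &state,
                          size_t idx, size_t incoming) {
  if (idx >= state.size()) return false;
  switch (L) {
  case LayoutPolicy::LEVELING: return state[idx].reccnt + incoming <= state[idx].reccap;
  case LayoutPolicy::BSM:      return state[idx].reccnt == 0;
  default:                     return state[idx].shardcnt < state[idx].shardcap; // tiering
  }
}

int main() {
  const size_t buffer_size = 1000, scale_factor = 8;
  for (long i = -1; i < 3; i++) {
    printf("level %ld capacity: %zu records\n", i,
           level_record_capacity(buffer_size, scale_factor, i));
  }
  // e.g. a leveling-policy L1 that holds 7000 of 64000 records can absorb
  // an 8000-record reconstruction from L0.
  std::vector<LevelState> state = {{8000, 8000, 1, 1}, {7000, 64000, 1, 1}};
  printf("L1 can absorb L0: %d\n",
         can_reconstruct_with(LayoutPolicy::LEVELING, state, 1, 8000));
  return 0;
}
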
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index b962dcc..a4cf94d 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -1,8 +1,8 @@
/*
* include/framework/structure/InternalLevel.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -15,276 +15,281 @@
*/
#pragma once
-#include <vector>
#include <memory>
+#include <vector>
-#include "util/types.h"
-#include "framework/interface/Shard.h"
#include "framework/interface/Query.h"
#include "framework/interface/Record.h"
+#include "framework/interface/Shard.h"
#include "framework/structure/BufferView.h"
+#include "util/types.h"
namespace de {
-template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class InternalLevel;
-
-
-template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class InternalLevel {
- typedef S Shard;
- typedef BufferView<R> BuffView;
-public:
- InternalLevel(ssize_t level_no, size_t shard_cap)
- : m_level_no(level_no)
- , m_shard_cnt(0)
- , m_shards(shard_cap, nullptr)
- , m_pending_shard(nullptr)
- {}
-
- ~InternalLevel() {
- delete m_pending_shard;
- }
+ typedef typename ShardType::RECORD RecordType;
+ typedef BufferView<RecordType> BuffView;
- /*
- * Create a new shard combining the records from base_level and new_level,
- * and return a shared_ptr to a new level containing this shard. This is used
- * for reconstructions under the leveling layout policy.
- *
- * No changes are made to the levels provided as arguments.
- */
- static std::shared_ptr<InternalLevel> reconstruction(InternalLevel* base_level, InternalLevel* new_level) {
- assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
- auto res = new InternalLevel(base_level->m_level_no, 1);
- res->m_shard_cnt = 1;
- std::vector<Shard *> shards = {base_level->m_shards[0].get(),
+public:
+ InternalLevel(ssize_t level_no, size_t shard_cap)
+ : m_level_no(level_no), m_shard_cnt(0), m_shards(shard_cap, nullptr),
+ m_pending_shard(nullptr) {}
+
+ ~InternalLevel() { delete m_pending_shard; }
+
+ /*
+ * Create a new shard combining the records from base_level and new_level,
+ * and return a shared_ptr to a new level containing this shard. This is used
+ * for reconstructions under the leveling layout policy.
+ *
+ * No changes are made to the levels provided as arguments.
+ */
+ static std::shared_ptr<InternalLevel>
+ reconstruction(InternalLevel *base_level, InternalLevel *new_level) {
+ assert(base_level->m_level_no > new_level->m_level_no ||
+ (base_level->m_level_no == 0 && new_level->m_level_no == 0));
+ auto res = new InternalLevel(base_level->m_level_no, 1);
+ res->m_shard_cnt = 1;
+ std::vector<ShardType *> shards = {base_level->m_shards[0].get(),
new_level->m_shards[0].get()};
- res->m_shards[0] = std::make_shared<S>(shards);
- return std::shared_ptr<InternalLevel>(res);
+ res->m_shards[0] = std::make_shared<ShardType>(shards);
+ return std::shared_ptr<InternalLevel>(res);
+ }
+
+ static std::shared_ptr<InternalLevel>
+ reconstruction(std::vector<InternalLevel *> levels, size_t level_idx) {
+ std::vector<ShardType *> shards;
+ for (auto level : levels) {
+ for (auto shard : level->m_shards) {
+ if (shard)
+ shards.emplace_back(shard.get());
+ }
}
- static std::shared_ptr<InternalLevel> reconstruction(std::vector<InternalLevel*> levels, size_t level_idx) {
- std::vector<Shard *> shards;
- for (auto level : levels) {
- for (auto shard : level->m_shards) {
- if (shard) shards.emplace_back(shard.get());
- }
- }
-
- auto res = new InternalLevel(level_idx, 1);
- res->m_shard_cnt = 1;
- res->m_shards[0] = std::make_shared<S>(shards);
+ auto res = new InternalLevel(level_idx, 1);
+ res->m_shard_cnt = 1;
+ res->m_shards[0] = std::make_shared<ShardType>(shards);
+
+ return std::shared_ptr<InternalLevel>(res);
+ }
+
+ /*
+ * Create a new shard combining the records from all of
+ * the shards in level, and append this new shard into
+ * this level. This is used for reconstructions under
+ * the tiering layout policy.
+ *
+ * No changes are made to the level provided as an argument.
+ */
+ void append_level(InternalLevel *level) {
+    // FIXME: if this case is ever hit, something has probably
+    // gone terribly wrong earlier in the reconstruction logic.
+ if (level->get_shard_count() == 0) {
+ return;
+ }
- return std::shared_ptr<InternalLevel>(res);
+ std::vector<ShardType *> shards;
+ for (auto shard : level->m_shards) {
+ if (shard)
+ shards.emplace_back(shard.get());
}
- /*
- * Create a new shard combining the records from all of
- * the shards in level, and append this new shard into
- * this level. This is used for reconstructions under
- * the tiering layout policy.
- *
- * No changes are made to the level provided as an argument.
- */
- void append_level(InternalLevel* level) {
- // FIXME: that this is happening probably means that
- // something is going terribly wrong earlier in the
- // reconstruction logic.
- if (level->get_shard_count() == 0) {
- return;
- }
+ if (m_shard_cnt == m_shards.size()) {
+ m_pending_shard = new ShardType(shards);
+ return;
+ }
- std::vector<S*> shards;
- for (auto shard : level->m_shards) {
- if (shard) shards.emplace_back(shard.get());
- }
+ auto tmp = new ShardType(shards);
+ m_shards[m_shard_cnt] = std::shared_ptr<ShardType>(tmp);
+
+ ++m_shard_cnt;
+ }
+
+ /*
+ * Create a new shard using the records in the
+ * provided buffer, and append this new shard
+ * into this level. This is used for buffer
+ * flushes under the tiering layout policy.
+ */
+ void append_buffer(BuffView buffer) {
+ if (m_shard_cnt == m_shards.size()) {
+ assert(m_pending_shard == nullptr);
+ m_pending_shard = new ShardType(std::move(buffer));
+ return;
+ }
- if (m_shard_cnt == m_shards.size()) {
- m_pending_shard = new S(shards);
- return;
- }
+ m_shards[m_shard_cnt] = std::make_shared<ShardType>(std::move(buffer));
+ ++m_shard_cnt;
+ }
- auto tmp = new S(shards);
- m_shards[m_shard_cnt] = std::shared_ptr<S>(tmp);
+ void finalize() {
+ if (m_pending_shard) {
+ for (size_t i = 0; i < m_shards.size(); i++) {
+ m_shards[i] = nullptr;
+ }
- ++m_shard_cnt;
+ m_shards[0] = std::shared_ptr<ShardType>(m_pending_shard);
+ m_pending_shard = nullptr;
+ m_shard_cnt = 1;
}
-
- /*
- * Create a new shard using the records in the
- * provided buffer, and append this new shard
- * into this level. This is used for buffer
- * flushes under the tiering layout policy.
- */
- void append_buffer(BuffView buffer) {
- if (m_shard_cnt == m_shards.size()) {
- assert(m_pending_shard == nullptr);
- m_pending_shard = new S(std::move(buffer));
- return;
- }
-
- m_shards[m_shard_cnt] = std::make_shared<S>(std::move(buffer));
- ++m_shard_cnt;
+ }
+
+ /*
+ * Create a new shard containing the combined records
+ * from all shards on this level and return it.
+ *
+ * No changes are made to this level.
+ */
+ ShardType *get_combined_shard() {
+ if (m_shard_cnt == 0) {
+ return nullptr;
}
- void finalize() {
- if (m_pending_shard) {
- for (size_t i=0; i<m_shards.size(); i++) {
- m_shards[i] = nullptr;
- }
-
- m_shards[0] = std::shared_ptr<S>(m_pending_shard);
- m_pending_shard = nullptr;
- m_shard_cnt = 1;
- }
+ std::vector<ShardType *> shards;
+ for (auto shard : m_shards) {
+ if (shard)
+ shards.emplace_back(shard.get());
}
- /*
- * Create a new shard containing the combined records
- * from all shards on this level and return it.
- *
- * No changes are made to this level.
- */
- Shard *get_combined_shard() {
- if (m_shard_cnt == 0) {
- return nullptr;
- }
-
- std::vector<Shard *> shards;
- for (auto shard : m_shards) {
- if (shard) shards.emplace_back(shard.get());
- }
-
- return new S(shards);
+ return new ShardType(shards);
+ }
+
+ void get_local_queries(
+ std::vector<std::pair<ShardID, ShardType *>> &shards,
+ std::vector<typename QueryType::LocalQuery *> &local_queries,
+ typename QueryType::Parameters *query_parms) {
+ for (size_t i = 0; i < m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ auto local_query =
+ QueryType::local_preproc(m_shards[i].get(), query_parms);
+ shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].get()});
+ local_queries.emplace_back(local_query);
+ }
}
+ }
- void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
- for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_shards[i]) {
- auto shard_state = Q::get_query_state(m_shards[i].get(), query_parms);
- shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i].get()});
- shard_states.emplace_back(shard_state);
- }
+ bool check_tombstone(size_t shard_stop, const RecordType &rec) {
+ if (m_shard_cnt == 0)
+ return false;
+
+ for (int i = m_shard_cnt - 1; i >= (ssize_t)shard_stop; i--) {
+ if (m_shards[i]) {
+ auto res = m_shards[i]->point_lookup(rec, true);
+ if (res && res->is_tombstone()) {
+ return true;
}
+ }
}
-
- bool check_tombstone(size_t shard_stop, const R& rec) {
- if (m_shard_cnt == 0) return false;
-
- for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
- if (m_shards[i]) {
- auto res = m_shards[i]->point_lookup(rec, true);
- if (res && res->is_tombstone()) {
- return true;
- }
- }
+ return false;
+ }
+
+ bool delete_record(const RecordType &rec) {
+ if (m_shard_cnt == 0)
+ return false;
+
+ for (size_t i = 0; i < m_shards.size(); ++i) {
+ if (m_shards[i]) {
+ auto res = m_shards[i]->point_lookup(rec);
+ if (res) {
+ res->set_delete();
+ return true;
}
- return false;
+ }
}
- bool delete_record(const R &rec) {
- if (m_shard_cnt == 0) return false;
-
- for (size_t i = 0; i < m_shards.size(); ++i) {
- if (m_shards[i]) {
- auto res = m_shards[i]->point_lookup(rec);
- if (res) {
- res->set_delete();
- return true;
- }
- }
- }
+ return false;
+ }
- return false;
+ ShardType *get_shard(size_t idx) {
+ if (idx >= m_shard_cnt) {
+ return nullptr;
}
- Shard* get_shard(size_t idx) {
- if (idx >= m_shard_cnt) {
- return nullptr;
- }
+ return m_shards[idx].get();
+ }
- return m_shards[idx].get();
- }
+ size_t get_shard_count() { return m_shard_cnt; }
- size_t get_shard_count() {
- return m_shard_cnt;
+ size_t get_record_count() {
+ size_t cnt = 0;
+ for (size_t i = 0; i < m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ cnt += m_shards[i]->get_record_count();
+ }
}
- size_t get_record_count() {
- size_t cnt = 0;
- for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_shards[i]) {
- cnt += m_shards[i]->get_record_count();
- }
- }
+ return cnt;
+ }
- return cnt;
+ size_t get_tombstone_count() {
+ size_t res = 0;
+ for (size_t i = 0; i < m_shard_cnt; ++i) {
+ if (m_shards[i]) {
+ res += m_shards[i]->get_tombstone_count();
+ }
}
-
- size_t get_tombstone_count() {
- size_t res = 0;
- for (size_t i = 0; i < m_shard_cnt; ++i) {
- if (m_shards[i]) {
- res += m_shards[i]->get_tombstone_count();
- }
- }
- return res;
+ return res;
+ }
+
+ size_t get_aux_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i = 0; i < m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ cnt += m_shards[i]->get_aux_memory_usage();
+ }
}
- size_t get_aux_memory_usage() {
- size_t cnt = 0;
- for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_shards[i]){
- cnt += m_shards[i]->get_aux_memory_usage();
- }
- }
+ return cnt;
+ }
- return cnt;
+ size_t get_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i = 0; i < m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ cnt += m_shards[i]->get_memory_usage();
+ }
}
- size_t get_memory_usage() {
- size_t cnt = 0;
- for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_shards[i]) {
- cnt += m_shards[i]->get_memory_usage();
- }
- }
-
- return cnt;
+ return cnt;
+ }
+
+ double get_tombstone_prop() {
+ size_t tscnt = 0;
+ size_t reccnt = 0;
+ for (size_t i = 0; i < m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ tscnt += m_shards[i]->get_tombstone_count();
+ reccnt += m_shards[i]->get_record_count();
+ }
}
- double get_tombstone_prop() {
- size_t tscnt = 0;
- size_t reccnt = 0;
- for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_shards[i]) {
- tscnt += m_shards[i]->get_tombstone_count();
- reccnt += m_shards[i]->get_record_count();
- }
- }
+ return (double)tscnt / (double)(tscnt + reccnt);
+ }
- return (double) tscnt / (double) (tscnt + reccnt);
+ std::shared_ptr<InternalLevel> clone() {
+ auto new_level =
+ std::make_shared<InternalLevel>(m_level_no, m_shards.size());
+ for (size_t i = 0; i < m_shard_cnt; i++) {
+ new_level->m_shards[i] = m_shards[i];
}
+ new_level->m_shard_cnt = m_shard_cnt;
- std::shared_ptr<InternalLevel> clone() {
- auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size());
- for (size_t i=0; i<m_shard_cnt; i++) {
- new_level->m_shards[i] = m_shards[i];
- }
- new_level->m_shard_cnt = m_shard_cnt;
-
- return new_level;
- }
+ return new_level;
+ }
private:
- ssize_t m_level_no;
-
- size_t m_shard_cnt;
- size_t m_shard_size_cap;
+ ssize_t m_level_no;
+
+ size_t m_shard_cnt;
+ size_t m_shard_size_cap;
- std::vector<std::shared_ptr<Shard>> m_shards;
- Shard *m_pending_shard;
+ std::vector<std::shared_ptr<ShardType>> m_shards;
+ ShardType *m_pending_shard;
};
-}
+} // namespace de
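For orientation, a minimal caller-side sketch of the InternalLevel accessors above; LevelType and level_report_sketch are illustrative names and are not part of this patch:

    #include <cstdio>
    #include <memory>

    /* Illustrative sketch only: LevelType stands for some concrete
     * de::InternalLevel instantiation owned by the extension structure. */
    template <typename LevelType>
    void level_report_sketch(const std::shared_ptr<LevelType> &level) {
      std::printf("records=%zu tombstones=%zu ts_prop=%f\n",
                  level->get_record_count(), level->get_tombstone_count(),
                  level->get_tombstone_prop());

      /* Compact every live shard on this level into one new shard; the
       * level itself is unchanged and the caller owns the result. */
      auto *combined = level->get_combined_shard(); // nullptr if the level is empty
      delete combined;                              // deleting nullptr is a no-op
    }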
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 7db3980..625b04b 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -1,8 +1,8 @@
/*
* include/framework/structure/MutableBuffer.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -18,301 +18,281 @@
*/
#pragma once
-#include <cstdlib>
#include <atomic>
#include <cassert>
+#include <cstdlib>
#include <immintrin.h>
-#include "psu-util/alignment.h"
-#include "util/bf_config.h"
-#include "psu-ds/BloomFilter.h"
#include "framework/interface/Record.h"
#include "framework/structure/BufferView.h"
-
-using psudb::CACHELINE_SIZE;
+#include "psu-ds/BloomFilter.h"
+#include "psu-util/alignment.h"
+#include "util/bf_config.h"
namespace de {
-template <RecordInterface R>
-class MutableBuffer {
- friend class BufferView<R>;
+template <RecordInterface R> class MutableBuffer {
+ friend class BufferView<R>;
- struct buffer_head {
- size_t head_idx;
- size_t refcnt;
- };
-
-public:
- MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
- : m_lwm(low_watermark)
- , m_hwm(high_watermark)
- , m_cap((capacity == 0) ? 2 * high_watermark : capacity)
- , m_tail(0)
- , m_head({0, 0})
- , m_old_head({high_watermark, 0})
- //, m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
- , m_data(new Wrapped<R>[m_cap]())
- , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
- , m_tscnt(0)
- , m_old_tscnt(0)
- , m_active_head_advance(false)
- {
- assert(m_cap > m_hwm);
- assert(m_hwm >= m_lwm);
- }
+ struct buffer_head {
+ size_t head_idx;
+ size_t refcnt;
+ };
- ~MutableBuffer() {
- delete[] m_data;
- delete m_tombstone_filter;
+public:
+ MutableBuffer(size_t low_watermark, size_t high_watermark,
+ size_t capacity = 0)
+ : m_lwm(low_watermark), m_hwm(high_watermark),
+ m_cap((capacity == 0) ? 2 * high_watermark : capacity), m_tail(0),
+ m_head({0, 0}), m_old_head({high_watermark, 0}),
+ m_data(new Wrapped<R>[m_cap]()),
+ m_tombstone_filter(
+ new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS)),
+ m_tscnt(0), m_old_tscnt(0), m_active_head_advance(false) {
+ assert(m_cap > m_hwm);
+ assert(m_hwm >= m_lwm);
+ }
+
+ ~MutableBuffer() {
+ delete[] m_data;
+ delete m_tombstone_filter;
+ }
+
+ int append(const R &rec, bool tombstone = false) {
+ int32_t tail = 0;
+ if ((tail = try_advance_tail()) == -1) {
+ return 0;
}
- int append(const R &rec, bool tombstone=false) {
- int32_t tail = 0;
- if ((tail = try_advance_tail()) == -1) {
- return 0;
- }
-
- Wrapped<R> wrec;
- wrec.rec = rec;
- wrec.header = 0;
- if (tombstone) wrec.set_tombstone();
+ Wrapped<R> wrec;
+ wrec.rec = rec;
+ wrec.header = 0;
+ if (tombstone)
+ wrec.set_tombstone();
- // FIXME: because of the mod, it isn't correct to use `pos`
- // as the ordering timestamp in the header anymore.
- size_t pos = tail % m_cap;
-
- m_data[pos] = wrec;
- m_data[pos].set_timestamp(pos);
-
- if (tombstone) {
- m_tscnt.fetch_add(1);
- if (m_tombstone_filter) m_tombstone_filter->insert(rec);
- }
+ // FIXME: because of the mod, it isn't correct to use `pos`
+ // as the ordering timestamp in the header anymore.
+ size_t pos = tail % m_cap;
- m_data[pos].set_visible();
+ m_data[pos] = wrec;
+ m_data[pos].set_timestamp(pos);
- return 1;
+ if (tombstone) {
+ m_tscnt.fetch_add(1);
+ if (m_tombstone_filter)
+ m_tombstone_filter->insert(rec);
}
- bool truncate() {
- m_tscnt.store(0);
- m_tail.store(0);
- if (m_tombstone_filter) m_tombstone_filter->clear();
+ m_data[pos].set_visible();
- return true;
- }
+ return 1;
+ }
- size_t get_record_count() {
- return m_tail.load() - m_head.load().head_idx;
- }
-
- size_t get_capacity() {
- return m_cap;
- }
+ bool truncate() {
+ m_tscnt.store(0);
+ m_tail.store(0);
+ if (m_tombstone_filter)
+ m_tombstone_filter->clear();
- bool is_full() {
- return get_record_count() >= m_hwm;
- }
+ return true;
+ }
- bool is_at_low_watermark() {
- return get_record_count() >= m_lwm;
- }
+ size_t get_record_count() { return m_tail.load() - m_head.load().head_idx; }
- size_t get_tombstone_count() {
- return m_tscnt.load();
- }
+ size_t get_capacity() { return m_cap; }
- bool delete_record(const R& rec) {
- return get_buffer_view().delete_record(rec);
- }
+ bool is_full() { return get_record_count() >= m_hwm; }
- bool check_tombstone(const R& rec) {
- return get_buffer_view().check_tombstone(rec);
- }
+ bool is_at_low_watermark() { return get_record_count() >= m_lwm; }
- size_t get_memory_usage() {
- return m_cap * sizeof(Wrapped<R>);
- }
+ size_t get_tombstone_count() { return m_tscnt.load(); }
- size_t get_aux_memory_usage() {
- return m_tombstone_filter->get_memory_usage();
- }
+ bool delete_record(const R &rec) {
+ return get_buffer_view().delete_record(rec);
+ }
- BufferView<R> get_buffer_view(size_t target_head) {
- size_t head = get_head(target_head);
- auto f = std::bind(release_head_reference, (void *) this, head);
+ bool check_tombstone(const R &rec) {
+ return get_buffer_view().check_tombstone(rec);
+ }
- return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
- }
+ size_t get_memory_usage() { return m_cap * sizeof(Wrapped<R>); }
- BufferView<R> get_buffer_view() {
- size_t head = get_head(m_head.load().head_idx);
- auto f = std::bind(release_head_reference, (void *) this, head);
+ size_t get_aux_memory_usage() {
+ return m_tombstone_filter->get_memory_usage();
+ }
- return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
- }
+ BufferView<R> get_buffer_view(size_t target_head) {
+ size_t head = get_head(target_head);
+ auto f = std::bind(release_head_reference, (void *)this, head);
- /*
- * Advance the buffer following a reconstruction. Move current
- * head and head_refcnt into old_head and old_head_refcnt, then
- * assign new_head to old_head.
- */
- bool advance_head(size_t new_head) {
- assert(new_head > m_head.load().head_idx);
- assert(new_head <= m_tail.load());
-
- /* refuse to advance head while there is an old with one references */
- if (m_old_head.load().refcnt > 0) {
- //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n");
- return false;
- }
-
- m_active_head_advance.store(true);
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(),
+ m_tombstone_filter, f);
+ }
- buffer_head new_hd = {new_head, 0};
- buffer_head cur_hd;
+ BufferView<R> get_buffer_view() {
+ size_t head = get_head(m_head.load().head_idx);
+ auto f = std::bind(release_head_reference, (void *)this, head);
- /* replace current head with new head */
- do {
- cur_hd = m_head.load();
- } while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(),
+ m_tombstone_filter, f);
+ }
- /* move the current head into the old head */
- m_old_head.store(cur_hd);
+ /*
+   * Advance the buffer following a reconstruction. The current head
+   * (index and reference count) is moved into old_head, and new_head
+   * is installed as the new current head.
+ */
+ bool advance_head(size_t new_head) {
+ assert(new_head > m_head.load().head_idx);
+ assert(new_head <= m_tail.load());
- m_active_head_advance.store(false);
- return true;
+    /* refuse to advance the head while the old head still holds references */
+ if (m_old_head.load().refcnt > 0) {
+ // fprintf(stderr, "[W]: Refusing to advance head due to remaining
+ // reference counts\n");
+ return false;
}
- /*
- * FIXME: If target_head does not match *either* the old_head or the
- * current_head, this routine will loop infinitely.
- */
- size_t get_head(size_t target_head) {
- buffer_head cur_hd, new_hd;
- bool head_acquired = false;
-
- do {
- if (m_old_head.load().head_idx == target_head) {
- cur_hd = m_old_head.load();
- cur_hd.head_idx = target_head;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
- head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
- } else if (m_head.load().head_idx == target_head){
- cur_hd = m_head.load();
- cur_hd.head_idx = target_head;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
- head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
- }
- } while(!head_acquired);
-
- return new_hd.head_idx;
+ m_active_head_advance.store(true);
+
+ buffer_head new_hd = {new_head, 0};
+ buffer_head cur_hd;
+
+ /* replace current head with new head */
+ do {
+ cur_hd = m_head.load();
+ } while (!m_head.compare_exchange_strong(cur_hd, new_hd));
+
+ /* move the current head into the old head */
+ m_old_head.store(cur_hd);
+
+ m_active_head_advance.store(false);
+ return true;
+ }
+
+ /*
+ * FIXME: If target_head does not match *either* the old_head or the
+ * current_head, this routine will loop infinitely.
+ */
+ size_t get_head(size_t target_head) {
+ buffer_head cur_hd, new_hd;
+ bool head_acquired = false;
+
+ do {
+ if (m_old_head.load().head_idx == target_head) {
+ cur_hd = m_old_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else if (m_head.load().head_idx == target_head) {
+ cur_hd = m_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ } while (!head_acquired);
+
+ return new_hd.head_idx;
+ }
+
+ void set_low_watermark(size_t lwm) {
+ assert(lwm < m_hwm);
+ m_lwm = lwm;
+ }
+
+ size_t get_low_watermark() { return m_lwm; }
+
+ void set_high_watermark(size_t hwm) {
+ assert(hwm > m_lwm);
+ assert(hwm < m_cap);
+ m_hwm = hwm;
+ }
+
+ size_t get_high_watermark() { return m_hwm; }
+
+ size_t get_tail() { return m_tail.load(); }
+
+ /*
+ * Note: this returns the available physical storage capacity,
+   * *not* how many more records can be inserted before the
+ * HWM is reached. It considers the old_head to be "free"
+ * when it has no remaining references. This should be true,
+ * but a buggy framework implementation may violate the
+ * assumption.
+ */
+ size_t get_available_capacity() {
+ if (m_old_head.load().refcnt == 0) {
+ return m_cap - (m_tail.load() - m_head.load().head_idx);
}
- void set_low_watermark(size_t lwm) {
- assert(lwm < m_hwm);
- m_lwm = lwm;
- }
+ return m_cap - (m_tail.load() - m_old_head.load().head_idx);
+ }
- size_t get_low_watermark() {
- return m_lwm;
- }
+private:
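+  /*
+   * Attempt to claim the next tail slot for an append. Returns the
+   * claimed (pre-wrap) tail index on success, or -1 if the buffer has
+   * reached its high watermark.
+   */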
+ int64_t try_advance_tail() {
+ size_t old_value = m_tail.load();
- void set_high_watermark(size_t hwm) {
- assert(hwm > m_lwm);
- assert(hwm < m_cap);
- m_hwm = hwm;
+ /* if full, fail to advance the tail */
+ if (old_value - m_head.load().head_idx >= m_hwm) {
+ return -1;
}
- size_t get_high_watermark() {
- return m_hwm;
- }
+ while (!m_tail.compare_exchange_strong(old_value, old_value + 1)) {
+ /* if full, stop trying and fail to advance the tail */
+ if (m_tail.load() >= m_hwm) {
+ return -1;
+ }
- size_t get_tail() {
- return m_tail.load();
+ _mm_pause();
}
- /*
- * Note: this returns the available physical storage capacity,
- * *not* now many more records can be inserted before the
- * HWM is reached. It considers the old_head to be "free"
- * when it has no remaining references. This should be true,
- * but a buggy framework implementation may violate the
- * assumption.
- */
- size_t get_available_capacity() {
- if (m_old_head.load().refcnt == 0) {
- return m_cap - (m_tail.load() - m_head.load().head_idx);
- }
+ return old_value;
+ }
- return m_cap - (m_tail.load() - m_old_head.load().head_idx);
- }
+ size_t to_idx(size_t i, size_t head) { return (head + i) % m_cap; }
-private:
- int64_t try_advance_tail() {
- size_t old_value = m_tail.load();
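+  /*
+   * Bound into each BufferView as its ReleaseFunction: releases the
+   * reference taken in get_head() by decrementing the refcount of the
+   * old head if its index matches `head`, and of the current head
+   * otherwise. The CAS loop retries until the decrement takes effect.
+   */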
+ static void release_head_reference(void *buff, size_t head) {
+ MutableBuffer<R> *buffer = (MutableBuffer<R> *)buff;
- /* if full, fail to advance the tail */
- if (old_value - m_head.load().head_idx >= m_hwm) {
- return -1;
+ buffer_head cur_hd, new_hd;
+ do {
+ if (buffer->m_old_head.load().head_idx == head) {
+ cur_hd = buffer->m_old_head;
+ if (cur_hd.refcnt == 0)
+ continue;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
+ if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
+ break;
}
-
- while (!m_tail.compare_exchange_strong(old_value, old_value+1)) {
- /* if full, stop trying and fail to advance the tail */
- if (m_tail.load() >= m_hwm) {
- return -1;
- }
-
- _mm_pause();
+ } else {
+ cur_hd = buffer->m_head;
+ if (cur_hd.refcnt == 0)
+ continue;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
+
+ if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
+ break;
}
+ }
+ _mm_pause();
+ } while (true);
+ }
- return old_value;
- }
+ size_t m_lwm;
+ size_t m_hwm;
+ size_t m_cap;
- size_t to_idx(size_t i, size_t head) {
- return (head + i) % m_cap;
- }
+ alignas(64) std::atomic<size_t> m_tail;
- static void release_head_reference(void *buff, size_t head) {
- MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff;
-
- buffer_head cur_hd, new_hd;
- do {
- if (buffer->m_old_head.load().head_idx == head) {
- cur_hd = buffer->m_old_head;
- if (cur_hd.refcnt == 0) continue;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
- if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
- break;
- }
- } else {
- cur_hd = buffer->m_head;
- if (cur_hd.refcnt == 0) continue;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
-
- if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
- break;
- }
- }
- _mm_pause();
- } while(true);
- }
+ alignas(64) std::atomic<buffer_head> m_head;
+ alignas(64) std::atomic<buffer_head> m_old_head;
+
+ Wrapped<R> *m_data;
+ psudb::BloomFilter<R> *m_tombstone_filter;
+ alignas(64) std::atomic<size_t> m_tscnt;
+ size_t m_old_tscnt;
- size_t m_lwm;
- size_t m_hwm;
- size_t m_cap;
-
- alignas(64) std::atomic<size_t> m_tail;
-
- alignas(64) std::atomic<buffer_head> m_head;
- alignas(64) std::atomic<buffer_head> m_old_head;
-
- Wrapped<R>* m_data;
- psudb::BloomFilter<R>* m_tombstone_filter;
- alignas(64) std::atomic<size_t> m_tscnt;
- size_t m_old_tscnt;
-
- alignas(64) std::atomic<bool> m_active_head_advance;
+ alignas(64) std::atomic<bool> m_active_head_advance;
};
-}
+} // namespace de
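A companion sketch of the MutableBuffer lifecycle; the record type, watermark values, and function name below are illustrative assumptions, not code from this patch:

    #include "framework/structure/MutableBuffer.h"

    /* Illustrative sketch only: Rec stands for any type satisfying the
     * framework's RecordInterface concept. */
    template <typename Rec>
    void buffer_lifecycle_sketch(const Rec &rec) {
      de::MutableBuffer<Rec> buffer(/* low_watermark  */ 1000,
                                    /* high_watermark */ 8000);

      /* append() returns 1 on success and 0 once the high watermark has
       * been reached, at which point the caller should trigger a flush. */
      if (buffer.append(rec) == 0) {
        return;
      }

      {
        /* Pin the current head; the view's ReleaseFunction drops the
         * reference when the view goes out of scope. */
        auto view = buffer.get_buffer_view();
        view.check_tombstone(rec);
      }

      /* Once a reconstruction has consumed the records below the tail,
       * retire them. advance_head() returns false while the old head
       * still holds outstanding references. */
      buffer.advance_head(buffer.get_tail());
    }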