summaryrefslogtreecommitdiffstats
path: root/include/framework/structure
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/structure')
-rw-r--r--include/framework/structure/BufferView.h170
-rw-r--r--include/framework/structure/ExtensionStructure.h495
-rw-r--r--include/framework/structure/InternalLevel.h271
-rw-r--r--include/framework/structure/MutableBuffer.h313
4 files changed, 1249 insertions, 0 deletions
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
new file mode 100644
index 0000000..9e0872b
--- /dev/null
+++ b/include/framework/structure/BufferView.h
@@ -0,0 +1,170 @@
+/*
+ * include/framework/structure/BufferView.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * TODO: This file is very poorly commented.
+ */
+#pragma once
+
+#include <cstdlib>
+#include <cassert>
+#include <functional>
+#include <utility>
+
+#include "psu-util/alignment.h"
+#include "psu-ds/BloomFilter.h"
+#include "framework/interface/Record.h"
+
+namespace de {
+
+typedef std::_Bind<void (*(void*, long unsigned int))(void*, long unsigned int)> ReleaseFunction;
+
+template <RecordInterface R>
+class BufferView {
+public:
+ BufferView() = default;
+
+ /*
+ * the BufferView's lifetime is tightly linked to buffer versioning, and so
+ * copying and assignment are disabled.
+ */
+ BufferView(const BufferView&) = delete;
+ BufferView &operator=(BufferView &) = delete;
+
+ BufferView(BufferView &&other)
+ : m_data(std::exchange(other.m_data, nullptr))
+ , m_release(std::move(other.m_release))
+ , m_head(std::exchange(other.m_head, 0))
+ , m_tail(std::exchange(other.m_tail, 0))
+ , m_start(std::exchange(other.m_start, 0))
+ , m_stop(std::exchange(other.m_stop, 0))
+ , m_cap(std::exchange(other.m_cap, 0))
+ , m_approx_ts_cnt(std::exchange(other.m_approx_ts_cnt, 0))
+ , m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr))
+ , m_active(std::exchange(other.m_active, false)) {}
+
+ BufferView &operator=(BufferView &&other) = delete;
+
+
+ BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail, size_t tombstone_cnt, psudb::BloomFilter<R> *filter,
+ ReleaseFunction release)
+ : m_data(buffer)
+ , m_release(release)
+ , m_head(head)
+ , m_tail(tail)
+ , m_start(m_head % cap)
+ , m_stop(m_tail % cap)
+ , m_cap(cap)
+ , m_approx_ts_cnt(tombstone_cnt)
+ , m_tombstone_filter(filter)
+ , m_active(true) {}
+
+ ~BufferView() {
+ if (m_active) {
+ m_release();
+ }
+ }
+
+ bool check_tombstone(const R& rec) {
+ if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
+
+ for (size_t i=0; i<get_record_count(); i++) {
+ if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ bool delete_record(const R& rec) {
+ if (m_start < m_stop) {
+ for (size_t i=m_start; i<m_stop; i++) {
+ if (m_data[i].rec == rec) {
+ m_data[i].set_delete();
+ return true;
+ }
+ }
+ } else {
+ for (size_t i=m_start; i<m_cap; i++) {
+ if (m_data[i].rec == rec) {
+ m_data[i].set_delete();
+ return true;
+ }
+ }
+
+ for (size_t i=0; i<m_stop; i++) {
+ if (m_data[i].rec == rec) {
+ m_data[i].set_delete();
+ return true;
+ }
+
+ }
+
+ }
+
+ return false;
+ }
+
+ size_t get_record_count() {
+ return m_tail - m_head;
+ }
+
+ /*
+ * NOTE: This function returns an upper bound on the number
+ * of tombstones within the view. There may be less than
+ * this, due to synchronization issues during view creation.
+ */
+ size_t get_tombstone_count() {
+ return m_approx_ts_cnt;
+ }
+
+ Wrapped<R> *get(size_t i) {
+ assert(i < get_record_count());
+ return m_data + to_idx(i);
+ }
+
+ void copy_to_buffer(psudb::byte *buffer) {
+ /* check if the region to be copied circles back to start. If so, do it in two steps */
+ if (m_start > m_stop) {
+ size_t split_idx = m_cap - m_start;
+
+ memcpy(buffer, (std::byte*) (m_data + m_start), split_idx* sizeof(Wrapped<R>));
+ memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte*) m_data, m_stop * sizeof(Wrapped<R>));
+ } else {
+ memcpy(buffer, (std::byte*) (m_data + m_start), get_record_count() * sizeof(Wrapped<R>));
+ }
+ }
+
+ size_t get_tail() {
+ return m_tail;
+ }
+
+ size_t get_head() {
+ return m_head;
+ }
+
+private:
+ Wrapped<R>* m_data;
+ ReleaseFunction m_release;
+ size_t m_head;
+ size_t m_tail;
+ size_t m_start;
+ size_t m_stop;
+ size_t m_cap;
+ size_t m_approx_ts_cnt;
+ psudb::BloomFilter<R> *m_tombstone_filter;
+ bool m_active;
+
+ size_t to_idx(size_t i) {
+ size_t idx = (m_start + i >= m_cap) ? i = (m_cap - m_start)
+ : m_start + i;
+ assert(idx < m_cap);
+ return idx;
+ }
+};
+
+}
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
new file mode 100644
index 0000000..4802bc1
--- /dev/null
+++ b/include/framework/structure/ExtensionStructure.h
@@ -0,0 +1,495 @@
+/*
+ * include/framework/structure/ExtensionStructure.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <atomic>
+#include <cstdio>
+#include <vector>
+
+#include "framework/structure/BufferView.h"
+#include "framework/structure/InternalLevel.h"
+
+#include "framework/util/Configuration.h"
+
+#include "psu-util/timer.h"
+
+namespace de {
+
+template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L=LayoutPolicy::TEIRING>
+class ExtensionStructure {
+ typedef S Shard;
+ typedef BufferView<R> BuffView;
+
+public:
+ ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
+ : m_scale_factor(scale_factor)
+ , m_max_delete_prop(max_delete_prop)
+ , m_buffer_size(buffer_size)
+ {}
+
+ ~ExtensionStructure() = default;
+
+ /*
+ * Create a shallow copy of this extension structure. The copy will share
+ * references to the same levels/shards as the original, but will have its
+ * own lists. As all of the shards are immutable (with the exception of
+ * deletes), the copy can be restructured with reconstructions and flushes
+ * without affecting the original. The copied structure will be returned
+ * with a reference count of 0; generally you will want to immediately call
+ * take_reference() on it.
+ *
+ * NOTE: When using tagged deletes, a delete of a record in the original
+ * structure will affect the copy, so long as the copy retains a reference
+ * to the same shard as the original. This could cause synchronization
+ * problems under tagging with concurrency. Any deletes in this context will
+ * need to be forwarded to the appropriate structures manually.
+ */
+ ExtensionStructure<R, S, Q, L> *copy() {
+ auto new_struct = new ExtensionStructure<R, S, Q, L>(m_buffer_size, m_scale_factor,
+ m_max_delete_prop);
+ for (size_t i=0; i<m_levels.size(); i++) {
+ new_struct->m_levels.push_back(m_levels[i]->clone());
+ }
+
+ new_struct->m_refcnt = 0;
+
+ return new_struct;
+ }
+
+ /*
+ * Search for a record matching the argument and mark it deleted by
+ * setting the delete bit in its wrapped header. Returns 1 if a matching
+ * record was found and deleted, and 0 if a matching record was not found.
+ *
+ * This function will stop after finding the first matching record. It is
+ * assumed that no duplicate records exist. In the case of duplicates, this
+ * function will still "work", but in the sense of "delete first match".
+ */
+ int tagged_delete(const R &rec) {
+ for (auto level : m_levels) {
+ if (level && level->delete_record(rec)) {
+ return 1;
+ }
+ }
+
+ /*
+ * If the record to be erased wasn't found, return 0. The
+ * DynamicExtension itself will then search the active
+ * Buffers.
+ */
+ return 0;
+ }
+
+ /*
+ * Flush a buffer into the extension structure, performing any necessary
+ * reconstructions to free up room in L0.
+ *
+ * FIXME: arguably, this should be a method attached to the buffer that
+ * takes a structure as input.
+ */
+ inline bool flush_buffer(BuffView buffer) {
+ assert(can_reconstruct_with(0, buffer.get_record_count()));
+
+ flush_buffer_into_l0(std::move(buffer));
+
+ return true;
+ }
+
+ /*
+ * Return the total number of records (including tombstones) within all
+ * of the levels of the structure.
+ */
+ size_t get_record_count() {
+ size_t cnt = 0;
+
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) cnt += m_levels[i]->get_record_count();
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Return the total number of tombstones contained within all of the
+ * levels of the structure.
+ */
+ size_t get_tombstone_count() {
+ size_t cnt = 0;
+
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count();
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Return the number of levels within the structure. Note that not
+ * all of these levels are necessarily populated.
+ */
+ size_t get_height() {
+ return m_levels.size();
+ }
+
+ /*
+ * Return the amount of memory (in bytes) used by the shards within the
+ * structure for storing the primary data structure and raw data.
+ */
+ size_t get_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) cnt += m_levels[i]->get_memory_usage();
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Return the amount of memory (in bytes) used by the shards within the
+ * structure for storing auxiliary data structures. This total does not
+ * include memory used for the main data structure, or raw data.
+ */
+ size_t get_aux_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) {
+ cnt += m_levels[i]->get_aux_memory_usage();
+ }
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Validate that no level in the structure exceeds its maximum tombstone
+ * capacity. This is used to trigger preemptive compactions at the end of
+ * the reconstruction process.
+ */
+ bool validate_tombstone_proportion() {
+ long double ts_prop;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i]) {
+ ts_prop = (long double)m_levels[i]->get_tombstone_count() /
+ (long double)calc_level_record_capacity(i);
+ if (ts_prop > (long double)m_max_delete_prop) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ bool validate_tombstone_proportion(level_index level) {
+ long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level);
+ return ts_prop <= (long double) m_max_delete_prop;
+ }
+
+ /*
+ * Return a reference to the underlying vector of levels within the
+ * structure.
+ */
+ std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> &get_levels() {
+ return m_levels;
+ }
+
+ std::vector<ReconstructionTask> get_compaction_tasks() {
+ std::vector<ReconstructionTask> tasks;
+
+ /* if the tombstone/delete invariant is satisfied, no need for compactions */
+ if (validate_tombstone_proportion()) {
+ return tasks;
+ }
+
+ /* locate the first level to violate the invariant */
+ level_index violation_idx = -1;
+ for (level_index i=0; i<m_levels.size(); i++) {
+ if (!validate_tombstone_proportion(i)) {
+ violation_idx = i;
+ break;
+ }
+ }
+
+ assert(violation_idx != -1);
+
+ level_index base_level = find_reconstruction_target(violation_idx);
+ if (base_level == -1) {
+ base_level = grow();
+ }
+
+ for (level_index i=base_level; i>0; i--) {
+ ReconstructionTask task = {i-1, i};
+
+ /*
+ * The amount of storage required for the reconstruction accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i - 1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_reconstruct_with(i, reccnt)) {
+ reccnt += m_levels[i]->get_record_count();
+ }
+ }
+ //task.m_size = 2* reccnt * sizeof(R);
+
+ tasks.push_back(task);
+ }
+
+ return tasks;
+ }
+
+ /*
+ *
+ */
+ std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt) {
+ std::vector<ReconstructionTask> reconstructions;
+
+ /*
+ * The buffer flush is not included so if that can be done without any
+ * other change, just return an empty list.
+ */
+ if (can_reconstruct_with(0, buffer_reccnt)) {
+ return std::move(reconstructions);
+ }
+
+ level_index base_level = find_reconstruction_target(0);
+ if (base_level == -1) {
+ base_level = grow();
+ }
+
+ for (level_index i=base_level; i>0; i--) {
+ ReconstructionTask task = {i-1, i};
+
+ /*
+ * The amount of storage required for the reconstruction accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i-1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_reconstruct_with(i, reccnt)) {
+ reccnt += m_levels[i]->get_record_count();
+ }
+ }
+ //task.m_size = 2* reccnt * sizeof(R);
+
+ reconstructions.push_back(task);
+ }
+
+ return std::move(reconstructions);
+ }
+
+
+ /*
+ *
+ */
+ std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level) {
+ std::vector<ReconstructionTask> reconstructions;
+
+ level_index base_level = find_reconstruction_target(source_level);
+ if (base_level == -1) {
+ base_level = grow();
+ }
+
+ for (level_index i=base_level; i>source_level; i--) {
+ ReconstructionTask task = {i - 1, i};
+ /*
+ * The amount of storage required for the reconstruction accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i-1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_reconstruct_with(i, reccnt)) {
+ reccnt += m_levels[i]->get_record_count();
+ }
+ }
+// task.m_size = 2* reccnt * sizeof(R);
+
+ reconstructions.push_back(task);
+ }
+
+ return reconstructions;
+ }
+
+ /*
+ * Combine incoming_level with base_level and reconstruct the shard,
+ * placing it in base_level. The two levels should be sequential--i.e. no
+ * levels are skipped in the reconstruction process--otherwise the
+ * tombstone ordering invariant may be violated.
+ */
+ inline void reconstruction(level_index base_level, level_index incoming_level) {
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ /* if the base level has a shard, merge the base and incoming together to make a new one */
+ if (m_levels[base_level]->get_shard_count() > 0) {
+ m_levels[base_level] = InternalLevel<R, Shard, Q>::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get());
+ /* otherwise, we can just move the incoming to the base */
+ } else {
+ m_levels[base_level] = m_levels[incoming_level];
+ }
+ } else {
+ m_levels[base_level]->append_level(m_levels[incoming_level].get());
+ m_levels[base_level]->finalize();
+ }
+
+ /* place a new, empty level where the incoming level used to be */
+ m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
+ }
+
+ bool take_reference() {
+ m_refcnt.fetch_add(1);
+ return true;
+ }
+
+ bool release_reference() {
+ assert(m_refcnt.load() > 0);
+ m_refcnt.fetch_add(-1);
+ return true;
+ }
+
+ size_t get_reference_count() {
+ return m_refcnt.load();
+ }
+
+ std::vector<void *> get_query_states(std::vector<std::pair<ShardID, Shard*>> &shards, void *parms) {
+ std::vector<void*> states;
+
+ for (auto &level : m_levels) {
+ level->get_query_states(shards, states, parms);
+ }
+
+ return states;
+ }
+
+private:
+ size_t m_scale_factor;
+ double m_max_delete_prop;
+ size_t m_buffer_size;
+
+ std::atomic<size_t> m_refcnt;
+
+ std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;
+
+ /*
+ * Add a new level to the structure and return its index.
+ */
+ inline level_index grow() {
+ level_index new_idx = m_levels.size();
+ size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+ m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
+ return new_idx;
+ }
+
+ /*
+ * Find the first level below the level indicated by idx that
+ * is capable of sustaining a reconstruction and return its
+ * level index. If no such level exists, returns -1. Also
+ * returns -1 if idx==0, and no such level exists, to simplify
+ * the logic of the first buffer flush.
+ */
+ inline level_index find_reconstruction_target(level_index idx) {
+
+ if (idx == 0 && m_levels.size() == 0) return -1;
+
+ size_t incoming_rec_cnt = get_level_record_count(idx);
+ for (level_index i=idx+1; i<m_levels.size(); i++) {
+ if (can_reconstruct_with(i, incoming_rec_cnt)) {
+ return i;
+ }
+
+ incoming_rec_cnt = get_level_record_count(i);
+ }
+
+ return -1;
+ }
+
+ inline void flush_buffer_into_l0(BuffView buffer) {
+ assert(m_levels[0]);
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ // FIXME: Kludgey implementation due to interface constraints.
+ auto old_level = m_levels[0].get();
+ auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
+ temp_level->append_buffer(std::move(buffer));
+
+ if (old_level->get_shard_count() > 0) {
+ m_levels[0] = InternalLevel<R, Shard, Q>::reconstruction(old_level, temp_level);
+ delete temp_level;
+ } else {
+ m_levels[0] = std::shared_ptr<InternalLevel<R, Shard, Q>>(temp_level);
+ }
+ } else {
+ m_levels[0]->append_buffer(std::move(buffer));
+ }
+ }
+
+ /*
+ * Mark a given memory level as no-longer in use by the tree. For now this
+ * will just free the level. In future, this will be more complex as the
+ * level may not be able to immediately be deleted, depending upon who
+ * else is using it.
+ */
+ inline void mark_as_unused(std::shared_ptr<InternalLevel<R, Shard, Q>> level) {
+ level.reset();
+ }
+
+ /*
+ * Assume that level "0" should be larger than the buffer. The buffer
+ * itself is index -1, which should return simply the buffer capacity.
+ */
+ inline size_t calc_level_record_capacity(level_index idx) {
+ return m_buffer_size * pow(m_scale_factor, idx+1);
+ }
+
+ /*
+ * Returns the number of records present on a specified level.
+ */
+ inline size_t get_level_record_count(level_index idx) {
+ return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
+ }
+
+ /*
+ * Determines if a level can sustain a reconstruction with incoming_rec_cnt
+ * additional records without exceeding its capacity.
+ */
+ inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) {
+ if (idx >= m_levels.size() || !m_levels[idx]) {
+ return false;
+ }
+
+ if (L == LayoutPolicy::LEVELING) {
+ return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
+ } else {
+ return m_levels[idx]->get_shard_count() < m_scale_factor;
+ }
+
+ /* unreachable */
+ assert(true);
+ }
+};
+
+}
+
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
new file mode 100644
index 0000000..db38946
--- /dev/null
+++ b/include/framework/structure/InternalLevel.h
@@ -0,0 +1,271 @@
+/*
+ * include/framework/structure/InternalLevel.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * The word `Internal` in this class's name refers to memory. The current
+ * model, inherited from the framework in Practical Dynamic Extension for
+ * Sampling Indexes, would use a different ExternalLevel for shards stored
+ * on external storage. This is a distinction that can probably be avoided
+ * with some more thought being put into interface design.
+ *
+ */
+#pragma once
+
+#include <vector>
+#include <memory>
+
+#include "util/types.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/BufferView.h"
+
+namespace de {
+template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q>
+class InternalLevel;
+
+
+
+template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q>
+class InternalLevel {
+ typedef S Shard;
+ typedef BufferView<R> BuffView;
+public:
+ InternalLevel(ssize_t level_no, size_t shard_cap)
+ : m_level_no(level_no)
+ , m_shard_cnt(0)
+ , m_shards(shard_cap, nullptr)
+ , m_pending_shard(nullptr)
+ {}
+
+ ~InternalLevel() {
+ delete m_pending_shard;
+ }
+
+ /*
+ * Create a new shard combining the records from base_level and new_level,
+ * and return a shared_ptr to a new level containing this shard. This is used
+ * for reconstructions under the leveling layout policy.
+ *
+ * No changes are made to the levels provided as arguments.
+ */
+ static std::shared_ptr<InternalLevel> reconstruction(InternalLevel* base_level, InternalLevel* new_level) {
+ assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
+ auto res = new InternalLevel(base_level->m_level_no, 1);
+ res->m_shard_cnt = 1;
+ std::vector<Shard *> shards = {base_level->m_shards[0].get(),
+ new_level->m_shards[0].get()};
+
+ res->m_shards[0] = std::make_shared<S>(shards);
+ return std::shared_ptr<InternalLevel>(res);
+ }
+
+ /*
+ * Create a new shard combining the records from all of
+ * the shards in level, and append this new shard into
+ * this level. This is used for reconstructions under
+ * the tiering layout policy.
+ *
+ * No changes are made to the level provided as an argument.
+ */
+ void append_level(InternalLevel* level) {
+ // FIXME: that this is happening probably means that
+ // something is going terribly wrong earlier in the
+ // reconstruction logic.
+ if (level->get_shard_count() == 0) {
+ return;
+ }
+
+ std::vector<S*> shards;
+ for (auto shard : level->m_shards) {
+ if (shard) shards.emplace_back(shard.get());
+ }
+
+ if (m_shard_cnt == m_shards.size()) {
+ m_pending_shard = new S(shards);
+ return;
+ }
+
+ auto tmp = new S(shards);
+ m_shards[m_shard_cnt] = std::shared_ptr<S>(tmp);
+
+ ++m_shard_cnt;
+ }
+
+ /*
+ * Create a new shard using the records in the
+ * provided buffer, and append this new shard
+ * into this level. This is used for buffer
+ * flushes under the tiering layout policy.
+ */
+ void append_buffer(BuffView buffer) {
+ if (m_shard_cnt == m_shards.size()) {
+ assert(m_pending_shard == nullptr);
+ m_pending_shard = new S(std::move(buffer));
+ return;
+ }
+
+ m_shards[m_shard_cnt] = std::make_shared<S>(std::move(buffer));
+ ++m_shard_cnt;
+ }
+
+ void finalize() {
+ if (m_pending_shard) {
+ for (size_t i=0; i<m_shards.size(); i++) {
+ m_shards[i] = nullptr;
+ }
+
+ m_shards[0] = std::shared_ptr<S>(m_pending_shard);
+ m_pending_shard = nullptr;
+ m_shard_cnt = 1;
+ }
+ }
+
+ /*
+ * Create a new shard containing the combined records
+ * from all shards on this level and return it.
+ *
+ * No changes are made to this level.
+ */
+ Shard *get_combined_shard() {
+ if (m_shard_cnt == 0) {
+ return nullptr;
+ }
+
+ std::vector<Shard *> shards;
+ for (auto shard : m_shards) {
+ if (shard) shards.emplace_back(shard.get());
+ }
+
+ return new S(shards);
+ }
+
+ void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ auto shard_state = Q::get_query_state(m_shards[i].get(), query_parms);
+ shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i].get()});
+ shard_states.emplace_back(shard_state);
+ }
+ }
+ }
+
+ bool check_tombstone(size_t shard_stop, const R& rec) {
+ if (m_shard_cnt == 0) return false;
+
+ for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
+ if (m_shards[i]) {
+ auto res = m_shards[i]->point_lookup(rec, true);
+ if (res && res->is_tombstone()) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ bool delete_record(const R &rec) {
+ if (m_shard_cnt == 0) return false;
+
+ for (size_t i = 0; i < m_shards.size(); ++i) {
+ if (m_shards[i]) {
+ auto res = m_shards[i]->point_lookup(rec);
+ if (res) {
+ res->set_delete();
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ Shard* get_shard(size_t idx) {
+ return m_shards[idx].get();
+ }
+
+ size_t get_shard_count() {
+ return m_shard_cnt;
+ }
+
+ size_t get_record_count() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ cnt += m_shards[i]->get_record_count();
+ }
+ }
+
+ return cnt;
+ }
+
+ size_t get_tombstone_count() {
+ size_t res = 0;
+ for (size_t i = 0; i < m_shard_cnt; ++i) {
+ if (m_shards[i]) {
+ res += m_shards[i]->get_tombstone_count();
+ }
+ }
+ return res;
+ }
+
+ size_t get_aux_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_shards[i]){
+ cnt += m_shards[i]->get_aux_memory_usage();
+ }
+ }
+
+ return cnt;
+ }
+
+ size_t get_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ cnt += m_shards[i]->get_memory_usage();
+ }
+ }
+
+ return cnt;
+ }
+
+ double get_tombstone_prop() {
+ size_t tscnt = 0;
+ size_t reccnt = 0;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ tscnt += m_shards[i]->get_tombstone_count();
+ reccnt += m_shards[i]->get_record_count();
+ }
+ }
+
+ return (double) tscnt / (double) (tscnt + reccnt);
+ }
+
+ std::shared_ptr<InternalLevel> clone() {
+ auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size());
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ new_level->m_shards[i] = m_shards[i];
+ }
+ new_level->m_shard_cnt = m_shard_cnt;
+
+ return new_level;
+ }
+
+private:
+ ssize_t m_level_no;
+
+ size_t m_shard_cnt;
+ size_t m_shard_size_cap;
+
+ std::vector<std::shared_ptr<Shard>> m_shards;
+ Shard *m_pending_shard;
+};
+
+}
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
new file mode 100644
index 0000000..415c95a
--- /dev/null
+++ b/include/framework/structure/MutableBuffer.h
@@ -0,0 +1,313 @@
+/*
+ * include/framework/structure/MutableBuffer.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * NOTE: Concerning the tombstone count. One possible approach
+ * would be to track the number of tombstones below and above the
+ * low water mark--this would be straightforward to do. Then, if we
+ * *require* that the head only advance up to the LWM, we can get a
+ * correct view on the number of tombstones in the active buffer at
+ * any point in time, and the BufferView will have a pretty good
+ * approximation as well (potentially with a few extra if new inserts
+ * happen between when the tail pointer and tombstone count are fetched)
+ *
+ */
+#pragma once
+
+#include <cstdlib>
+#include <atomic>
+#include <cassert>
+#include <immintrin.h>
+
+#include "psu-util/alignment.h"
+#include "util/bf_config.h"
+#include "psu-ds/BloomFilter.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/BufferView.h"
+
+using psudb::CACHELINE_SIZE;
+
+namespace de {
+
+template <RecordInterface R>
+class MutableBuffer {
+ friend class BufferView<R>;
+
+ struct buffer_head {
+ size_t head_idx;
+ size_t refcnt;
+ };
+
+public:
+ MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
+ : m_lwm(low_watermark)
+ , m_hwm(high_watermark)
+ , m_cap((capacity == 0) ? 2 * high_watermark : capacity)
+ , m_tail(0)
+ , m_head({0, 0})
+ , m_old_head({high_watermark, 0})
+ , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+ , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
+ , m_tscnt(0)
+ , m_old_tscnt(0)
+ , m_active_head_advance(false)
+ {
+ assert(m_cap > m_hwm);
+ assert(m_hwm > m_lwm);
+ }
+
+ ~MutableBuffer() {
+ free(m_data);
+ delete m_tombstone_filter;
+ }
+
+ int append(const R &rec, bool tombstone=false) {
+ int32_t tail = 0;
+ if ((tail = try_advance_tail()) == -1) {
+ return 0;
+ }
+
+ Wrapped<R> wrec;
+ wrec.rec = rec;
+ wrec.header = 0;
+ if (tombstone) wrec.set_tombstone();
+
+ size_t pos = tail % m_cap;
+
+ m_data[pos] = wrec;
+ m_data[pos].header |= (pos << 2);
+
+ if (tombstone) {
+ m_tscnt.fetch_add(1);
+ if (m_tombstone_filter) m_tombstone_filter->insert(rec);
+ }
+
+ return 1;
+ }
+
+ bool truncate() {
+ m_tscnt.store(0);
+ m_tail.store(0);
+ if (m_tombstone_filter) m_tombstone_filter->clear();
+
+ return true;
+ }
+
+ size_t get_record_count() {
+ return m_tail.load() - m_head.load().head_idx;
+ }
+
+ size_t get_capacity() {
+ return m_cap;
+ }
+
+ bool is_full() {
+ return get_record_count() >= m_hwm;
+ }
+
+ bool is_at_low_watermark() {
+ return get_record_count() >= m_lwm;
+ }
+
+ size_t get_tombstone_count() {
+ return m_tscnt.load();
+ }
+
+ bool delete_record(const R& rec) {
+ return get_buffer_view().delete_record(rec);
+ }
+
+ bool check_tombstone(const R& rec) {
+ return get_buffer_view().check_tombstone(rec);
+ }
+
+ size_t get_memory_usage() {
+ return m_cap * sizeof(Wrapped<R>);
+ }
+
+ size_t get_aux_memory_usage() {
+ return m_tombstone_filter->get_memory_usage();
+ }
+
+ BufferView<R> get_buffer_view(size_t target_head) {
+ size_t head = get_head(target_head);
+ auto f = std::bind(release_head_reference, (void *) this, head);
+
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
+ }
+
+ BufferView<R> get_buffer_view() {
+ size_t head = get_head(m_head.load().head_idx);
+ auto f = std::bind(release_head_reference, (void *) this, head);
+
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
+ }
+
+ /*
+ * Advance the buffer following a reconstruction. Move current
+ * head and head_refcnt into old_head and old_head_refcnt, then
+ * assign new_head to old_head.
+ */
+ bool advance_head(size_t new_head) {
+ assert(new_head > m_head.load().head_idx);
+ assert(new_head <= m_tail.load());
+
+ /* refuse to advance head while there is an old with one references */
+ if (m_old_head.load().refcnt > 0) {
+ //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n");
+ return false;
+ }
+
+ m_active_head_advance.store(true);
+
+ buffer_head new_hd = {new_head, 0};
+ buffer_head cur_hd;
+
+ /* replace current head with new head */
+ do {
+ cur_hd = m_head.load();
+ } while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+
+ /* move the current head into the old head */
+ m_old_head.store(cur_hd);
+
+ m_active_head_advance.store(false);
+ return true;
+ }
+
+ /*
+ * FIXME: If target_head does not match *either* the old_head or the
+ * current_head, this routine will loop infinitely.
+ */
+ size_t get_head(size_t target_head) {
+ buffer_head cur_hd, new_hd;
+ bool head_acquired = false;
+
+ do {
+ if (m_old_head.load().head_idx == target_head) {
+ cur_hd = m_old_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else if (m_head.load().head_idx == target_head){
+ cur_hd = m_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ } while(!head_acquired);
+
+ return new_hd.head_idx;
+ }
+
+ void set_low_watermark(size_t lwm) {
+ assert(lwm < m_hwm);
+ m_lwm = lwm;
+ }
+
+ size_t get_low_watermark() {
+ return m_lwm;
+ }
+
+ void set_high_watermark(size_t hwm) {
+ assert(hwm > m_lwm);
+ assert(hwm < m_cap);
+ m_hwm = hwm;
+ }
+
+ size_t get_high_watermark() {
+ return m_hwm;
+ }
+
+ size_t get_tail() {
+ return m_tail.load();
+ }
+
+ /*
+ * Note: this returns the available physical storage capacity,
+ * *not* now many more records can be inserted before the
+ * HWM is reached. It considers the old_head to be "free"
+ * when it has no remaining references. This should be true,
+ * but a buggy framework implementation may violate the
+ * assumption.
+ */
+ size_t get_available_capacity() {
+ if (m_old_head.load().refcnt == 0) {
+ return m_cap - (m_tail.load() - m_head.load().head_idx);
+ }
+
+ return m_cap - (m_tail.load() - m_old_head.load().head_idx);
+ }
+
+private:
+ int64_t try_advance_tail() {
+ size_t old_value = m_tail.load();
+
+ /* if full, fail to advance the tail */
+ if (old_value - m_head.load().head_idx >= m_hwm) {
+ return -1;
+ }
+
+ while (!m_tail.compare_exchange_strong(old_value, old_value+1)) {
+ /* if full, stop trying and fail to advance the tail */
+ if (m_tail.load() >= m_hwm) {
+ return -1;
+ }
+
+ _mm_pause();
+ }
+
+ return old_value;
+ }
+
+ size_t to_idx(size_t i, size_t head) {
+ return (head + i) % m_cap;
+ }
+
+ static void release_head_reference(void *buff, size_t head) {
+ MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff;
+
+ buffer_head cur_hd, new_hd;
+ do {
+ if (buffer->m_old_head.load().head_idx == head) {
+ cur_hd = buffer->m_old_head;
+ if (cur_hd.refcnt == 0) continue;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
+ if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
+ break;
+ }
+ } else {
+ cur_hd = buffer->m_head;
+ if (cur_hd.refcnt == 0) continue;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
+
+ if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
+ break;
+ }
+ }
+ _mm_pause();
+ } while(true);
+ }
+
+ size_t m_lwm;
+ size_t m_hwm;
+ size_t m_cap;
+
+ alignas(64) std::atomic<size_t> m_tail;
+
+ alignas(64) std::atomic<buffer_head> m_head;
+ alignas(64) std::atomic<buffer_head> m_old_head;
+
+ Wrapped<R>* m_data;
+ psudb::BloomFilter<R>* m_tombstone_filter;
+ alignas(64) std::atomic<size_t> m_tscnt;
+ size_t m_old_tscnt;
+
+ alignas(64) std::atomic<bool> m_active_head_advance;
+};
+
+}