Diffstat (limited to 'include/framework/structure')
-rw-r--r--  include/framework/structure/ExtensionStructure.h  | 428
-rw-r--r--  include/framework/structure/InternalLevel.h       | 258
-rw-r--r--  include/framework/structure/MutableBuffer.h       | 242
3 files changed, 928 insertions(+), 0 deletions(-)
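
Taken together, these headers add the three core storage components of the framework: MutableBuffer is the unsorted in-memory insert buffer, InternalLevel groups one or more immutable shards at a single level, and ExtensionStructure owns the vector of levels and drives buffer merges and level-to-level merges. The fragment below sketches how the pieces are intended to fit together, using only the interfaces visible in this diff. It is a hedged sketch, not framework documentation: Rec, MyShard, and MyQuery are hypothetical placeholders for user-supplied types modeling RecordInterface, ShardInterface, and QueryInterface, so the fragment will not compile until such types are provided.

// Hedged usage sketch. Rec, MyShard, and MyQuery are hypothetical
// placeholders for types satisfying RecordInterface, ShardInterface,
// and QueryInterface; only calls that appear in this diff are used.
#include "framework/structure/ExtensionStructure.h"
#include "framework/structure/MutableBuffer.h"

void insert_sketch() {
    de::MutableBuffer<Rec> buffer(1000, 100);           // capacity, tombstone capacity
    de::ExtensionStructure<Rec, MyShard, MyQuery>
        structure(1000, 6, 0.05);                       // buffer size, scale factor, max delete proportion

    Rec r{};                                            // assumes Rec is default-constructible
    if (!buffer.append(r) && buffer.is_full()) {
        // Make room first: apply the level-to-level merges reported by the
        // structure (deepest target level first), then merge the buffer into L0.
        for (auto &task : structure.get_merge_tasks(buffer.get_record_count())) {
            structure.merge_levels(task.m_target_level, task.m_source_level);
        }
        structure.merge_buffer(&buffer);
    }
}
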
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
new file mode 100644
index 0000000..920e1c3
--- /dev/null
+++ b/include/framework/structure/ExtensionStructure.h
@@ -0,0 +1,428 @@
+/*
+ * include/framework/structure/ExtensionStructure.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <cmath>
+#include <cstdio>
+#include <memory>
+#include <numeric>
+#include <vector>
+
+#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/InternalLevel.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+
+#include "framework/util/Configuration.h"
+#include "framework/scheduling/Task.h"
+
+#include "psu-util/timer.h"
+#include "psu-ds/Alias.h"
+
+namespace de {
+
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING>
+class ExtensionStructure {
+ typedef S Shard;
+ typedef MutableBuffer<R> Buffer;
+
+public:
+ ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
+ : m_scale_factor(scale_factor)
+ , m_max_delete_prop(max_delete_prop)
+ , m_buffer_size(buffer_size)
+ {}
+
+ ~ExtensionStructure() = default;
+
+ /*
+ * Create a shallow copy of this extension structure. The copy will share references to the
+ * same levels/shards as the original, but will have its own lists. As all of the shards are
+ * immutable (with the exception of deletes), the copy can be restructured with merges, etc.,
+ * without affecting the original.
+ *
+ * NOTE: When using tagged deletes, a delete of a record in the original structure will affect
+ * the copy, so long as the copy retains a reference to the same shard as the original. This could
+ * cause synchronization problems under tagging with concurrency. Any deletes in this context will
+ * need to be forwarded to the appropriate structures manually.
+ */
+ ExtensionStructure<R, S, Q, L> *copy() {
+ auto new_struct = new ExtensionStructure<R, S, Q, L>(m_buffer_size, m_scale_factor, m_max_delete_prop);
+ for (size_t i=0; i<m_levels.size(); i++) {
+ new_struct->m_levels.push_back(m_levels[i]->clone());
+ }
+
+ return new_struct;
+ }
+
+ /*
+ * Search for a record matching the argument and mark it deleted by
+ * setting the delete bit in its wrapped header. Returns 1 if a matching
+ * record was found and deleted, and 0 if a matching record was not found.
+ *
+ * This function stops after the first matching record is found. It assumes
+ * that no duplicate records exist; if duplicates are present, only the
+ * first match encountered will be deleted.
+ */
+ int tagged_delete(const R &rec) {
+ for (auto level : m_levels) {
+ if (level && level->delete_record(rec)) {
+ return 1;
+ }
+ }
+
+ /*
+ * If the record to be erased wasn't found, return 0. The
+ * DynamicExtension itself will then search the active
+ * Buffers.
+ */
+ return 0;
+ }
+
+ /*
+ * Merge the mutable buffer into L0 and truncate the buffer. Any cascading
+ * merges needed to make room in L0 must already have been performed; this
+ * is asserted.
+ */
+ inline bool merge_buffer(Buffer *buffer) {
+ assert(can_merge_with(0, buffer->get_record_count()));
+
+ merge_buffer_into_l0(buffer);
+ buffer->truncate();
+
+ return true;
+ }
+
+ /*
+ * Return the total number of records (including tombstones) within all
+ * of the levels of the structure.
+ */
+ size_t get_record_count() {
+ size_t cnt = 0;
+
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) cnt += m_levels[i]->get_record_count();
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Return the total number of tombstones contained within all of the
+ * levels of the structure.
+ */
+ size_t get_tombstone_cnt() {
+ size_t cnt = 0;
+
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count();
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Return the number of levels within the structure. Note that not
+ * all of these levels are necessarily populated.
+ */
+ size_t get_height() {
+ return m_levels.size();
+ }
+
+ /*
+ * Return the amount of memory (in bytes) used by the shards within the
+ * structure for storing the primary data structure and raw data.
+ */
+ size_t get_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) cnt += m_levels[i]->get_memory_usage();
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Return the amount of memory (in bytes) used by the shards within the
+ * structure for storing auxiliary data structures. This total does not
+ * include memory used for the main data structure, or raw data.
+ */
+ size_t get_aux_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) {
+ cnt += m_levels[i]->get_aux_memory_usage();
+ }
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Validate that no level in the structure exceeds its maximum tombstone
+ * proportion. This is used to trigger preemptive compactions at the end of
+ * the merge process.
+ */
+ bool validate_tombstone_proportion() {
+ long double ts_prop;
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) {
+ ts_prop = (long double) m_levels[i]->get_tombstone_count() / (long double) calc_level_record_capacity(i);
+ if (ts_prop > (long double) m_max_delete_prop) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ bool validate_tombstone_proportion(level_index level) {
+ long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level);
+ return ts_prop <= (long double) m_max_delete_prop;
+ }
+
+ /*
+ * Return a reference to the underlying vector of levels within the
+ * structure.
+ */
+ std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> &get_levels() {
+ return m_levels;
+ }
+
+ /*
+ * Return the set of level-to-level merges required to make room in L0 for
+ * a merge of buffer_reccnt records from the buffer, in the order in which
+ * they must be applied (deepest target level first). The buffer -> L0
+ * merge itself is not included.
+ */
+ std::vector<MergeTask> get_merge_tasks(size_t buffer_reccnt) {
+ std::vector<MergeTask> merges;
+
+ /*
+ * The buffer -> L0 merge task is not included in the list, so if
+ * the buffer can be merged into L0 without any other changes,
+ * simply return an empty list.
+ */
+ if (can_merge_with(0, buffer_reccnt)) {
+ return merges;
+ }
+
+ level_index merge_base_level = find_mergable_level(0);
+ if (merge_base_level == -1) {
+ merge_base_level = grow();
+ }
+
+ for (level_index i=merge_base_level; i>0; i--) {
+ MergeTask task;
+ task.m_source_level = i - 1;
+ task.m_target_level = i;
+ task.m_type = TaskType::MERGE;
+
+ /*
+ * The amount of storage required for the merge accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i-1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_merge_with(i, reccnt)) {
+ reccnt += m_levels[i]->get_record_count();
+ }
+ }
+ task.m_size = 2* reccnt * sizeof(R);
+
+ merges.push_back(task);
+ }
+
+ return merges;
+ }
+
+
+ /*
+ * Return the set of level-to-level merges required to free up space in
+ * the specified source level, in the order in which they must be applied.
+ */
+ std::vector<MergeTask> get_merge_tasks_from_level(size_t source_level) {
+ std::vector<MergeTask> merges;
+
+ level_index merge_base_level = find_mergable_level(source_level);
+ if (merge_base_level == -1) {
+ merge_base_level = grow();
+ }
+
+ for (level_index i=merge_base_level; i>source_level; i--) {
+ MergeTask task;
+ task.m_source_level = i - 1;
+ task.m_target_level = i;
+
+ /*
+ * The amount of storage required for the merge accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i-1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_merge_with(i, reccnt)) {
+ reccnt += m_levels[i]->get_record_count();
+ }
+ }
+ task.m_size = 2* reccnt * sizeof(R);
+
+ merges.push_back(task);
+ }
+
+ return merges;
+ }
+
+ /*
+ * Merge the level specified by incoming level into the level specified
+ * by base level. The two levels should be sequential--i.e. no levels
+ * are skipped in the merge process--otherwise the tombstone ordering
+ * invariant may be violated by the merge operation.
+ */
+ inline void merge_levels(level_index base_level, level_index incoming_level) {
+ // merging two memory levels
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ auto tmp = m_levels[base_level];
+ m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get());
+ } else {
+ m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get());
+ m_levels[base_level]->finalize();
+ }
+
+ m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
+ }
+
+
+private:
+ size_t m_scale_factor;
+ double m_max_delete_prop;
+ size_t m_buffer_size;
+
+ std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;
+
+ /*
+ * Add a new level to the structure and return that level's index. The new
+ * level's shard capacity is 1 under leveling and m_scale_factor under
+ * tiering.
+ */
+ inline level_index grow() {
+ level_index new_idx = m_levels.size();
+ size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+ m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
+ return new_idx;
+ }
+
+ /*
+ * Find the first level below the level indicated by idx that is capable
+ * of sustaining a merge operation, and return its level index. If no such
+ * level exists, return -1. Also returns -1 if idx == 0 and the structure
+ * has no levels yet, to simplify the logic of the first merge.
+ */
+ inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) {
+
+ if (idx == 0 && m_levels.size() == 0) return -1;
+
+ size_t incoming_rec_cnt = get_level_record_count(idx, buffer);
+ for (level_index i=idx+1; i<m_levels.size(); i++) {
+ if (can_merge_with(i, incoming_rec_cnt)) {
+ return i;
+ }
+
+ incoming_rec_cnt = get_level_record_count(i);
+ }
+
+ return -1;
+ }
+
+ inline void merge_buffer_into_l0(Buffer *buffer) {
+ assert(m_levels[0]);
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ // FIXME: Kludgey implementation due to interface constraints.
+ auto old_level = m_levels[0].get();
+ auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
+ temp_level->append_buffer(buffer);
+ auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level);
+
+ m_levels[0] = new_level;
+ delete temp_level;
+ } else {
+ m_levels[0]->append_buffer(buffer);
+ }
+ }
+
+ /*
+ * Mark a given memory level as no longer in use by the tree. For now this
+ * will just free the level. In future, this will be more complex as the
+ * level may not be able to immediately be deleted, depending upon who
+ * else is using it.
+ */
+ inline void mark_as_unused(std::shared_ptr<InternalLevel<R, Shard, Q>> level) {
+ level.reset();
+ }
+
+ /*
+ * Return the record capacity of the level at the given index: the buffer
+ * capacity scaled by m_scale_factor^(idx+1). Level 0 is thus one scale
+ * factor larger than the buffer, and the buffer itself (index -1) maps to
+ * its own capacity.
+ */
+ inline size_t calc_level_record_capacity(level_index idx) {
+ return m_buffer_size * pow(m_scale_factor, idx+1);
+ }
+
+ /*
+ * Returns the actual number of records present on the specified level. If
+ * a buffer pointer is provided, the buffer's record count is returned
+ * instead; this is required when idx is -1 (the buffer), as there is no
+ * level to consult in that case.
+ */
+ inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) {
+ if (buffer) {
+ return buffer->get_record_count();
+ }
+
+ return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
+ }
+
+ /*
+ * Determines if the specified level can absorb a merge of incoming_rec_cnt
+ * additional records. Under leveling this checks the level's record
+ * capacity; under tiering it checks the level's shard count against the
+ * scale factor. The provided level index must be non-negative (i.e., it
+ * cannot refer to the buffer).
+ */
+ inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) {
+ if (idx>= m_levels.size() || !m_levels[idx]) {
+ return false;
+ }
+
+ if (L == LayoutPolicy::LEVELING) {
+ return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
+ } else {
+ return m_levels[idx]->get_shard_count() < m_scale_factor;
+ }
+
+ /* unreachable */
+ assert(false);
+ }
+};
+
+}
+
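
A note on the geometry encoded by calc_level_record_capacity() and the LEVELING branch of can_merge_with(): level idx holds at most buffer_size * scale_factor^(idx+1) records, and under leveling a level can absorb an incoming merge only if its current count plus the incoming count stays within that bound. The standalone snippet below (not part of the commit; the buffer size, scale factor, and level occupancies are made-up example values) reproduces that arithmetic to show how a full buffer cascades down the levels, mirroring the walk performed by get_merge_tasks() and find_mergable_level().

// Standalone illustration of the level-capacity arithmetic used above.
// The parameters and per-level record counts are arbitrary example values.
#include <cmath>
#include <cstddef>
#include <cstdio>
#include <vector>

int main() {
    const size_t buffer_size = 1000;    // example value
    const size_t scale_factor = 6;      // example value

    // calc_level_record_capacity(idx) == buffer_size * scale_factor^(idx+1)
    auto capacity = [&](size_t idx) {
        return (size_t) (buffer_size * std::pow(scale_factor, idx + 1));
    };

    // Hypothetical current record counts for levels 0..2.
    std::vector<size_t> level_reccnt = {5800, 30000, 90000};

    // Walk down the levels, as get_merge_tasks()/find_mergable_level() do,
    // until one can absorb the records cascading into it (leveling rule:
    // existing + incoming <= capacity).
    size_t incoming = buffer_size;      // a full buffer arriving at L0
    for (size_t i = 0; i < level_reccnt.size(); i++) {
        bool fits = level_reccnt[i] + incoming <= capacity(i);
        std::printf("L%zu: %zu existing + %zu incoming vs capacity %zu -> %s\n",
                    i, level_reccnt[i], incoming, capacity(i),
                    fits ? "absorbs the merge" : "must itself merge downward");
        if (fits) break;
        incoming = level_reccnt[i];     // this level's records cascade downward
    }
    return 0;
}
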
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
new file mode 100644
index 0000000..b9230f4
--- /dev/null
+++ b/include/framework/structure/InternalLevel.h
@@ -0,0 +1,258 @@
+/*
+ * include/framework/structure/InternalLevel.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <cassert>
+#include <memory>
+#include <vector>
+
+#include "util/types.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/MutableBuffer.h"
+
+namespace de {
+
+template <RecordInterface R, ShardInterface S, QueryInterface Q>
+class InternalLevel {
+ typedef S Shard;
+ typedef MutableBuffer<R> Buffer;
+public:
+ InternalLevel(ssize_t level_no, size_t shard_cap)
+ : m_level_no(level_no)
+ , m_shard_cnt(0)
+ , m_shards(shard_cap, nullptr)
+ , m_owns(shard_cap, true)
+ , m_pending_shard(nullptr)
+ {}
+
+ // Create a new level that shares the shards of the given level, re-numbered as level_no + 1.
+ // WARNING: for leveling only.
+ InternalLevel(InternalLevel* level)
+ : m_level_no(level->m_level_no + 1)
+ , m_shard_cnt(level->m_shard_cnt)
+ , m_shards(level->m_shards.size(), nullptr)
+ , m_owns(level->m_owns.size(), true)
+ , m_pending_shard(nullptr)
+ {
+ assert(m_shard_cnt == 1 && m_shards.size() == 1);
+
+ for (size_t i=0; i<m_shards.size(); i++) {
+ level->m_owns[i] = false;
+ m_shards[i] = level->m_shards[i];
+ }
+ }
+
+ ~InternalLevel() {
+ for (size_t i=0; i<m_shards.size(); i++) {
+ if (m_owns[i]) delete m_shards[i];
+ }
+
+ delete m_pending_shard;
+ }
+
+ // WARNING: for leveling only.
+ // base_level is the level that new_level is being merged into (i.e., the deeper level).
+ static std::shared_ptr<InternalLevel> merge_levels(InternalLevel* base_level, InternalLevel* new_level) {
+ assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
+ auto res = new InternalLevel(base_level->m_level_no, 1);
+ res->m_shard_cnt = 1;
+ Shard* shards[2];
+ shards[0] = base_level->m_shards[0];
+ shards[1] = new_level->m_shards[0];
+
+ res->m_shards[0] = new S(shards, 2);
+ return std::shared_ptr<InternalLevel>(res);
+ }
+
+ void append_buffer(Buffer* buffer) {
+ if (m_shard_cnt == m_shards.size()) {
+ assert(m_pending_shard == nullptr);
+ m_pending_shard = new S(buffer);
+ return;
+ }
+
+ m_shards[m_shard_cnt] = new S(buffer);
+ m_owns[m_shard_cnt] = true;
+ ++m_shard_cnt;
+ }
+
+ void append_merged_shards(InternalLevel* level) {
+ if (m_shard_cnt == m_shards.size()) {
+ m_pending_shard = new S(level->m_shards.data(), level->m_shard_cnt);
+ return;
+ }
+
+ m_shards[m_shard_cnt] = new S(level->m_shards.data(), level->m_shard_cnt);
+ m_owns[m_shard_cnt] = true;
+
+ ++m_shard_cnt;
+ }
+
+
+ void finalize() {
+ if (m_pending_shard) {
+ for (size_t i=0; i<m_shards.size(); i++) {
+ if (m_owns[i]) {
+ delete m_shards[i];
+ m_shards[i] = nullptr;
+ m_owns[i] = false;
+ }
+ }
+
+ m_shards[0] = m_pending_shard;
+ m_owns[0] = true;
+ m_pending_shard = nullptr;
+ m_shard_cnt = 1;
+ }
+ }
+
+ Shard *get_merged_shard() {
+ if (m_shard_cnt == 0) {
+ return nullptr;
+ }
+
+ Shard *shards[m_shard_cnt];
+
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ shards[i] = m_shards[i];
+ }
+
+ return new S(shards, m_shard_cnt);
+ }
+
+ // Collect the per-shard query states for this level, in shard order.
+ void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ auto shard_state = Q::get_query_state(m_shards[i], query_parms);
+ shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i]});
+ shard_states.emplace_back(shard_state);
+ }
+ }
+ }
+
+ bool check_tombstone(size_t shard_stop, const R& rec) {
+ if (m_shard_cnt == 0) return false;
+
+ for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
+ if (m_shards[i]) {
+ auto res = m_shards[i]->point_lookup(rec, true);
+ if (res && res->is_tombstone()) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ bool delete_record(const R &rec) {
+ if (m_shard_cnt == 0) return false;
+
+ for (size_t i = 0; i < m_shards.size(); ++i) {
+ if (m_shards[i]) {
+ auto res = m_shards[i]->point_lookup(rec);
+ if (res) {
+ res->set_delete();
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ Shard* get_shard(size_t idx) {
+ return m_shards[idx];
+ }
+
+ size_t get_shard_count() {
+ return m_shard_cnt;
+ }
+
+ size_t get_record_count() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ cnt += m_shards[i]->get_record_count();
+ }
+
+ return cnt;
+ }
+
+ size_t get_tombstone_count() {
+ size_t res = 0;
+ for (size_t i = 0; i < m_shard_cnt; ++i) {
+ res += m_shards[i]->get_tombstone_count();
+ }
+ return res;
+ }
+
+ size_t get_aux_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ cnt += m_shards[i]->get_aux_memory_usage();
+ }
+
+ return cnt;
+ }
+
+ size_t get_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ cnt += m_shards[i]->get_memory_usage();
+ }
+ }
+
+ return cnt;
+ }
+
+ double get_tombstone_prop() {
+ size_t tscnt = 0;
+ size_t reccnt = 0;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ tscnt += m_shards[i]->get_tombstone_count();
+ reccnt += m_shards[i]->get_record_count();
+ }
+ }
+
+ return (double) tscnt / (double) (tscnt + reccnt);
+ }
+
+private:
+ ssize_t m_level_no;
+
+ size_t m_shard_cnt;
+
+ std::vector<Shard*> m_shards;
+
+ Shard *m_pending_shard;
+
+ std::vector<bool> m_owns;
+
+public:
+ std::shared_ptr<InternalLevel> clone() {
+ auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size());
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ new_level->m_shards[i] = m_shards[i];
+ new_level->m_owns[i] = true;
+ m_owns[i] = false;
+ }
+ new_level->m_shard_cnt = m_shard_cnt;
+
+ return new_level;
+ }
+};
+
+}
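
InternalLevel shares raw Shard pointers freely between level objects (see merge_levels() and clone() above) and records in the parallel m_owns vector which level object is responsible for deleting each shard. The standalone sketch below shows the same pointer-sharing, ownership-flag pattern in isolation; the Shard and ToyLevel types are simplified stand-ins, not the framework's ShardInterface or InternalLevel.

// Toy illustration of the shared-pointer + ownership-flag pattern used by
// InternalLevel. "Shard" and "ToyLevel" are simplified stand-ins.
#include <cstddef>
#include <cstdio>
#include <memory>
#include <vector>

struct Shard { int id; };

struct ToyLevel {
    std::vector<Shard*> shards;
    std::vector<bool>   owns;

    ~ToyLevel() {
        for (size_t i = 0; i < shards.size(); i++) {
            if (owns[i]) delete shards[i];    // only the owning level frees a shard
        }
    }

    // Mirrors InternalLevel::clone(): the new level aliases the same shard
    // pointers and takes over ownership, so each shard is freed exactly once.
    std::shared_ptr<ToyLevel> clone() {
        auto copy = std::make_shared<ToyLevel>();
        copy->shards = shards;
        copy->owns.assign(shards.size(), true);
        owns.assign(shards.size(), false);
        return copy;
    }
};

int main() {
    auto level = std::make_shared<ToyLevel>();
    level->shards.push_back(new Shard{7});
    level->owns.push_back(true);

    auto copy = level->clone();
    std::printf("both levels alias shard %d; only the clone deletes it\n",
                copy->shards[0]->id);
    return 0;
}
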
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
new file mode 100644
index 0000000..9f12175
--- /dev/null
+++ b/include/framework/structure/MutableBuffer.h
@@ -0,0 +1,242 @@
+/*
+ * include/framework/structure/MutableBuffer.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <cstdlib>
+#include <cstring>
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+#include <cassert>
+#include <numeric>
+#include <algorithm>
+#include <type_traits>
+
+#include "psu-util/alignment.h"
+#include "util/bf_config.h"
+#include "psu-ds/BloomFilter.h"
+#include "psu-ds/Alias.h"
+#include "psu-util/timer.h"
+#include "framework/interface/Record.h"
+
+using psudb::CACHELINE_SIZE;
+
+namespace de {
+
+template <RecordInterface R>
+class MutableBuffer {
+public:
+ MutableBuffer(size_t capacity, size_t max_tombstone_cap)
+ : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
+ , m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
+ m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
+ m_merge_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
+ m_tombstone_filter = nullptr;
+ if (max_tombstone_cap > 0) {
+ m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
+ }
+
+ m_refcnt.store(0);
+ m_deferred_truncate.store(false);
+ m_merging.store(false);
+ }
+
+ ~MutableBuffer() {
+ assert(m_refcnt.load() == 0);
+ assert(m_merging.load() == false);
+
+ if (m_data) free(m_data);
+ if (m_tombstone_filter) delete m_tombstone_filter;
+ if (m_merge_data) free(m_merge_data);
+ }
+
+ template <typename R_ = R>
+ int append(const R &rec, bool tombstone=false) {
+ if (tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
+
+ int32_t pos = 0;
+ if ((pos = try_advance_tail()) == -1) return 0;
+
+ Wrapped<R> wrec;
+ wrec.rec = rec;
+ wrec.header = 0;
+ if (tombstone) wrec.set_tombstone();
+
+ m_data[pos] = wrec;
+ m_data[pos].header |= (pos << 2);
+
+ if (tombstone) {
+ m_tombstonecnt.fetch_add(1);
+ if (m_tombstone_filter) m_tombstone_filter->insert(rec);
+ }
+
+ if constexpr (WeightedRecordInterface<R_>) {
+ m_weight.fetch_add(rec.weight);
+ double old = m_max_weight.load();
+ while (old < rec.weight) {
+ m_max_weight.compare_exchange_strong(old, rec.weight);
+ old = m_max_weight.load();
+ }
+ } else {
+ m_weight.fetch_add(1);
+ }
+
+ return 1;
+ }
+
+ bool truncate() {
+ m_tombstonecnt.store(0);
+ m_reccnt.store(0);
+ m_weight.store(0);
+ m_max_weight.store(0);
+ if (m_tombstone_filter) m_tombstone_filter->clear();
+
+ return true;
+ }
+
+ size_t get_record_count() {
+ return m_reccnt;
+ }
+
+ size_t get_capacity() {
+ return m_cap;
+ }
+
+ bool is_full() {
+ /* use >= since concurrent appends may advance m_reccnt past m_cap */
+ return m_reccnt.load() >= m_cap;
+ }
+
+ size_t get_tombstone_count() {
+ return m_tombstonecnt.load();
+ }
+
+ bool delete_record(const R& rec) {
+ size_t offset = 0;
+ while (offset < m_reccnt.load()) {
+ if (m_data[offset].rec == rec) {
+ m_data[offset].set_delete();
+ return true;
+ }
+ offset++;
+ }
+
+ return false;
+ }
+
+ bool check_tombstone(const R& rec) {
+ if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
+
+ size_t offset = 0;
+ while (offset < m_reccnt.load()) {
+ if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) {
+ return true;
+ }
+ offset++;
+ }
+ return false;
+ }
+
+ size_t get_memory_usage() {
+ return m_cap * sizeof(R);
+ }
+
+ size_t get_aux_memory_usage() {
+ return (m_tombstone_filter) ? m_tombstone_filter->get_memory_usage() : 0;
+ }
+
+ size_t get_tombstone_capacity() {
+ return m_tombstone_cap;
+ }
+
+ double get_total_weight() {
+ return m_weight.load();
+ }
+
+ Wrapped<R> *get_data() {
+ return m_data;
+ }
+
+ double get_max_weight() {
+ return m_max_weight;
+ }
+
+ bool start_merge() {
+ if (m_merge_lock.try_lock()) {
+ /* there cannot already be an active merge */
+ if (m_merging.load()) {
+ m_merge_lock.unlock();
+ return false;
+ }
+
+ m_merging.store(true);
+ memcpy(m_merge_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load());
+ return true;
+ }
+
+ /* lock could not be obtained */
+ return false;
+ }
+
+ bool finish_merge() {
+ m_merging.store(false);
+ m_merge_lock.unlock();
+ return true;
+ }
+
+ /*
+ * Concurrency-related operations
+ */
+ bool take_reference() {
+ m_refcnt.fetch_add(1);
+ return true;
+ }
+
+ bool release_reference() {
+ m_refcnt.fetch_add(-1);
+
+ if (m_refcnt.load() == 0 && m_deferred_truncate.load()) {
+ /* call truncate() outside of assert() so it still runs in NDEBUG builds */
+ bool truncated = this->truncate();
+ assert(truncated);
+ (void) truncated;
+ }
+
+ return true;
+ }
+
+ bool active_merge() {
+ return m_merging.load();
+ }
+
+private:
+ int32_t try_advance_tail() {
+ size_t new_tail = m_reccnt.fetch_add(1);
+
+ if (new_tail < m_cap) return new_tail;
+ else return -1;
+ }
+
+ size_t m_cap;
+ size_t m_tombstone_cap;
+
+ Wrapped<R>* m_data;
+ Wrapped<R>* m_merge_data;
+
+ psudb::BloomFilter<R>* m_tombstone_filter;
+
+ alignas(64) std::atomic<size_t> m_tombstonecnt;
+ alignas(64) std::atomic<uint32_t> m_reccnt;
+ alignas(64) std::atomic<double> m_weight;
+ alignas(64) std::atomic<double> m_max_weight;
+ alignas(64) std::atomic<bool> m_merging;
+ alignas(64) std::atomic<bool> m_deferred_truncate;
+ alignas(64) std::atomic<size_t> m_refcnt;
+
+ alignas(64) std::mutex m_merge_lock;
+ alignas(64) std::mutex m_trunc_lock;
+ alignas(64) std::condition_variable m_trunc_signal;
+
+};
+
+}
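
MutableBuffer::append() is lock-free on the hot path: try_advance_tail() claims a slot index with a single fetch_add on m_reccnt, and the append is rejected if the claimed index lands at or past m_cap. The standalone sketch below demonstrates that claim-by-fetch_add idiom with plain integers and threads; it illustrates the technique only and does not use the framework class.

// Standalone illustration of the fetch_add slot-claim idiom behind
// MutableBuffer::try_advance_tail(). Capacity and thread counts are
// arbitrary example values.
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdio>
#include <thread>
#include <vector>

int main() {
    const size_t cap = 1000;                  // example capacity
    std::vector<int> slots(cap, -1);
    std::atomic<size_t> tail{0};
    std::atomic<size_t> rejected{0};

    auto writer = [&](int id) {
        for (int i = 0; i < 400; i++) {
            size_t pos = tail.fetch_add(1);   // claim a slot index atomically
            if (pos < cap) {
                slots[pos] = id;              // the claimed slot is exclusively ours
            } else {
                rejected.fetch_add(1);        // "buffer full": the append fails
            }
        }
    };

    std::vector<std::thread> threads;
    for (int t = 0; t < 4; t++) {
        threads.emplace_back(writer, t);
    }
    for (auto &th : threads) {
        th.join();
    }

    std::printf("accepted %zu appends, rejected %zu\n",
                std::min(tail.load(), cap), rejected.load());
    return 0;
}
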