From 7c03d771475421c1d5a2bbc135242536af1a371c Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 25 Sep 2023 10:49:36 -0400 Subject: Re-structuring Project + scheduling updates This is a big one--probably should have split it apart, but I'm feeling lazy this morning. * Organized the mess of header files in include/framework by splitting them out into their own subdirectories, and renaming a few files to remove redundancies introduced by the directory structure. * Introduced a new framework/ShardRequirements.h header file for simpler shard development. This header simply contains the necessary includes from framework/* for creating shard files. This should help to remove structural dependencies from the framework file structure and shards, as well as centralizing the necessary framework files to make shard development easier. * Created a (currently dummy) SchedulerInterface, and make the scheduler implementation a template parameter of the dynamic extension for easier testing of various scheduling policies. There's still more work to be done to fully integrate the scheduler (queries, multiple buffers), but some more of the necessary framework code for this has been added as well. * Adjusted the Task interface setup for the scheduler. The task structures have been removed from ExtensionStructure and placed in their own header file. Additionally, I started experimenting with using std::variant, as opposed to inheritence, to implement subtype polymorphism on the Merge and Query tasks. The scheduler now has a general task queue that contains both, and std::variant, std::visit, and std::get are used to manipulate them without virtual functions. * Removed Alex.h, as it can't build anyway. There's a branch out there containing the Alex implementation stripped of the C++20 stuff. So there's no need to keep it here. --- include/framework/structure/MutableBuffer.h | 242 ++++++++++++++++++++++++++++ 1 file changed, 242 insertions(+) create mode 100644 include/framework/structure/MutableBuffer.h (limited to 'include/framework/structure/MutableBuffer.h') diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h new file mode 100644 index 0000000..9f12175 --- /dev/null +++ b/include/framework/structure/MutableBuffer.h @@ -0,0 +1,242 @@ +/* + * include/framework/MutableBuffer.h + * + * Copyright (C) 2023 Douglas Rumbaugh + * Dong Xie + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "psu-util/alignment.h" +#include "util/bf_config.h" +#include "psu-ds/BloomFilter.h" +#include "psu-ds/Alias.h" +#include "psu-util/timer.h" +#include "framework/interface/Record.h" + +using psudb::CACHELINE_SIZE; + +namespace de { + +template +class MutableBuffer { +public: + MutableBuffer(size_t capacity, size_t max_tombstone_cap) + : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0) + , m_tombstonecnt(0), m_weight(0), m_max_weight(0) { + m_data = (Wrapped*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped)); + m_merge_data = (Wrapped*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped)); + m_tombstone_filter = nullptr; + if (max_tombstone_cap > 0) { + m_tombstone_filter = new psudb::BloomFilter(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS); + } + + m_refcnt.store(0); + m_deferred_truncate.store(false); + m_merging.store(false); + } + + ~MutableBuffer() { + assert(m_refcnt.load() == 0); + assert(m_merging.load() == false); + + if (m_data) free(m_data); + if (m_tombstone_filter) delete m_tombstone_filter; + if (m_merge_data) free(m_merge_data); + } + + template + int append(const R &rec, bool tombstone=false) { + if (tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0; + + int32_t pos = 0; + if ((pos = try_advance_tail()) == -1) return 0; + + Wrapped wrec; + wrec.rec = rec; + wrec.header = 0; + if (tombstone) wrec.set_tombstone(); + + m_data[pos] = wrec; + m_data[pos].header |= (pos << 2); + + if (tombstone) { + m_tombstonecnt.fetch_add(1); + if (m_tombstone_filter) m_tombstone_filter->insert(rec); + } + + if constexpr (WeightedRecordInterface) { + m_weight.fetch_add(rec.weight); + double old = m_max_weight.load(); + while (old < rec.weight) { + m_max_weight.compare_exchange_strong(old, rec.weight); + old = m_max_weight.load(); + } + } else { + m_weight.fetch_add(1); + } + + return 1; + } + + bool truncate() { + m_tombstonecnt.store(0); + m_reccnt.store(0); + m_weight.store(0); + m_max_weight.store(0); + if (m_tombstone_filter) m_tombstone_filter->clear(); + + return true; + } + + size_t get_record_count() { + return m_reccnt; + } + + size_t get_capacity() { + return m_cap; + } + + bool is_full() { + return m_reccnt == m_cap; + } + + size_t get_tombstone_count() { + return m_tombstonecnt.load(); + } + + bool delete_record(const R& rec) { + auto offset = 0; + while (offset < m_reccnt.load()) { + if (m_data[offset].rec == rec) { + m_data[offset].set_delete(); + return true; + } + offset++; + } + + return false; + } + + bool check_tombstone(const R& rec) { + if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false; + + auto offset = 0; + while (offset < m_reccnt.load()) { + if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) { + return true; + } + offset++;; + } + return false; + } + + size_t get_memory_usage() { + return m_cap * sizeof(R); + } + + size_t get_aux_memory_usage() { + return m_tombstone_filter->get_memory_usage(); + } + + size_t get_tombstone_capacity() { + return m_tombstone_cap; + } + + double get_total_weight() { + return m_weight.load(); + } + + Wrapped *get_data() { + return m_data; + } + + double get_max_weight() { + return m_max_weight; + } + + bool start_merge() { + if (m_merge_lock.try_lock()) { + /* there cannot already been an active merge */ + if (m_merging.load()) { + m_merge_lock.unlock(); + return false; + } + + m_merging.store(true); + memcpy(m_merge_data, m_data, sizeof(Wrapped) * m_reccnt.load()); + return true; + } + + /* lock could not be obtained */ + return false; + } + + bool finish_merge() { + m_merge_lock.unlock(); + return true; + } + + /* + * Concurrency-related operations + */ + bool take_reference() { + m_refcnt.fetch_add(1); + return true; + } + + bool release_reference() { + m_refcnt.fetch_add(-1); + + if (m_refcnt.load() == 0 && m_deferred_truncate.load()) { + assert(this->truncate()); + } + + return true; + } + + bool active_merge() { + return m_merging.load(); + } + +private: + int32_t try_advance_tail() { + size_t new_tail = m_reccnt.fetch_add(1); + + if (new_tail < m_cap) return new_tail; + else return -1; + } + + size_t m_cap; + size_t m_tombstone_cap; + + Wrapped* m_data; + Wrapped* m_merge_data; + + psudb::BloomFilter* m_tombstone_filter; + + alignas(64) std::atomic m_tombstonecnt; + alignas(64) std::atomic m_reccnt; + alignas(64) std::atomic m_weight; + alignas(64) std::atomic m_max_weight; + alignas(64) std::atomic m_merging; + alignas(64) std::atomic m_deferred_truncate; + alignas(64) std::atomic m_refcnt; + + alignas(64) std::mutex m_merge_lock; + alignas(64) std::mutex m_trunc_lock; + alignas(64) std::condition_variable m_trunc_signal; + +}; + +} -- cgit v1.2.3