From 3afacb7702e6d8fa67749a2a41dc776d315e02a9 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 23 Oct 2023 17:43:22 -0400 Subject: Began moving to an explicit epoch-based system I started moving over to an explicit Epoch based system, which has necessitated a ton of changes throughout the code base. This will ultimately allow for a much cleaner set of abstractions for managing concurrency. --- include/framework/DynamicExtension.h | 279 +++++++++++++++-------- include/framework/ShardRequirements.h | 2 +- include/framework/interface/Query.h | 3 +- include/framework/scheduling/Epoch.h | 128 +++++++++++ include/framework/scheduling/FIFOScheduler.h | 96 ++++++++ include/framework/scheduling/SerialScheduler.h | 96 -------- include/framework/scheduling/Task.h | 10 +- include/framework/structure/BufferView.h | 124 ++++++++++ include/framework/structure/ExtensionStructure.h | 26 +++ include/framework/structure/MutableBuffer.h | 4 + 10 files changed, 573 insertions(+), 195 deletions(-) create mode 100644 include/framework/scheduling/Epoch.h create mode 100644 include/framework/scheduling/FIFOScheduler.h delete mode 100644 include/framework/scheduling/SerialScheduler.h create mode 100644 include/framework/structure/BufferView.h (limited to 'include/framework') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 26221d8..6936247 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -13,6 +13,7 @@ #include #include #include +#include #include "framework/structure/MutableBuffer.h" #include "framework/structure/InternalLevel.h" @@ -24,7 +25,8 @@ #include "framework/structure/ExtensionStructure.h" #include "framework/util/Configuration.h" -#include "framework/scheduling/SerialScheduler.h" +#include "framework/scheduling/FIFOScheduler.h" +#include "framework/scheduling/Epoch.h" #include "psu-util/timer.h" #include "psu-ds/Alias.h" @@ -32,20 +34,30 @@ namespace de { template + DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=FIFOScheduler> class DynamicExtension { typedef S Shard; typedef MutableBuffer Buffer; typedef ExtensionStructure Structure; + typedef Epoch Epoch; + typedef BufferView BufView; + public: DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop, size_t memory_budget=0, size_t thread_cnt=16) : m_scale_factor(scale_factor) , m_max_delete_prop(max_delete_prop) , m_sched(memory_budget, thread_cnt) + , m_buffer_capacity(buffer_cap) + , m_buffer_delete_capacity(max_delete_prop*buffer_cap) { + auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); + auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop); + auto epoch = new Epoch(vers, buf); + m_buffers.push_back(new Buffer(buffer_cap, max_delete_prop*buffer_cap)); m_versions.push_back(new Structure(buffer_cap, scale_factor, max_delete_prop)); + m_epochs.push_back({0, epoch}); } ~DynamicExtension() { @@ -63,10 +75,10 @@ public: } int erase(const R &rec) { - Buffer *buffer = get_buffer(); - if constexpr (D == DeletePolicy::TAGGING) { - if (get_active_version()->tagged_delete(rec)) { + BufView buffers = get_active_epoch()->get_buffer_view(); + + if (get_active_epoch()->get_structure()->tagged_delete(rec)) { return 1; } @@ -75,7 +87,7 @@ public: * probably has the lowest probability of having the record, * so we'll check it last. */ - return buffer->delete_record(rec); + return buffers->delete_record(rec); } /* @@ -85,43 +97,43 @@ public: } std::future> query(void *parms) { - return schedule_query(get_active_version(), get_buffer(), parms); + return schedule_query(get_active_epoch()->get_structure(), get_active_epoch()->get_buffers()[0], parms); } size_t get_record_count() { - size_t cnt = get_buffer()->get_record_count(); - return cnt + get_active_version()->get_record_count(); + size_t cnt = get_active_epoch()->get_buffer_view().get_record_count(); + return cnt + get_active_epoch()->get_structure()->get_record_count(); } size_t get_tombstone_cnt() { - size_t cnt = get_buffer()->get_tombstone_count(); - return cnt + get_active_version()->get_tombstone_cnt(); + size_t cnt = get_active_epoch()->get_buffer_view().get_tombstone_count(); + return cnt + get_active_epoch()->get_structure()->get_tombstone_cnt(); } size_t get_height() { - return get_active_version()->get_height(); + return get_active_epoch()->get_structure()->get_height(); } size_t get_memory_usage() { - auto vers = get_active_version(); - auto buffer = get_buffer(); + auto vers = get_active_epoch()->get_structure()->get_memory_usage(); + auto buffer = get_active_epoch()->get_buffer_view().get_memory_usage(); - return vers.get_memory_usage() + buffer->get_memory_usage(); + return vers + buffer; } size_t get_aux_memory_usage() { - auto vers = get_active_version(); - auto buffer = get_buffer(); + auto vers = get_active_epoch()->get_structure()->get_aux_memory_usage(); + auto buffer = get_active_epoch()->get_buffer_view().get_aux_memory_usage(); - return vers.get_aux_memory_usage() + buffer->get_aux_memory_usage(); + return vers + buffer; } size_t get_buffer_capacity() { - return get_height()->get_capacity(); + return m_buffer_capacity; } Shard *create_static_structure() { - auto vers = get_active_version(); + auto vers = get_active_epoch()->get_structure(); std::vector shards; if (vers->get_levels().size() > 0) { @@ -132,7 +144,9 @@ public: } } - shards.emplace_back(new S(get_buffer())); + // FIXME: should use a buffer view--or perhaps some sort of a + // raw record iterator model. + shards.emplace_back(new S(get_active_epoch()->get_buffers()[0])); Shard *shards_array[shards.size()]; @@ -158,33 +172,121 @@ public: * tombstone proportion invariant. */ bool validate_tombstone_proportion() { - return get_active_version()->validate_tombstone_proportion(); + return get_active_epoch()->get_structure()->validate_tombstone_proportion(); } private: SCHED m_sched; - std::vector m_buffers; - std::vector m_versions; + std::mutex m_struct_lock; + std::set m_buffers; + std::set m_versions; std::atomic m_current_epoch; + std::unordered_map m_epochs; size_t m_scale_factor; double m_max_delete_prop; + size_t m_buffer_capacity; + size_t m_buffer_delete_capacity; - Buffer *get_buffer() { - return m_buffers[0]; + Epoch *get_active_epoch() { + return m_epochs[m_current_epoch.load()]; + } + + void advance_epoch() { + size_t new_epoch_num = m_current_epoch.load() + 1; + Epoch *new_epoch = m_epochs[new_epoch_num]; + Epoch *old_epoch = m_epochs[m_current_epoch.load()]; + + // Update the new Epoch to contain the buffers + // from the old one that it doesn't currently have + size_t old_buffer_cnt = new_epoch->clear_buffers(); + for (size_t i=old_buffer_cnt; iget_buffers().size(); i++) { + new_epoch->add_buffer(old_epoch->get_buffers[i]); + } + m_current_epoch.fetch_add(1); } - Structure *get_active_version() { - return m_versions[0]; + /* + * Creates a new epoch by copying the currently active one. The new epoch's + * structure will be a shallow copy of the old one's. + */ + Epoch *create_new_epoch() { + auto new_epoch = get_active_epoch()->clone(); + std::unique_lock m_struct_lock; + m_versions.insert(new_epoch->get_structure()); + m_epochs.insert({m_current_epoch.load() + 1, new_epoch}); + m_struct_lock.release(); + + return new_epoch; + } + + /* + * Add a new empty buffer to the specified epoch. This is intended to be used + * when a merge is triggered, to allow for inserts to be sustained in the new + * buffer while a new epoch is being created in the background. Returns a + * pointer to the newly created buffer. + */ + Buffer *add_empty_buffer(Epoch *epoch) { + auto new_buffer = Buffer(m_buffer_capacity, m_buffer_delete_capacity); + + std::unique_lock m_struct_lock; + m_buffers.insert(new_buffer); + m_struct_lock.release(); + + epoch->add_buffer(new_buffer); + return new_buffer; + } + + void retire_epoch(Epoch *epoch) { + /* + * Epochs with currently active jobs cannot + * be retired. By the time retire_epoch is called, + * it is assumed that a new epoch is active, meaning + * that the epoch to be retired should no longer + * accumulate new active jobs. Eventually, this + * number will hit zero and the function will + * proceed. + * + * FIXME: this can be replaced with a cv, which + * is probably a superior solution in this case + */ + while (epoch->get_active_job_num() > 0) + ; + + /* + * The epoch's destructor will handle releasing + * all the references it holds + */ + delete epoch; + + /* + * Following the epoch's destruction, any buffers + * or structures with no remaining references can + * be safely freed. + */ + std::unique_lock lock(m_struct_lock); + for (auto buf : m_buffers) { + if (buf->get_reference_count() == 0) { + m_buffers.erase(buf); + delete buf; + } + } + + for (auto vers : m_versions) { + if (vers->get_reference_count() == 0) { + m_versions.erase(vers); + delete vers; + } + } } static void merge(void *arguments) { - MergeArgs *args = (MergeArgs *) arguments; + MergeArgs *args = (MergeArgs *) arguments; - Structure *vers = (Structure *) args->version; - Buffer *buff = (Buffer *) args->buffer; + Structure *vers = args->epoch->get_structure(); + Buffer *buff = (Buffer *) args->epoch->get_buffers()[0]; for (ssize_t i=args->merges.size() - 1; i>=0; i--) { vers->merge_levels(args->merges[i].second, args->merges[i].first); @@ -193,98 +295,94 @@ private: vers->merge_buffer(buff); args->result.set_value(true); + args->epoch->end_job(); delete args; } + static std::vector finalize_query_result(std::vector>> &query_results, void *parms, + std::vector &shard_states, std::vector &buffer_states) { + auto result = Q::merge(query_results, parms); + + for (size_t i=0; i *args = (QueryArgs *) arguments; + QueryArgs *args = (QueryArgs *) arguments; - auto buffer = (Buffer *) args->buffer; - auto vers = (Structure *) args->version; + auto buffers = args->epoch->get_buffer_view(); + auto vers = args->epoch->get_structure(); void *parms = args->query_parms; - // Get the buffer query state - auto buffer_state = Q::get_buffer_query_state(buffer, parms); + // Get the buffer query states + std::vector buffer_states = buffers->get_buffer_query_states(parms); // Get the shard query states std::vector> shards; - std::vector states; - - for (auto &level : vers->get_levels()) { - level->get_query_states(shards, states, parms); - } + std::vector shard_states = vers->get_query_states(shards, parms); - Q::process_query_states(parms, states, buffer_state); + Q::process_query_states(parms, shard_states, buffer_states); - std::vector>> query_results(shards.size() + 1); + std::vector>> query_results(shards.size() + buffer_states.size()); // Execute the query for the buffer - auto buffer_results = Q::buffer_query(buffer, buffer_state, parms); - query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers)); - if constexpr (Q::EARLY_ABORT) { - if (query_results[0].size() > 0) { - auto result = Q::merge(query_results, parms); - for (size_t i=0; i>> buffer_results(buffer_states.size()); + for (size_t i=0; iget_buffers[i], buffer_states[i], parms); + query_results[i] = std::move(filter_deletes(buffer_results, {-1, -1}, buffers, vers)); - Q::delete_buffer_query_state(buffer_state); - return result; + if constexpr (Q::EARLY_ABORT) { + if (query_results[i] > 0) { + return finalize_query_result(query_results, parms, buffer_states, shard_states); + } } } - + // Execute the query for each shard for (size_t i=0; i 0) { - auto result = Q::merge(query_results, parms); - for (size_t i=0; i 0) { + return finalize_query_result(query_results, parms, buffer_states, shard_states); } } } - // Merge the results together - auto result = Q::merge(query_results, parms); - - for (size_t i=0; irelease_reference(); - + // Merge the results together and finalize the job + auto result = finalize_query_result(query_results, parms, buffer_states, shard_states); args->result_set.set_value(std::move(result)); + + args->epoch->end_job(); delete args; } - std::future schedule_merge(Structure *version, Buffer *buffer) { - MergeArgs *args = new MergeArgs(); - args->merges = version->get_merge_tasks(buffer->get_record_count()); - args->buffer = buffer; - args->version = version; + std::future schedule_merge() { + auto epoch = get_active_epoch(); + epoch->start_job(); + MergeArgs *args = new MergeArgs(); + args->epoch = epoch; + args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]); m_sched.schedule_job(merge, 0, args); return args->result.get_future(); } - std::future> schedule_query(Structure *version, Buffer *buffer, void *query_parms) { - buffer->take_reference(); // FIXME: this is wrong. The buffer and version need to be - // taken atomically, together. + std::future> schedule_query(void *query_parms) { + auto epoch = get_active_epoch(); + epoch->start_job(); - QueryArgs *args = new QueryArgs(); - args->buffer = buffer; - args->version = version; + QueryArgs *args = new QueryArgs(); + args->epoch = epoch; args->query_parms = query_parms; - m_sched.schedule_job(async_query, 0, args); return args->result_set.get_future(); @@ -292,20 +390,19 @@ private: int internal_append(const R &rec, bool ts) { Buffer *buffer; - while (!(buffer = get_buffer())) + while (!(buffer = get_active_epoch()->get_active_buffer())) ; if (buffer->is_full()) { - auto vers = get_active_version(); + auto vers = get_active_epoch()->get_structure(); auto res = schedule_merge(vers, buffer); res.get(); } - return buffer->append(rec, ts); } - static std::vector> filter_deletes(std::vector> &records, ShardID shid, Buffer *buffer, Structure *vers) { + static std::vector> filter_deletes(std::vector> &records, ShardID shid, BufView *buffers, Structure *vers) { if constexpr (!Q::SKIP_DELETE_FILTER) { return records; } @@ -334,7 +431,7 @@ private: continue; } - if (buffer->check_tombstone(rec.rec)) { + if (buffers->check_tombstone(rec.rec)) { continue; } diff --git a/include/framework/ShardRequirements.h b/include/framework/ShardRequirements.h index 95f7b67..d2d4ff2 100644 --- a/include/framework/ShardRequirements.h +++ b/include/framework/ShardRequirements.h @@ -3,7 +3,7 @@ */ #pragma once -#include "framework/structure/MutableBuffer.h" +#include "framework/structure/BufferView.h" #include "framework/interface/Record.h" #include "framework/interface/Shard.h" #include "framework/interface/Query.h" diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h index 46a1ce1..9b1d2d6 100644 --- a/include/framework/interface/Query.h +++ b/include/framework/interface/Query.h @@ -14,7 +14,6 @@ template concept QueryInterface = requires(Q q, void *p, std::vector &s) { - /* {q.get_query_state(p, p)} -> std::convertible_to; {q.get_buffer_query_state(p, p)}; @@ -27,7 +26,7 @@ concept QueryInterface = requires(Q q, void *p, std::vector &s) { {Q::SKIP_DELETE_FILTER} -> std::convertible_to; //{Q::get_query_state(p, p)} -> std::convertible_to; //{Q::get_buffer_query_state(p, p)} -> std::convertible_to; - {Q::process_query_states(p, s, p)}; + {Q::process_query_states(p, s, s)}; {Q::delete_query_state(std::declval())} -> std::same_as; {Q::delete_buffer_query_state(p)}; diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h new file mode 100644 index 0000000..a1f865c --- /dev/null +++ b/include/framework/scheduling/Epoch.h @@ -0,0 +1,128 @@ +/* + * include/framework/scheduling/Epoch.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * Dong Xie + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include "framework/structure/MutableBuffer.h" +#include "framework/structure/ExtensionStructure.h" +#include "framework/structure/BufferView.h" + +namespace de { + + +template +class Epoch { +private: + typedef MutableBuffer Buffer; + typedef ExtensionStructure Structure; + typedef BufferView BufView; +public: + Epoch() + : m_buffers() + , m_structure(nullptr) + , m_active_jobs(0) + {} + + Epoch(Structure *structure, Buffer *buff) + : m_buffers() + , m_structure(structure) + , m_active_jobs(0) + { + m_buffers.push_back(buff); + } + + ~Epoch() { + assert(m_active_jobs.load() == 0); + + for (auto buf : m_buffers) { + buf.release_reference(); + } + + if (m_structure) { + m_structure->release_reference(); + } + } + + void add_buffer(Buffer *buf) { + assert(buf); + + buf->take_reference(); + m_buffers.push_back(buf); + } + + void start_job() { + m_active_jobs.fetch_add(1); + } + + void end_job() { + m_active_jobs.fetch_add(-1); + } + + size_t get_active_job_num() { + return m_active_jobs.load(); + } + + Structure *get_structure() { + return m_structure; + } + + std::vector &get_buffers() { + return m_buffers; + } + + BufView get_buffer_view() { + return BufView(m_buffers); + } + + Buffer *get_active_buffer() { + if (m_buffers.size() == 0) return nullptr; + + return m_buffers[m_buffers.size() - 1]; + } + + /* + * Return the number of buffers in this epoch at + * time of call, and then clear the buffer vector, + * releasing all references in the process. + */ + size_t clear_buffers() { + size_t buf_cnt = m_buffers.size(); + for (auto buf : m_buffers) { + if (buf) buf->release_reference(); + } + + m_buffers.clear(); + return buf_cnt; + } + + /* + * Returns a new Epoch object that is a copy of this one. The new object will also contain + * a copy of the m_structure, rather than a reference to the same one. + */ + Epoch *clone() { + auto epoch = new Epoch(); + epoch->m_buffers = m_buffers; + if (m_structure) { + epoch->m_structure = m_structure->copy(); + } + } + +private: + Structure *m_structure; + std::vector m_buffers; + + /* + * The number of currently active jobs + * (queries/merges) operating on this + * epoch. An epoch can only be retired + * when this number is 0. + */ + std::atomic m_active_jobs; +}; +} diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h new file mode 100644 index 0000000..878bb81 --- /dev/null +++ b/include/framework/scheduling/FIFOScheduler.h @@ -0,0 +1,96 @@ +/* + * include/framework/Scheduler.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * Dong Xie + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "util/types.h" +#include "framework/interface/Shard.h" +#include "framework/interface/Query.h" +#include "framework/interface/Record.h" +#include "framework/structure/MutableBuffer.h" +#include "framework/util/Configuration.h" +#include "framework/structure/ExtensionStructure.h" +#include "framework/scheduling/Task.h" + +#include "psu-ds/LockedPriorityQueue.h" + +namespace de { + +class FIFOScheduler { +public: + FIFOScheduler(size_t memory_budget, size_t thread_cnt) + : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) + , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX) + , m_used_memory(0) + , m_used_thrds(0) + , m_shutdown(false) + { + m_sched_thrd = std::thread(&FIFOScheduler::run, this); + } + + ~FIFOScheduler() { + shutdown(); + + m_cv.notify_all(); + m_sched_thrd.join(); + } + + void schedule_job(std::function job, size_t size, void *args) { + size_t ts = m_counter.fetch_add(1); + m_task_queue.push(Task(size, ts, job, args)); + m_cv.notify_all(); + } + + void shutdown() { + m_shutdown = true; + } + +private: + psudb::LockedPriorityQueue m_task_queue; + + size_t m_memory_budget; + size_t m_thrd_cnt; + + bool m_shutdown; + + std::atomic m_counter; + std::mutex m_cv_lock; + std::condition_variable m_cv; + + std::thread m_sched_thrd; + + std::atomic m_used_thrds; + std::atomic m_used_memory; + + void schedule_next() { + auto t = m_task_queue.pop(); + t(); + } + + void run() { + do { + std::unique_lock cv_lock(m_cv_lock); + m_cv.wait(cv_lock); + + while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) { + schedule_next(); + } + cv_lock.unlock(); + } while(!m_shutdown); + } +}; + +} diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h deleted file mode 100644 index da2bb8e..0000000 --- a/include/framework/scheduling/SerialScheduler.h +++ /dev/null @@ -1,96 +0,0 @@ -/* - * include/framework/Scheduler.h - * - * Copyright (C) 2023 Douglas B. Rumbaugh - * Dong Xie - * - * All rights reserved. Published under the Modified BSD License. - * - */ -#pragma once - -#include -#include -#include -#include -#include -#include - -#include "util/types.h" -#include "framework/interface/Shard.h" -#include "framework/interface/Query.h" -#include "framework/interface/Record.h" -#include "framework/structure/MutableBuffer.h" -#include "framework/util/Configuration.h" -#include "framework/structure/ExtensionStructure.h" -#include "framework/scheduling/Task.h" - -#include "psu-ds/LockedPriorityQueue.h" - -namespace de { - -class SerialScheduler { -public: - SerialScheduler(size_t memory_budget, size_t thread_cnt) - : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX) - , m_used_memory(0) - , m_used_thrds(0) - , m_shutdown(false) - { - m_sched_thrd = std::thread(&SerialScheduler::run, this); - } - - ~SerialScheduler() { - shutdown(); - - m_cv.notify_all(); - m_sched_thrd.join(); - } - - void schedule_job(std::function job, size_t size, void *args) { - size_t ts = m_counter.fetch_add(1); - m_task_queue.push(Task(size, ts, job, args)); - m_cv.notify_all(); - } - - void shutdown() { - m_shutdown = true; - } - -private: - psudb::LockedPriorityQueue m_task_queue; - - size_t m_memory_budget; - size_t m_thrd_cnt; - - bool m_shutdown; - - std::atomic m_counter; - std::mutex m_cv_lock; - std::condition_variable m_cv; - - std::thread m_sched_thrd; - - std::atomic m_used_thrds; - std::atomic m_used_memory; - - void schedule_next() { - auto t = m_task_queue.pop(); - t(); - } - - void run() { - do { - std::unique_lock cv_lock(m_cv_lock); - m_cv.wait(cv_lock); - - while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) { - schedule_next(); - } - cv_lock.unlock(); - } while(!m_shutdown); - } -}; - -} diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 518159d..94c4d0a 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -8,20 +8,20 @@ #include #include "framework/util/Configuration.h" +#include "framework/scheduling/Epoch.h" namespace de { +template struct MergeArgs { - void *version; - void *buffer; + Epoch *epoch; std::vector merges; std::promise result; }; -template +template struct QueryArgs { - void *version; - void *buffer; + Epoch *epoch; std::promise> result_set; void *query_parms; }; diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h new file mode 100644 index 0000000..1efc1ac --- /dev/null +++ b/include/framework/structure/BufferView.h @@ -0,0 +1,124 @@ +/* + * include/framework/structure/BufferView.h + * + * Copyright (C) 2023 Douglas Rumbaugh + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "psu-util/alignment.h" +#include "util/bf_config.h" +#include "psu-ds/BloomFilter.h" +#include "psu-ds/Alias.h" +#include "psu-util/timer.h" +#include "framework/interface/Record.h" +#include "framework/structure/MutableBuffer.h" +#include "framework/interface/Query.h" + +namespace de { + +template +class BufferView { + typedef MutableBuffer Buffer; +public: + BufferView() = default; + + BufferView(std::vector buffers) + : m_buffers(buffers) + , m_cutoff(buffers[buffers->size()-1]->get_record_count()) + {} + + ~BufferView() = default; + + bool delete_record(const R& rec) { + auto res = false; + for (auto buf : m_buffers) { + res = buf->delete_record(rec); + if (res) return true; + } + return false; + } + + bool check_tombstone(const R& rec) { + auto res = false; + for (auto buf : m_buffers) { + res = buf->check_tombstone(rec); + if (res) return true; + } + return false; + } + + size_t get_record_count() { + size_t reccnt = 0; + for (auto buf : m_buffers) { + reccnt += buf->get_record_count(); + } + return reccnt; + } + + size_t get_capacity() { + return m_buffers[0]->get_capacity(); + } + + bool is_full() { + return m_buffers[m_buffers.size() - 1]->is_full(); + } + + size_t get_tombstone_count() { + size_t tscnt = 0; + for (auto buf : m_buffers) { + tscnt += buf->get_tombstone_count(); + } + return tscnt; + } + + size_t get_memory_usage() { + size_t mem = 0; + for (auto buf : m_buffers) { + mem += buf->get_memory_usage(); + } + return mem; + } + + size_t get_aux_memory_usage() { + size_t mem = 0; + for (auto buf : m_buffers) { + mem += buf->get_aux_memory_usage(); + } + return mem; + } + + size_t get_tombstone_capacity() { + return m_buffers[0]->get_tombstone_capacity(); + } + + std::vector get_buffer_states(void *parms) { + std::vector states; + + for (auto buf : m_buffers) { + states.push_back(Q::get_buffer_query_state(buf, parms)); + } + + return states; + } + + std::vector &get_buffers() { + return m_buffers; + } + +private: + std::vector m_buffers; + size_t m_cutoff; +}; + +} diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 8344518..de965ae 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -302,12 +302,38 @@ public: m_levels[incoming_level] = std::shared_ptr>(new InternalLevel(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor)); } + bool take_reference() { + m_refcnt.fetch_add(1); + return true; + } + + bool release_reference() { + assert(m_refcnt.load() > 0); + m_refcnt.fetch_add(-1); + return true; + } + + size_t get_reference_count() { + return m_refcnt.load(); + } + + std::vector get_query_states(std::vector> &shards, void *parms) { + std::vector states; + + for (auto &level : m_levels) { + level->get_query_states(shards, states, parms); + } + + return states; + } private: size_t m_scale_factor; double m_max_delete_prop; size_t m_buffer_size; + std::atomic m_refcnt; + std::vector>> m_levels; /* diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 4e0b5c2..974dc28 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -217,6 +217,10 @@ public: return true; } + size_t get_reference_count() { + return m_refcnt.load(); + } + bool active_merge() { return m_merging.load(); } -- cgit v1.2.3