author      Douglas Rumbaugh <dbr4@psu.edu>    2023-10-23 17:43:22 -0400
committer   Douglas Rumbaugh <dbr4@psu.edu>    2023-10-23 17:43:22 -0400
commit      3afacb7702e6d8fa67749a2a41dc776d315e02a9 (patch)
tree        8ea0e864d6098dd939e738a09033da7ed7877f4b /include
parent      b72103cb11347f0dd108bd2321f29b0d6ab05106 (diff)
download    dynamic-extension-3afacb7702e6d8fa67749a2a41dc776d315e02a9.tar.gz
Began moving to an explicit epoch-based system
I started moving over to an explicit Epoch-based system, which has required a large number of changes throughout the code base. This will ultimately allow for a much cleaner set of abstractions for managing concurrency.
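To make the intent concrete, here is a minimal, self-contained sketch of the job-counting idea the new Epoch class is built around (illustrative toy code only, not the framework's API; the real class is added in include/framework/scheduling/Epoch.h below): a job pins an epoch before using its buffers and structure, and the epoch can only be retired once its active job count drains back to zero.

    // Toy model of the epoch job-counting scheme (illustrative only).
    #include <atomic>
    #include <cstddef>

    struct ToyEpoch {
        std::atomic<size_t> active_jobs {0};

        void start_job() { active_jobs.fetch_add(1); }  // pin the epoch before using it
        void end_job()   { active_jobs.fetch_sub(1); }  // unpin once the query/merge finishes
    };

    // Retirement must wait until every pinned job has finished; only then is it
    // safe to release the epoch's buffers and structure. The commit itself notes
    // that the busy-wait could later be replaced with a condition variable.
    void retire(ToyEpoch *epoch) {
        while (epoch->active_jobs.load() > 0)
            ;
        delete epoch;
    }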
Diffstat (limited to 'include')
-rw-r--r--  include/framework/DynamicExtension.h                           279
-rw-r--r--  include/framework/ShardRequirements.h                            2
-rw-r--r--  include/framework/interface/Query.h                              3
-rw-r--r--  include/framework/scheduling/Epoch.h                           128
-rw-r--r--  include/framework/scheduling/FIFOScheduler.h (renamed from include/framework/scheduling/SerialScheduler.h)    8
-rw-r--r--  include/framework/scheduling/Task.h                             10
-rw-r--r--  include/framework/structure/BufferView.h                       124
-rw-r--r--  include/framework/structure/ExtensionStructure.h                26
-rw-r--r--  include/framework/structure/MutableBuffer.h                      4
9 files changed, 481 insertions, 103 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 26221d8..6936247 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -13,6 +13,7 @@
#include <numeric>
#include <cstdio>
#include <vector>
+#include <set>
+#include <unordered_map>
+#include <mutex>
#include "framework/structure/MutableBuffer.h"
#include "framework/structure/InternalLevel.h"
@@ -24,7 +25,8 @@
#include "framework/structure/ExtensionStructure.h"
#include "framework/util/Configuration.h"
-#include "framework/scheduling/SerialScheduler.h"
+#include "framework/scheduling/FIFOScheduler.h"
+#include "framework/scheduling/Epoch.h"
#include "psu-util/timer.h"
#include "psu-ds/Alias.h"
@@ -32,20 +34,30 @@
namespace de {
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING,
- DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler>
+ DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=FIFOScheduler>
class DynamicExtension {
typedef S Shard;
typedef MutableBuffer<R> Buffer;
typedef ExtensionStructure<R, S, Q, L> Structure;
+ typedef Epoch<R, S, Q, L> Epoch;
+ typedef BufferView<R, Q> BufView;
+
public:
DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop, size_t memory_budget=0,
size_t thread_cnt=16)
: m_scale_factor(scale_factor)
, m_max_delete_prop(max_delete_prop)
, m_sched(memory_budget, thread_cnt)
+ , m_buffer_capacity(buffer_cap)
+ , m_buffer_delete_capacity(max_delete_prop*buffer_cap)
{
+ auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
+ auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop);
+ auto epoch = new Epoch(vers, buf);
+
- m_buffers.push_back(new Buffer(buffer_cap, max_delete_prop*buffer_cap));
- m_versions.push_back(new Structure(buffer_cap, scale_factor, max_delete_prop));
+ m_buffers.insert(buf);
+ m_versions.insert(vers);
+ m_epochs.insert({0, epoch});
}
~DynamicExtension() {
@@ -63,10 +75,10 @@ public:
}
int erase(const R &rec) {
- Buffer *buffer = get_buffer();
-
if constexpr (D == DeletePolicy::TAGGING) {
- if (get_active_version()->tagged_delete(rec)) {
+ BufView buffers = get_active_epoch()->get_buffer_view();
+
+ if (get_active_epoch()->get_structure()->tagged_delete(rec)) {
return 1;
}
@@ -75,7 +87,7 @@ public:
* probably has the lowest probability of having the record,
* so we'll check it last.
*/
- return buffer->delete_record(rec);
+ return buffers.delete_record(rec);
}
/*
@@ -85,43 +97,43 @@ public:
}
std::future<std::vector<R>> query(void *parms) {
- return schedule_query(get_active_version(), get_buffer(), parms);
+ return schedule_query(parms);
}
size_t get_record_count() {
- size_t cnt = get_buffer()->get_record_count();
- return cnt + get_active_version()->get_record_count();
+ size_t cnt = get_active_epoch()->get_buffer_view().get_record_count();
+ return cnt + get_active_epoch()->get_structure()->get_record_count();
}
size_t get_tombstone_cnt() {
- size_t cnt = get_buffer()->get_tombstone_count();
- return cnt + get_active_version()->get_tombstone_cnt();
+ size_t cnt = get_active_epoch()->get_buffer_view().get_tombstone_count();
+ return cnt + get_active_epoch()->get_structure()->get_tombstone_cnt();
}
size_t get_height() {
- return get_active_version()->get_height();
+ return get_active_epoch()->get_structure()->get_height();
}
size_t get_memory_usage() {
- auto vers = get_active_version();
- auto buffer = get_buffer();
+ auto vers = get_active_epoch()->get_structure()->get_memory_usage();
+ auto buffer = get_active_epoch()->get_buffer_view().get_memory_usage();
- return vers.get_memory_usage() + buffer->get_memory_usage();
+ return vers + buffer;
}
size_t get_aux_memory_usage() {
- auto vers = get_active_version();
- auto buffer = get_buffer();
+ auto vers = get_active_epoch()->get_structure()->get_aux_memory_usage();
+ auto buffer = get_active_epoch()->get_buffer_view().get_aux_memory_usage();
- return vers.get_aux_memory_usage() + buffer->get_aux_memory_usage();
+ return vers + buffer;
}
size_t get_buffer_capacity() {
- return get_height()->get_capacity();
+ return m_buffer_capacity;
}
Shard *create_static_structure() {
- auto vers = get_active_version();
+ auto vers = get_active_epoch()->get_structure();
std::vector<Shard *> shards;
if (vers->get_levels().size() > 0) {
@@ -132,7 +144,9 @@ public:
}
}
- shards.emplace_back(new S(get_buffer()));
+ // FIXME: should use a buffer view--or perhaps some sort of a
+ // raw record iterator model.
+ shards.emplace_back(new S(get_active_epoch()->get_buffers()[0]));
Shard *shards_array[shards.size()];
@@ -158,33 +172,121 @@ public:
* tombstone proportion invariant.
*/
bool validate_tombstone_proportion() {
- return get_active_version()->validate_tombstone_proportion();
+ return get_active_epoch()->get_structure()->validate_tombstone_proportion();
}
private:
SCHED m_sched;
- std::vector<Buffer *> m_buffers;
- std::vector<Structure *> m_versions;
+ std::mutex m_struct_lock;
+ std::set<Buffer*> m_buffers;
+ std::set<Structure *> m_versions;
std::atomic<size_t> m_current_epoch;
+ std::unordered_map<size_t, Epoch *> m_epochs;
size_t m_scale_factor;
double m_max_delete_prop;
+ size_t m_buffer_capacity;
+ size_t m_buffer_delete_capacity;
- Buffer *get_buffer() {
- return m_buffers[0];
+ Epoch *get_active_epoch() {
+ return m_epochs[m_current_epoch.load()];
+ }
+
+ void advance_epoch() {
+ size_t new_epoch_num = m_current_epoch.load() + 1;
+ Epoch *new_epoch = m_epochs[new_epoch_num];
+ Epoch *old_epoch = m_epochs[m_current_epoch.load()];
+
+ // Update the new Epoch to contain the buffers
+ // from the old one that it doesn't currently have
+ size_t old_buffer_cnt = new_epoch->clear_buffers();
+ for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) {
+ new_epoch->add_buffer(old_epoch->get_buffers()[i]);
+ }
+ m_current_epoch.fetch_add(1);
}
- Structure *get_active_version() {
- return m_versions[0];
+ /*
+ * Creates a new epoch by copying the currently active one. The new epoch's
+ * structure will be a shallow copy of the old one's.
+ */
+ Epoch *create_new_epoch() {
+ auto new_epoch = get_active_epoch()->clone();
+ std::unique_lock<std::mutex> lock(m_struct_lock);
+ m_versions.insert(new_epoch->get_structure());
+ m_epochs.insert({m_current_epoch.load() + 1, new_epoch});
+ lock.unlock();
+
+ return new_epoch;
+ }
+
+ /*
+ * Add a new empty buffer to the specified epoch. This is intended to be used
+ * when a merge is triggered, to allow for inserts to be sustained in the new
+ * buffer while a new epoch is being created in the background. Returns a
+ * pointer to the newly created buffer.
+ */
+ Buffer *add_empty_buffer(Epoch *epoch) {
+ auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
+
+ std::unique_lock<std::mutex> lock(m_struct_lock);
+ m_buffers.insert(new_buffer);
+ lock.unlock();
+
+ epoch->add_buffer(new_buffer);
+ return new_buffer;
+ }
+
+ void retire_epoch(Epoch *epoch) {
+ /*
+ * Epochs with currently active jobs cannot
+ * be retired. By the time retire_epoch is called,
+ * it is assumed that a new epoch is active, meaning
+ * that the epoch to be retired should no longer
+ * accumulate new active jobs. Eventually, this
+ * number will hit zero and the function will
+ * proceed.
+ *
+ * FIXME: this can be replaced with a cv, which
+ * is probably a superior solution in this case
+ */
+ while (epoch->get_active_job_num() > 0)
+ ;
+
+ /*
+ * The epoch's destructor will handle releasing
+ * all the references it holds
+ */
+ delete epoch;
+
+ /*
+ * Following the epoch's destruction, any buffers
+ * or structures with no remaining references can
+ * be safely freed.
+ */
+ std::unique_lock<std::mutex> lock(m_struct_lock);
+ for (auto it = m_buffers.begin(); it != m_buffers.end();) {
+ if ((*it)->get_reference_count() == 0) {
+ delete *it;
+ it = m_buffers.erase(it);
+ } else {
+ it++;
+ }
+ }
+
+ for (auto it = m_versions.begin(); it != m_versions.end();) {
+ if ((*it)->get_reference_count() == 0) {
+ delete *it;
+ it = m_versions.erase(it);
+ } else {
+ it++;
+ }
+ }
}
static void merge(void *arguments) {
- MergeArgs *args = (MergeArgs *) arguments;
+ MergeArgs<R, S, Q, L> *args = (MergeArgs<R, S, Q, L> *) arguments;
- Structure *vers = (Structure *) args->version;
- Buffer *buff = (Buffer *) args->buffer;
+ Structure *vers = args->epoch->get_structure();
+ Buffer *buff = (Buffer *) args->epoch->get_buffers()[0];
for (ssize_t i=args->merges.size() - 1; i>=0; i--) {
vers->merge_levels(args->merges[i].second, args->merges[i].first);
@@ -193,98 +295,94 @@ private:
vers->merge_buffer(buff);
args->result.set_value(true);
+ args->epoch->end_job();
delete args;
}
+ static std::vector<R> finalize_query_result(std::vector<std::vector<Wrapped<R>>> &query_results, void *parms,
+ std::vector<void *> &buffer_states, std::vector<void *> &shard_states) {
+ auto result = Q::merge(query_results, parms);
+
+ for (size_t i=0; i<buffer_states.size(); i++) {
+ Q::delete_buffer_query_state(buffer_states[i]);
+ }
+
+ for (size_t i=0; i<shard_states.size(); i++) {
+ Q::delete_query_state(shard_states[i]);
+ }
+
+ return result;
+ }
+
static void async_query(void *arguments) {
- QueryArgs<R> *args = (QueryArgs<R> *) arguments;
+ QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments;
- auto buffer = (Buffer *) args->buffer;
- auto vers = (Structure *) args->version;
+ auto buffers = args->epoch->get_buffer_view();
+ auto vers = args->epoch->get_structure();
void *parms = args->query_parms;
- // Get the buffer query state
- auto buffer_state = Q::get_buffer_query_state(buffer, parms);
+ // Get the buffer query states
+ std::vector<void *> buffer_states = buffers.get_buffer_states(parms);
// Get the shard query states
std::vector<std::pair<ShardID, Shard*>> shards;
- std::vector<void*> states;
-
- for (auto &level : vers->get_levels()) {
- level->get_query_states(shards, states, parms);
- }
+ std::vector<void *> shard_states = vers->get_query_states(shards, parms);
- Q::process_query_states(parms, states, buffer_state);
+ Q::process_query_states(parms, shard_states, buffer_states);
- std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
+ std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + buffer_states.size());
// Execute the query for the buffer
- auto buffer_results = Q::buffer_query(buffer, buffer_state, parms);
- query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers));
- if constexpr (Q::EARLY_ABORT) {
- if (query_results[0].size() > 0) {
- auto result = Q::merge(query_results, parms);
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(states[i]);
- }
+ std::vector<std::vector<Wrapped<R>>> buffer_results(buffer_states.size());
+ for (size_t i=0; i<buffer_states.size(); i++) {
+ auto buffer_results = Q::buffer_query(buffers.get_buffers()[i], buffer_states[i], parms);
+ query_results[i] = std::move(filter_deletes(buffer_results, {-1, -1}, &buffers, vers));
- Q::delete_buffer_query_state(buffer_state);
- return result;
+ if constexpr (Q::EARLY_ABORT) {
+ if (query_results[i].size() > 0) {
+ return finalize_query_result(query_results, parms, buffer_states, shard_states);
+ }
}
}
-
+
// Execute the query for each shard
for (size_t i=0; i<shards.size(); i++) {
- auto shard_results = Q::query(shards[i].second, states[i], parms);
+ auto shard_results = Q::query(shards[i].second, shard_states[i], parms);
- query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer, vers));
+ query_results[i+buffer_states.size()] = std::move(filter_deletes(shard_results, shards[i].first, &buffers, vers));
if constexpr (Q::EARLY_ABORT) {
- if (query_results[i].size() > 0) {
- auto result = Q::merge(query_results, parms);
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(states[i]);
- }
-
- Q::delete_buffer_query_state(buffer_state);
-
- return result;
+ if (query_results[i+buffer_states.size()].size() > 0) {
+ return finalize_query_result(query_results, parms, buffer_states, shard_states);
}
}
}
- // Merge the results together
- auto result = Q::merge(query_results, parms);
-
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(states[i]);
- }
-
- Q::delete_buffer_query_state(buffer_state);
- buffer->release_reference();
-
+ // Merge the results together and finalize the job
+ auto result = finalize_query_result(query_results, parms, buffer_states, shard_states);
args->result_set.set_value(std::move(result));
+
+ args->epoch->end_job();
delete args;
}
- std::future<bool> schedule_merge(Structure *version, Buffer *buffer) {
- MergeArgs *args = new MergeArgs();
- args->merges = version->get_merge_tasks(buffer->get_record_count());
- args->buffer = buffer;
- args->version = version;
+ std::future<bool> schedule_merge() {
+ auto epoch = get_active_epoch();
+ epoch->start_job();
+ MergeArgs<R, S, Q, L> *args = new MergeArgs<R, S, Q, L>();
+ args->epoch = epoch;
+ args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count());
m_sched.schedule_job(merge, 0, args);
return args->result.get_future();
}
- std::future<std::vector<R>> schedule_query(Structure *version, Buffer *buffer, void *query_parms) {
- buffer->take_reference(); // FIXME: this is wrong. The buffer and version need to be
- // taken atomically, together.
+ std::future<std::vector<R>> schedule_query(void *query_parms) {
+ auto epoch = get_active_epoch();
+ epoch->start_job();
- QueryArgs<R> *args = new QueryArgs<R>();
- args->buffer = buffer;
- args->version = version;
+ QueryArgs<R, S, Q, L> *args = new QueryArgs<R, S, Q, L>();
+ args->epoch = epoch;
args->query_parms = query_parms;
-
m_sched.schedule_job(async_query, 0, args);
return args->result_set.get_future();
@@ -292,20 +390,19 @@ private:
int internal_append(const R &rec, bool ts) {
Buffer *buffer;
- while (!(buffer = get_buffer()))
+ while (!(buffer = get_active_epoch()->get_active_buffer()))
;
if (buffer->is_full()) {
- auto vers = get_active_version();
+ auto vers = get_active_epoch()->get_structure();
auto res = schedule_merge(vers, buffer);
res.get();
}
-
return buffer->append(rec, ts);
}
- static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer, Structure *vers) {
+ static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, BufView *buffers, Structure *vers) {
if constexpr (!Q::SKIP_DELETE_FILTER) {
return records;
}
@@ -334,7 +431,7 @@ private:
continue;
}
- if (buffer->check_tombstone(rec.rec)) {
+ if (buffers->check_tombstone(rec.rec)) {
continue;
}
diff --git a/include/framework/ShardRequirements.h b/include/framework/ShardRequirements.h
index 95f7b67..d2d4ff2 100644
--- a/include/framework/ShardRequirements.h
+++ b/include/framework/ShardRequirements.h
@@ -3,7 +3,7 @@
*/
#pragma once
-#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/BufferView.h"
#include "framework/interface/Record.h"
#include "framework/interface/Shard.h"
#include "framework/interface/Query.h"
diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h
index 46a1ce1..9b1d2d6 100644
--- a/include/framework/interface/Query.h
+++ b/include/framework/interface/Query.h
@@ -14,7 +14,6 @@
template <typename Q>
concept QueryInterface = requires(Q q, void *p, std::vector<void*> &s) {
-
/*
{q.get_query_state(p, p)} -> std::convertible_to<void*>;
{q.get_buffer_query_state(p, p)};
@@ -27,7 +26,7 @@ concept QueryInterface = requires(Q q, void *p, std::vector<void*> &s) {
{Q::SKIP_DELETE_FILTER} -> std::convertible_to<bool>;
//{Q::get_query_state(p, p)} -> std::convertible_to<void*>;
//{Q::get_buffer_query_state(p, p)} -> std::convertible_to<void*>;
- {Q::process_query_states(p, s, p)};
+ {Q::process_query_states(p, s, s)};
{Q::delete_query_state(std::declval<void*>())} -> std::same_as<void>;
{Q::delete_buffer_query_state(p)};
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
new file mode 100644
index 0000000..a1f865c
--- /dev/null
+++ b/include/framework/scheduling/Epoch.h
@@ -0,0 +1,128 @@
+/*
+ * include/framework/scheduling/Epoch.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <vector>
+
+#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/ExtensionStructure.h"
+#include "framework/structure/BufferView.h"
+
+namespace de {
+
+
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
+class Epoch {
+private:
+ typedef MutableBuffer<R> Buffer;
+ typedef ExtensionStructure<R, S, Q, L> Structure;
+ typedef BufferView<R, Q> BufView;
+public:
+ Epoch()
+ : m_buffers()
+ , m_structure(nullptr)
+ , m_active_jobs(0)
+ {}
+
+ Epoch(Structure *structure, Buffer *buff)
+ : m_buffers()
+ , m_structure(structure)
+ , m_active_jobs(0)
+ {
+ m_buffers.push_back(buff);
+ }
+
+ ~Epoch() {
+ assert(m_active_jobs.load() == 0);
+
+ for (auto buf : m_buffers) {
+ buf->release_reference();
+ }
+
+ if (m_structure) {
+ m_structure->release_reference();
+ }
+ }
+
+ void add_buffer(Buffer *buf) {
+ assert(buf);
+
+ buf->take_reference();
+ m_buffers.push_back(buf);
+ }
+
+ void start_job() {
+ m_active_jobs.fetch_add(1);
+ }
+
+ void end_job() {
+ m_active_jobs.fetch_add(-1);
+ }
+
+ size_t get_active_job_num() {
+ return m_active_jobs.load();
+ }
+
+ Structure *get_structure() {
+ return m_structure;
+ }
+
+ std::vector<Buffer *> &get_buffers() {
+ return m_buffers;
+ }
+
+ BufView get_buffer_view() {
+ return BufView(m_buffers);
+ }
+
+ Buffer *get_active_buffer() {
+ if (m_buffers.size() == 0) return nullptr;
+
+ return m_buffers[m_buffers.size() - 1];
+ }
+
+ /*
+ * Return the number of buffers in this epoch at
+ * time of call, and then clear the buffer vector,
+ * releasing all references in the process.
+ */
+ size_t clear_buffers() {
+ size_t buf_cnt = m_buffers.size();
+ for (auto buf : m_buffers) {
+ if (buf) buf->release_reference();
+ }
+
+ m_buffers.clear();
+ return buf_cnt;
+ }
+
+ /*
+ * Returns a new Epoch object that is a copy of this one. The new object will also contain
+ * a copy of the m_structure, rather than a reference to the same one.
+ */
+ Epoch *clone() {
+ auto epoch = new Epoch();
+ epoch->m_buffers = m_buffers;
+
+ /* the copy takes its own references to the shared buffers */
+ for (auto buf : epoch->m_buffers) {
+ buf->take_reference();
+ }
+ if (m_structure) {
+ epoch->m_structure = m_structure->copy();
+ }
+
+ return epoch;
+ }
+
+private:
+ Structure *m_structure;
+ std::vector<Buffer *> m_buffers;
+
+ /*
+ * The number of currently active jobs
+ * (queries/merges) operating on this
+ * epoch. An epoch can only be retired
+ * when this number is 0.
+ */
+ std::atomic<size_t> m_active_jobs;
+};
+}
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index da2bb8e..878bb81 100644
--- a/include/framework/scheduling/SerialScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -29,19 +29,19 @@
namespace de {
-class SerialScheduler {
+class FIFOScheduler {
public:
- SerialScheduler(size_t memory_budget, size_t thread_cnt)
+ FIFOScheduler(size_t memory_budget, size_t thread_cnt)
: m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
, m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX)
, m_used_memory(0)
, m_used_thrds(0)
, m_shutdown(false)
{
- m_sched_thrd = std::thread(&SerialScheduler::run, this);
+ m_sched_thrd = std::thread(&FIFOScheduler::run, this);
}
- ~SerialScheduler() {
+ ~FIFOScheduler() {
shutdown();
m_cv.notify_all();
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 518159d..94c4d0a 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -8,20 +8,20 @@
#include <functional>
#include "framework/util/Configuration.h"
+#include "framework/scheduling/Epoch.h"
namespace de {
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
struct MergeArgs {
- void *version;
- void *buffer;
+ Epoch<R, S, Q, L> *epoch;
std::vector<MergeTask> merges;
std::promise<bool> result;
};
-template <typename R>
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
struct QueryArgs {
- void *version;
- void *buffer;
+ Epoch<R, S, Q, L> *epoch;
std::promise<std::vector<R>> result_set;
void *query_parms;
};
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
new file mode 100644
index 0000000..1efc1ac
--- /dev/null
+++ b/include/framework/structure/BufferView.h
@@ -0,0 +1,124 @@
+/*
+ * include/framework/structure/BufferView.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <cstdlib>
+#include <atomic>
+#include <condition_variable>
+#include <cassert>
+#include <numeric>
+#include <algorithm>
+#include <type_traits>
+#include <vector>
+
+#include "psu-util/alignment.h"
+#include "util/bf_config.h"
+#include "psu-ds/BloomFilter.h"
+#include "psu-ds/Alias.h"
+#include "psu-util/timer.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/interface/Query.h"
+
+namespace de {
+
+template <RecordInterface R, QueryInterface Q>
+class BufferView {
+ typedef MutableBuffer<R> Buffer;
+public:
+ BufferView() = default;
+
+ BufferView(std::vector<Buffer*> buffers)
+ : m_buffers(buffers)
+ , m_cutoff(buffers[buffers.size()-1]->get_record_count())
+ {}
+
+ ~BufferView() = default;
+
+ bool delete_record(const R& rec) {
+ auto res = false;
+ for (auto buf : m_buffers) {
+ res = buf->delete_record(rec);
+ if (res) return true;
+ }
+ return false;
+ }
+
+ bool check_tombstone(const R& rec) {
+ auto res = false;
+ for (auto buf : m_buffers) {
+ res = buf->check_tombstone(rec);
+ if (res) return true;
+ }
+ return false;
+ }
+
+ size_t get_record_count() {
+ size_t reccnt = 0;
+ for (auto buf : m_buffers) {
+ reccnt += buf->get_record_count();
+ }
+ return reccnt;
+ }
+
+ size_t get_capacity() {
+ return m_buffers[0]->get_capacity();
+ }
+
+ bool is_full() {
+ return m_buffers[m_buffers.size() - 1]->is_full();
+ }
+
+ size_t get_tombstone_count() {
+ size_t tscnt = 0;
+ for (auto buf : m_buffers) {
+ tscnt += buf->get_tombstone_count();
+ }
+ return tscnt;
+ }
+
+ size_t get_memory_usage() {
+ size_t mem = 0;
+ for (auto buf : m_buffers) {
+ mem += buf->get_memory_usage();
+ }
+ return mem;
+ }
+
+ size_t get_aux_memory_usage() {
+ size_t mem = 0;
+ for (auto buf : m_buffers) {
+ mem += buf->get_aux_memory_usage();
+ }
+ return mem;
+ }
+
+ size_t get_tombstone_capacity() {
+ return m_buffers[0]->get_tombstone_capacity();
+ }
+
+ std::vector<void *> get_buffer_states(void *parms) {
+ std::vector<void *> states;
+
+ for (auto buf : m_buffers) {
+ states.push_back(Q::get_buffer_query_state(buf, parms));
+ }
+
+ return states;
+ }
+
+ std::vector<Buffer *> &get_buffers() {
+ return m_buffers;
+ }
+
+private:
+ std::vector<Buffer *> m_buffers;
+ size_t m_cutoff;
+};
+
+}
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 8344518..de965ae 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -302,12 +302,38 @@ public:
m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
}
+ bool take_reference() {
+ m_refcnt.fetch_add(1);
+ return true;
+ }
+
+ bool release_reference() {
+ assert(m_refcnt.load() > 0);
+ m_refcnt.fetch_add(-1);
+ return true;
+ }
+
+ size_t get_reference_count() {
+ return m_refcnt.load();
+ }
+
+ std::vector<void *> get_query_states(std::vector<std::pair<ShardID, Shard*>> &shards, void *parms) {
+ std::vector<void*> states;
+
+ for (auto &level : m_levels) {
+ level->get_query_states(shards, states, parms);
+ }
+
+ return states;
+ }
private:
size_t m_scale_factor;
double m_max_delete_prop;
size_t m_buffer_size;
+ std::atomic<size_t> m_refcnt {0};
+
std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;
/*
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 4e0b5c2..974dc28 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -217,6 +217,10 @@ public:
return true;
}
+ size_t get_reference_count() {
+ return m_refcnt.load();
+ }
+
bool active_merge() {
return m_merging.load();
}