author    Douglas Rumbaugh <dbr4@psu.edu>  2024-01-12 14:10:11 -0500
committer Douglas Rumbaugh <dbr4@psu.edu>  2024-01-12 14:10:11 -0500
commit    aac0bb661af8fae38d3ce08d6078cb4d9dfcb575 (patch)
tree      347e0ce7f7e15f2610039f02b75d47cedf810cd6 /include/framework/DynamicExtension.h
parent    c4514c2e62a711189cf3c914297885d97fb51a09 (diff)
download  dynamic-extension-aac0bb661af8fae38d3ce08d6078cb4d9dfcb575.tar.gz
Initial integration of new buffering scheme into framework
It isn't working yet (lots of test failures), but we're at the debugging stage now.
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--  include/framework/DynamicExtension.h | 252
1 file changed, 88 insertions(+), 164 deletions(-)
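This commit replaces the old per-epoch set of fixed-capacity buffers with a single MutableBuffer governed by a low and a high watermark: crossing the low watermark (LWM) triggers a background reconstruction, and appends fail once the high watermark (HWM) is reached (see internal_append in the diff below). The following is a minimal sketch of that admission logic only; WatermarkBuffer is a hypothetical stand-in, and the real MutableBuffer additionally tracks records, tombstones, and hands out BufferViews.

#include <atomic>
#include <cstddef>

/*
 * Hypothetical stand-in for the new buffer: appends are admitted until a
 * high watermark (HWM) is hit, and crossing the low watermark (LWM) is the
 * signal used to kick off a background reconstruction.
 */
class WatermarkBuffer {
public:
    WatermarkBuffer(size_t lwm, size_t hwm) : m_lwm(lwm), m_hwm(hwm), m_tail(0) {}

    /* returns 1 on success, 0 if the buffer is at its high watermark */
    int append() {
        size_t pos = m_tail.fetch_add(1);
        if (pos >= m_hwm) {
            m_tail.fetch_sub(1);  /* undo the reservation; buffer is full */
            return 0;
        }
        return 1;
    }

    bool is_at_low_watermark() const { return m_tail.load() >= m_lwm; }

private:
    size_t m_lwm;
    size_t m_hwm;
    std::atomic<size_t> m_tail;
};

/*
 * Insert path mirroring the new internal_append(): schedule at most one
 * reconstruction once the LWM is crossed, and report failure (0) to the
 * caller when the HWM blocks the append.
 */
int internal_append(WatermarkBuffer &buf, std::atomic<bool> &reconstruction_scheduled) {
    if (!reconstruction_scheduled.load() && buf.is_at_low_watermark()) {
        reconstruction_scheduled.store(true);
        /* schedule_reconstruction() would run here in the framework */
    }
    return buf.append();
}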
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index c5c4a1a..c97b390 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -10,29 +10,23 @@
#pragma once
#include <atomic>
-#include <numeric>
#include <cstdio>
#include <vector>
#include <set>
#include <shared_mutex>
#include <mutex>
+#include "framework/interface/Scheduler.h"
+#include "framework/scheduling/FIFOScheduler.h"
+#include "framework/scheduling/SerialScheduler.h"
+
#include "framework/structure/MutableBuffer.h"
-#include "framework/structure/InternalLevel.h"
-#include "framework/interface/Shard.h"
-#include "framework/interface/Query.h"
#include "framework/interface/Record.h"
-#include "framework/interface/Query.h"
-#include "framework/interface/Scheduler.h"
#include "framework/structure/ExtensionStructure.h"
#include "framework/util/Configuration.h"
-#include "framework/scheduling/FIFOScheduler.h"
-#include "framework/scheduling/SerialScheduler.h"
#include "framework/scheduling/Epoch.h"
-#include "psu-util/timer.h"
-#include "psu-ds/Alias.h"
namespace de {
@@ -43,22 +37,19 @@ class DynamicExtension {
typedef MutableBuffer<R> Buffer;
typedef ExtensionStructure<R, S, Q, L> Structure;
typedef Epoch<R, S, Q, L> _Epoch;
- typedef BufferView<R, Q> BufView;
+ typedef BufferView<R> BufView;
public:
- DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop, size_t memory_budget=0,
+ DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0,
size_t thread_cnt=16)
: m_scale_factor(scale_factor)
- , m_max_delete_prop(max_delete_prop)
+ , m_max_delete_prop(1)
, m_sched(memory_budget, thread_cnt)
- , m_buffer_capacity(buffer_cap)
- , m_buffer_delete_capacity(max_delete_prop*buffer_cap)
+ , m_buffer(new Buffer(buffer_lwm, buffer_hwm))
{
- auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
- auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop);
- auto epoch = new _Epoch(0, vers, buf);
+ auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
+ auto epoch = new _Epoch(0, vers, m_buffer);
- m_buffers.insert(buf);
m_versions.insert(vers);
m_epochs.insert({0, epoch});
}
@@ -79,9 +70,7 @@ public:
delete e.second;
}
- for (auto e : m_buffers) {
- delete e;
- }
+ delete m_buffer;
for (auto e : m_versions) {
delete e;
@@ -95,10 +84,15 @@ public:
int erase(const R &rec) {
// FIXME: delete tagging will require a lot of extra work to get
// operating "correctly" in a concurrent environment.
+
+ /*
+ * Get a view on the buffer *first*. This will ensure a stronger
+ * ordering than simply accessing the buffer directly, but is
+ * not *strictly* necessary.
+ */
+ auto view = m_buffer->get_buffer_view();
if constexpr (D == DeletePolicy::TAGGING) {
static_assert(std::same_as<SCHED, SerialScheduler>, "Tagging is only supported in single-threaded operation");
- BufView buffers = get_active_epoch()->get_buffer_view();
-
if (get_active_epoch()->get_structure()->tagged_delete(rec)) {
return 1;
}
@@ -108,7 +102,7 @@ public:
* probably has the lowest probability of having the record,
* so we'll check it last.
*/
- return buffers.delete_record(rec);
+ return view.delete_record(rec);
}
/*
@@ -123,7 +117,7 @@ public:
size_t get_record_count() {
auto epoch = get_active_epoch_protected();
- auto t = epoch->get_buffer_view().get_record_count() + epoch->get_structure()->get_record_count();
+ auto t = epoch->get_buffer().get_record_count() + epoch->get_structure()->get_record_count();
epoch->end_job();
return t;
@@ -131,7 +125,7 @@ public:
size_t get_tombstone_count() {
auto epoch = get_active_epoch_protected();
- auto t = epoch->get_buffer_view().get_tombstone_count() + epoch->get_structure()->get_tombstone_count();
+ auto t = epoch->get_buffer().get_tombstone_count() + epoch->get_structure()->get_tombstone_count();
epoch->end_job();
return t;
@@ -147,7 +141,7 @@ public:
size_t get_memory_usage() {
auto epoch = get_active_epoch_protected();
- auto t= epoch->get_buffer_view().get_memory_usage() + epoch->get_structure()->get_memory_usage();
+ auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage();
epoch->end_job();
return t;
@@ -155,14 +149,14 @@ public:
size_t get_aux_memory_usage() {
auto epoch = get_active_epoch_protected();
- auto t = epoch->get_buffer_view().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage();
+ auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage();
epoch->end_job();
return t;
}
size_t get_buffer_capacity() {
- return m_buffer_capacity;
+ return m_buffer->get_capacity();
}
Shard *create_static_structure(bool await_reconstruction_completion=false) {
@@ -171,11 +165,20 @@ public:
}
auto epoch = get_active_epoch_protected();
- auto bv = epoch->get_buffer_view();
-
auto vers = epoch->get_structure();
std::vector<Shard *> shards;
+ /*
+ * Construct a shard from the buffer view. We'll hold the view
+ * for as short a time as possible: once the records are exfiltrated
+ * from the buffer, there's no reason to retain a hold on the view's
+ * head pointer any longer.
+ */
+ {
+ auto bv = epoch->get_buffer();
+ shards.emplace_back(new S(std::move(bv)));
+ }
+
if (vers->get_levels().size() > 0) {
for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
if (vers->get_levels()[i]) {
@@ -184,12 +187,6 @@ public:
}
}
- // FIXME: With an interface adjustment, this could be done in
- // one call, rather than a loop.
- for (ssize_t i=bv.size() - 1; i>=0; i--) {
- shards.emplace_back(new S(bv.get_buffers()[i]));
- }
-
Shard *shards_array[shards.size()];
size_t j = 0;
@@ -237,10 +234,13 @@ public:
private:
SCHED m_sched;
+ Buffer *m_buffer;
+
std::mutex m_struct_lock;
- std::set<Buffer*> m_buffers;
std::set<Structure *> m_versions;
+ alignas(64) std::atomic<bool> m_reconstruction_scheduled;
+
std::atomic<size_t> m_current_epoch;
std::atomic<size_t> m_newest_epoch;
std::unordered_map<size_t, _Epoch *> m_epochs;
@@ -253,8 +253,6 @@ private:
size_t m_scale_factor;
double m_max_delete_prop;
- size_t m_buffer_capacity;
- size_t m_buffer_delete_capacity;
void enforce_delete_invariant(_Epoch *epoch) {
auto structure = epoch->get_structure();
@@ -321,6 +319,7 @@ private:
*/
enforce_delete_invariant(new_epoch);
+ #if 0
/*
* Update the new Epoch to contain the buffers from the old one
* that it doesn't currently have if using a multi-threaded
@@ -339,6 +338,7 @@ private:
new_epoch->add_buffer(old_epoch->get_buffers()[i]);
}
}
+ #endif
m_current_epoch.fetch_add(1);
old_epoch->set_inactive();
@@ -373,57 +373,6 @@ private:
return new_epoch;
}
- /*
- * Add a new empty buffer. This is intended to be used
- * when a reconstruction is triggered, to allow for inserts to be sustained in the new
- * buffer while a new epoch is being created in the background. Returns a
- * pointer to the newly created buffer.
- */
- Buffer *add_empty_buffer() {
- /*
- * if there's a current Epoch transition ongoing, a buffer installed
- * into an older Epoch, but not the new one, may be lost. So fail to
- * insert a buffer.
- */
- if (!m_epoch_transition_lk.try_lock()) {
- return nullptr;
- }
-
- /*
- * verify that the currently active buffer is still full, if
- * not, there is no reason to add a new one. This code is
- * protected by the epoch transition lock, so need need to
- * take a protected reference to the epoch.
- */
- auto active_epoch = get_active_epoch();
- if (!active_epoch->get_active_buffer()->is_full()) {
- m_epoch_transition_lk.unlock();
- return nullptr;
- }
-
- /*
- * create a new buffer and install it in the active epoch.
- */
- auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
-
- std::unique_lock<std::mutex> m_struct_lock;
- auto new_buffer = active_epoch->add_buffer(temp_buffer);
-
- /*
- * if epoch->add_buffer doesn't add the new buffer, this insert
- * won't update the buffer set (duplicate insert)
- */
- m_buffers.insert(new_buffer);
- m_struct_lock.release();
-
- if (new_buffer != temp_buffer) {
- delete temp_buffer;
- }
- m_epoch_transition_lk.unlock();
-
- return new_buffer;
- }
-
void retire_epoch(_Epoch *epoch) {
/*
* Epochs with currently active jobs cannot
@@ -452,21 +401,14 @@ private:
delete epoch;
m_epoch_retire_lk.unlock();
+ /* NOTE: the BufferView mechanism handles freeing unused buffer space */
+
/*
* Following the epoch's destruction, any buffers
* or structures with no remaining references can
* be safely freed.
*/
std::unique_lock<std::mutex> lock(m_struct_lock);
- for (auto itr = m_buffers.begin(); itr != m_buffers.end();) {
- if ((*itr)->get_reference_count() == 0) {
- auto tmp = *itr;
- itr = m_buffers.erase(itr);
- delete tmp;
- } else {
- itr++;
- }
- }
for (auto itr = m_versions.begin(); itr != m_versions.end();) {
if ((*itr)->get_reference_count() == 0) {
@@ -484,21 +426,31 @@ private:
Structure *vers = args->epoch->get_structure();
- // FIXME: with an improved shard interface, multiple full buffers
+ // FIXME: with an improved shard interface, multiple full buffer views
// could be flushed at once here.
- Buffer *buff = (Buffer *) args->epoch->get_buffers()[0];
+ auto buffer_view = args->epoch->get_buffer();
+ size_t new_head = buffer_view.get_tail();
for (ssize_t i=0; i<args->merges.size(); i++) {
vers->reconstruction(args->merges[i].second, args->merges[i].first);
}
/*
- * if performing a compaction, don't push the buffer down,
- * as there is no guarantee that any necessary reconstructions
+ * if performing a compaction, don't flush the buffer, as
+ * there is no guarantee that any necessary reconstructions
* will free sufficient space in L0 to support a flush
*/
if (!args->compaction) {
- vers->flush_buffer(buff);
+ vers->flush_buffer(std::move(buffer_view));
+
+ // FIXME: this may currently fail because there isn't any
+ // query preemption yet. At this point, we'd need to either
+ // 1) wait for all queries on the old_head to finish
+ // 2) kill all queries on the old_head
+ // 3) somehow migrate all queries on the old_head to the new
+ // version
+ auto res = args->epoch->advance_buffer_head(new_head);
+ assert(res);
}
args->epoch->end_job();
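The flush path above captures the buffer view's tail before reconstructing, builds a shard from the flushed records, and only then tries to advance the buffer's head to that captured tail; as the FIXME notes, the advance can fail while older queries still pin the previous head. Below is a heavily simplified, hypothetical sketch of that head-advance step. Only the call names (get_tail(), advance_buffer_head()) come from the diff; the ring-buffer bookkeeping shown here is assumed.

#include <atomic>
#include <cassert>
#include <cstddef>

/*
 * Hypothetical sketch: records in [head, tail) are live. After they have
 * been written into a shard, the head is moved up to the captured tail to
 * reclaim space. The advance is refused while a reader still pins the old
 * head, which is the unresolved case described in the FIXME above.
 */
struct BufferHead {
    std::atomic<size_t> head{0};
    std::atomic<size_t> tail{0};
    std::atomic<size_t> old_head_refcnt{0};  /* readers pinning the old head */

    bool advance_head(size_t new_head) {
        if (old_head_refcnt.load() > 0) {
            return false;  /* queries are still reading the flushed region */
        }
        head.store(new_head);
        return true;
    }
};

/* Mirrors the order of operations in the reconstruction job above. */
void flush_and_advance(BufferHead &buf) {
    size_t new_head = buf.tail.load();  /* capture the tail before flushing */
    /* ... build a shard from the records in [head, new_head) ... */
    bool res = buf.advance_head(new_head);
    assert(res);  /* can fail until query preemption/migration exists */
    (void) res;
}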
@@ -519,27 +471,33 @@ private:
static void async_query(void *arguments) {
QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments;
- auto buffers = args->epoch->get_buffer_view();
+ auto buffer = args->epoch->get_buffer();
auto vers = args->epoch->get_structure();
void *parms = args->query_parms;
/* Get the buffer query states */
- std::vector<void *> buffer_states = buffers.get_query_states(parms);
+ void *buffer_state = Q::get_buffer_query_state(std::move(buffer), parms);
/* Get the shard query states */
std::vector<std::pair<ShardID, Shard*>> shards;
std::vector<void *> states = vers->get_query_states(shards, parms);
- Q::process_query_states(parms, states, buffer_states);
+ Q::process_query_states(parms, states, buffer_state);
- std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + buffer_states.size());
+ std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
for (size_t i=0; i<query_results.size(); i++) {
- std::vector<Wrapped<R>> local_results = (i < buffer_states.size())
- ? Q::buffer_query(buffers.get_buffers()[i], buffer_states[i], parms)
- : Q::query(shards[i - buffer_states.size()].second,
- states[i - buffer_states.size()], parms);
- ShardID shid = (i < buffer_states.size()) ? INVALID_SHID : shards[i - buffer_states.size()].first;
- query_results[i] = std::move(filter_deletes(local_results, shid, buffers, vers));
+ std::vector<Wrapped<R>> local_results;
+ ShardID shid;
+
+ if (i == 0) { /* process the buffer first */
+ local_results = Q::buffer_query(buffer_state, parms);
+ shid = INVALID_SHID;
+ } else {
+ local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
+ shid = shards[i - 1].first;
+ }
+
+ query_results[i] = std::move(filter_deletes(local_results, shid, vers));
if constexpr (Q::EARLY_ABORT) {
if (query_results[i].size() > 0) break;
@@ -551,10 +509,7 @@ private:
args->epoch->end_job();
- for (size_t i=0; i<buffer_states.size(); i++) {
- Q::delete_buffer_query_state(buffer_states[i]);
- }
-
+ Q::delete_buffer_query_state(buffer_state);
for (size_t i=0; i<states.size(); i++) {
Q::delete_query_state(states[i]);
}
@@ -575,7 +530,7 @@ private:
//
ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>();
args->epoch = epoch;
- args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffers()[0]->get_record_count());
+ args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffer().get_record_count());
args->extension = this;
args->compaction = false;
/* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */
@@ -597,49 +552,16 @@ private:
}
int internal_append(const R &rec, bool ts) {
- Buffer *buffer = nullptr;
- int res = 0;
- do {
- auto epoch = get_active_epoch_protected();
- buffer = epoch->get_active_buffer();
- assert(buffer);
-
- /*
- * If the buffer is full and there is no ongoing reconstruction,
- * schedule a reconstruction and add a new empty buffer. If there
- * is an ongoing reconstruction, then add a new empty buffer
- * to the current epoch.
- */
- if (buffer->is_full()) {
- if constexpr (std::same_as<SCHED, SerialScheduler>) {
- /* single threaded: run reconstruction and then empty buffer */
- epoch->end_job();
- schedule_reconstruction();
- buffer->truncate();
- continue;
- } else if (epoch->prepare_reconstruction()) {
- /*
- * add an empty buffer to allow insert proceed and
- * schedule a reconstruction on a background thread
- */
- buffer = add_empty_buffer();
- schedule_reconstruction();
- } else {
- /* background reconstruction is ongoing, so just add empty buffer */
- buffer = add_empty_buffer();
- }
- }
-
- res = (buffer) ? buffer->append(rec, ts) : 0;
- epoch->end_job();
- } while(!res);
+ if (!m_reconstruction_scheduled.load() && m_buffer->is_at_low_watermark()) {
+ m_reconstruction_scheduled.store(true);
+ schedule_reconstruction();
+ }
- /* internal append should always succeed, eventually */
- return 1;
+ /* this will fail and return 0 if the HWM has been reached */
+ return m_buffer->append(rec, ts);
}
- static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid,
- BufView &buffers, Structure *vers) {
+ static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers) {
if constexpr (!Q::SKIP_DELETE_FILTER) {
return records;
}
@@ -672,9 +594,11 @@ private:
continue;
}
- if (buffers.check_tombstone(rec.rec)) {
- continue;
- }
+ // FIXME: need to figure out how best to re-enable the buffer
+ // tombstone check here.
+ //if (buffview.check_tombstone(rec.rec)) {
+ //continue;
+ //}
if (shid != INVALID_SHID) {
for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {