path: root/include/framework/DynamicExtension.h
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--  include/framework/DynamicExtension.h  1324
1 files changed, 670 insertions, 654 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index e2e2784..16cbb0e 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -1,8 +1,8 @@
/*
* include/framework/DynamicExtension.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -14,766 +14,782 @@
#include <vector>
#include "framework/interface/Scheduler.h"
-#include "framework/scheduling/FIFOScheduler.h"
#include "framework/scheduling/SerialScheduler.h"
-#include "framework/structure/MutableBuffer.h"
-#include "framework/interface/Record.h"
#include "framework/structure/ExtensionStructure.h"
+#include "framework/structure/MutableBuffer.h"
-#include "framework/util/Configuration.h"
#include "framework/scheduling/Epoch.h"
+#include "framework/util/Configuration.h"
namespace de {
-template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L=LayoutPolicy::TEIRING,
- DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=FIFOScheduler>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
+ LayoutPolicy L = LayoutPolicy::TEIRING,
+ DeletePolicy D = DeletePolicy::TAGGING,
+ SchedulerInterface SchedType = SerialScheduler>
class DynamicExtension {
- typedef S Shard;
- typedef MutableBuffer<R> Buffer;
- typedef ExtensionStructure<R, S, Q, L> Structure;
- typedef Epoch<R, S, Q, L> _Epoch;
- typedef BufferView<R> BufView;
-
- static constexpr size_t QUERY = 1;
- static constexpr size_t RECONSTRUCTION = 2;
-
- struct epoch_ptr {
- _Epoch *epoch;
- size_t refcnt;
- };
-
+ /* for unit testing purposes */
public:
- DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0,
- size_t thread_cnt=16)
- : m_scale_factor(scale_factor)
- , m_max_delete_prop(1)
- , m_sched(memory_budget, thread_cnt)
- , m_buffer(new Buffer(buffer_lwm, buffer_hwm))
- , m_core_cnt(thread_cnt)
- , m_next_core(0)
- , m_epoch_cnt(0)
- {
- if constexpr (L == LayoutPolicy::BSM) {
- assert(scale_factor == 2);
- }
-
- auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
- m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
- m_previous_epoch.store({nullptr, 0});
- m_next_epoch.store({nullptr, 0});
- }
-
- ~DynamicExtension() {
-
- /* let any in-flight epoch transition finish */
- await_next_epoch();
-
- /* shutdown the scheduler */
- m_sched.shutdown();
-
- /* delete all held resources */
- delete m_next_epoch.load().epoch;
- delete m_current_epoch.load().epoch;
- delete m_previous_epoch.load().epoch;
-
- delete m_buffer;
- }
-
- /*
- * Insert the record `rec` into the index. If the buffer is full and
- * the framework is blocking on an epoch transition, this call may fail
- * and return 0. In this case, retry the call again later. If
- * successful, 1 will be returned. The record will be immediately
- * visible in the buffer upon the successful return of this function.
- */
- int insert(const R &rec) {
- return internal_append(rec, false);
- }
-
- /*
- * Erase the record `rec` from the index. It is assumed that `rec`
- * currently exists--no special checks are made for correctness here.
- * The behavior if this function will differ depending on if tombstone
- * or tagged deletes are used.
- *
- * Tombstone deletes - inserts a tombstone record for `rec`. This *may*
- * return 0 and fail if the buffer is full and the framework is
- * blocking on an epoch transition. In this case, repeat the call
- * later. 1 will be returned when the tombstone is successfully
- * inserted.
- *
- * Tagging deletes - Does a point lookup for the record across the
- * entire structure, and sets its delete bit when found. Returns 1 if
- * the record is found and marked, and 0 if it was not (i.e., if it
- * isn't present in the index).
- */
- int erase(const R &rec) {
- // FIXME: delete tagging will require a lot of extra work to get
- // operating "correctly" in a concurrent environment.
-
- /*
- * Get a view on the buffer *first*. This will ensure a stronger
- * ordering than simply accessing the buffer directly, but is
- * not *strictly* necessary.
- */
- if constexpr (D == DeletePolicy::TAGGING) {
- static_assert(std::same_as<SCHED, SerialScheduler>, "Tagging is only supported in single-threaded operation");
-
- auto view = m_buffer->get_buffer_view();
-
- auto epoch = get_active_epoch();
- if (epoch->get_structure()->tagged_delete(rec)) {
- end_job(epoch);
- return 1;
- }
-
- end_job(epoch);
-
- /*
- * the buffer will take the longest amount of time, and
- * probably has the lowest probability of having the record,
- * so we'll check it last.
- */
- return view.delete_record(rec);
- }
+ LayoutPolicy Layout = L;
- /*
- * If tagging isn't used, then delete using a tombstone
- */
- return internal_append(rec, true);
- }
-
- /*
- * Execute the query with parameters `parms` and return a future. This
- * future can be used to access a vector containing the results of the
- * query.
- *
- * The behavior of this function is undefined if `parms` is not a
- * pointer to a valid query parameter object for the query type used as
- * a template parameter to construct the framework.
- */
- std::future<std::vector<R>> query(void *parms) {
- return schedule_query(parms);
- }
-
- /*
- * Returns the number of records (included tagged records and
- * tombstones) currently within the framework.
- */
- size_t get_record_count() {
- auto epoch = get_active_epoch();
- auto t = epoch->get_buffer().get_record_count() + epoch->get_structure()->get_record_count();
- end_job(epoch);
+private:
+ /* convenience typedefs for commonly used types within the class */
+ typedef typename ShardType::RECORD RecordType;
+ typedef MutableBuffer<RecordType> Buffer;
+ typedef ExtensionStructure<ShardType, QueryType, L> Structure;
+ typedef Epoch<ShardType, QueryType, L> _Epoch;
+ typedef BufferView<RecordType> BufView;
+
+ typedef typename QueryType::Parameters Parameters;
+ typedef typename QueryType::LocalQuery LocalQuery;
+ typedef typename QueryType::LocalQueryBuffer BufferQuery;
+ typedef typename QueryType::LocalResultType LocalResult;
+ typedef typename QueryType::ResultType QueryResult;
+
+
+ static constexpr size_t QUERY = 1;
+ static constexpr size_t RECONSTRUCTION = 2;
+
+ struct epoch_ptr {
+ _Epoch *epoch;
+ size_t refcnt;
+ };
- return t;
+public:
+ /**
+ * Create a new Dynamized version of a data structure, supporting
+   * inserts and, possibly, deletes. The following parameters are used
+   * to configure the structure:
+ * @param buffer_low_watermark The number of records that can be
+ * inserted before a buffer flush is initiated
+ *
+   * @param buffer_high_watermark The maximum buffer capacity; inserts
+ * will begin to fail once this number is reached, until the
+ * buffer flush has completed. Has no effect in single-threaded
+ * operation
+ *
+ * @param scale_factor The rate at which the capacity of levels
+ * grows; should be at least 2 for reasonable performance
+ *
+ * @param memory_budget Unused at this time
+ *
+ * @param thread_cnt The maximum number of threads available to the
+ * framework's scheduler for use in answering queries and
+ * performing compactions and flushes, etc.
+ */
+ DynamicExtension(size_t buffer_low_watermark, size_t buffer_high_watermark,
+ size_t scale_factor, size_t memory_budget = 0,
+ size_t thread_cnt = 16)
+ : m_scale_factor(scale_factor), m_max_delete_prop(1),
+ m_sched(memory_budget, thread_cnt),
+ m_buffer(new Buffer(buffer_low_watermark, buffer_high_watermark)),
+ m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0) {
+ if constexpr (L == LayoutPolicy::BSM) {
+ assert(scale_factor == 2);
}
- /*
- * Returns the number of tombstone records currently within the
- * framework. This function can be called when tagged deletes are used,
- * but will always return 0 in that case.
- */
- size_t get_tombstone_count() {
- auto epoch = get_active_epoch();
- auto t = epoch->get_buffer().get_tombstone_count() + epoch->get_structure()->get_tombstone_count();
- end_job(epoch);
-
- return t;
- }
+ auto vers =
+ new Structure(buffer_high_watermark, m_scale_factor, m_max_delete_prop);
+ m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
+ m_previous_epoch.store({nullptr, 0});
+ m_next_epoch.store({nullptr, 0});
+ }
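
As a rough usage sketch (not taken from this commit), the constructor might be invoked as follows. MyShard and MyQuery are hypothetical types assumed to satisfy ShardInterface and QueryInterface<MyShard>; the defaulted layout, delete, and scheduler template arguments are used.

    /* hypothetical shard and query types -- not defined in this header */
    using Ext = de::DynamicExtension<MyShard, MyQuery>;

    /* flush once 1000 records are buffered, reject inserts past 1200, grow
     * level capacity by 8x; memory_budget is unused, 16 worker threads */
    auto *extension = new Ext(1000, 1200, 8, 0, 16);

    delete extension; /* blocks on any in-flight epoch transition first */
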
+
+ /**
+ * Destructor for DynamicExtension. Will block until the completion of
+ * any outstanding epoch transition, shut down the scheduler, and free
+ * all currently allocated shards, buffers, etc., by calling their
+ * destructors.
+ */
+ ~DynamicExtension() {
+
+ /* let any in-flight epoch transition finish */
+ await_next_epoch();
+
+ /* shutdown the scheduler */
+ m_sched.shutdown();
+
+ /* delete all held resources */
+ delete m_next_epoch.load().epoch;
+ delete m_current_epoch.load().epoch;
+ delete m_previous_epoch.load().epoch;
+
+ delete m_buffer;
+ }
+
+ /**
+ * Inserts a record into the index. Returns 1 if the insert succeeds,
+ * and 0 if it fails. Inserts may fail if the DynamicExtension's buffer
+ * has reached the high water mark; in this case, the insert should be
+ * retried when the buffer has flushed. The record will be immediately
+   * visible inside the index upon the successful return of this function.
+ *
+ * @param rec The record to be inserted
+ *
+ * @return 1 on success, 0 on failure (in which case the insert should
+ * be retried)
+ */
+ int insert(const RecordType &rec) { return internal_append(rec, false); }
+
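
A failed insert only means that the buffer has hit its high water mark while a flush is in progress, so a simple retry loop is sufficient. A minimal sketch, assuming the extension object from the constructor example above and a value of the shard's RECORD type:

    MyShard::RECORD rec{}; /* record fields omitted for brevity */
    while (!extension->insert(rec)) {
      usleep(10); /* buffer full; back off briefly while the flush completes */
    }
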
+ /**
+ * Erases a record from the index, according to the DeletePolicy
+ * template parameter. Returns 1 on success and 0 on failure. The
+ * equality comparison operator of RecordType is used to identify
+ * the record to be deleted.
+ *
+   * Deletes behave differently depending on the DeletePolicy. For
+ * Tombstone deletes, a tombstone record will be inserted into the
+ * index. The presence of the deleted record is not checked first, so
+ * deleting a record that does not exist will result in an unnecessary
+ * tombstone record being written.
+ *
+ * Deletes using Tagging will perform a point lookup for the record to
+ * be removed, and mark it as deleted in its header.
+ *
+ * @param rec The record to be deleted. The argument to this function
+ * should compare equal to the record to be deleted.
+ *
+ * @return 1 on success, and 0 on failure. For tombstone deletes, a
+ * failure will occur if the insert fails due to the buffer
+ * being full, and can be retried. For tagging deletes, a
+   *         failure means that the record to be deleted could not be
+ * found in the index, and should *not* be retried.
+ */
+ int erase(const RecordType &rec) {
+ // FIXME: delete tagging will require a lot of extra work to get
+ // operating "correctly" in a concurrent environment.
/*
- * Get the number of levels within the framework. This count will
- * include any empty levels, but will not include the buffer. Note that
- * this is *not* the same as the number of shards when tiering is used,
- * as each level can contain multiple shards in that case.
+ * Get a view on the buffer *first*. This will ensure a stronger
+ * ordering than simply accessing the buffer directly, but is
+ * not *strictly* necessary.
*/
- size_t get_height() {
- auto epoch = get_active_epoch();
- auto t = epoch->get_structure()->get_height();
- end_job(epoch);
+ if constexpr (D == DeletePolicy::TAGGING) {
+ static_assert(std::same_as<SchedType, SerialScheduler>,
+ "Tagging is only supported in single-threaded operation");
- return t;
- }
+ auto view = m_buffer->get_buffer_view();
- /*
- * Get the number of bytes of memory allocated across the framework for
- * storing records and associated index information (i.e., internal
- * ISAM tree nodes). This includes memory that is allocated but
- * currently unused in the buffer, or in shards themselves
- * (overallocation due to delete cancellation, etc.).
- */
- size_t get_memory_usage() {
- auto epoch = get_active_epoch();
- auto t = m_buffer->get_memory_usage() + epoch->get_structure()->get_memory_usage();
+ auto epoch = get_active_epoch();
+ if (epoch->get_structure()->tagged_delete(rec)) {
end_job(epoch);
+ return 1;
+ }
- return t;
- }
-
- /*
- * Get the number of bytes of memory allocated across the framework for
- * auxiliary structures. This can include bloom filters, aux
- * hashtables, etc.
- */
- size_t get_aux_memory_usage() {
- auto epoch = get_active_epoch();
- auto t = epoch->get_structure()->get_aux_memory_usage();
- end_job(epoch);
+ end_job(epoch);
- return t;
+ /*
+ * the buffer will take the longest amount of time, and
+ * probably has the lowest probability of having the record,
+ * so we'll check it last.
+ */
+ return view.delete_record(rec);
}
/*
- * Returns the maximum physical capacity of the buffer, measured in
- * records.
+ * If tagging isn't used, then delete using a tombstone
*/
- size_t get_buffer_capacity() {
- return m_buffer->get_capacity();
+ return internal_append(rec, true);
+ }
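
The return value of erase() carries different retry semantics under the two delete policies. A brief sketch, continuing the assumptions above:

    if (!extension->erase(rec)) {
      /* tombstone deletes: the buffer was full, so the call can be retried;
       * tagging deletes: the record was not found, so retrying is pointless */
    }
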
+
+ /**
+ * Schedule the execution of a query with specified parameters and
+ * returns a future that can be used to access the results. The query
+ * is executed asynchronously.
+ * @param parms An rvalue reference to the query parameters.
+ *
+ * @return A future, from which the query results can be retrieved upon
+ * query completion
+ */
+ std::future<std::vector<QueryResult>>
+ query(Parameters &&parms) {
+ return schedule_query(std::move(parms));
+ }
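
Because queries run asynchronously on the scheduler, the caller only blocks when it retrieves the future's value. A hedged sketch, assuming MyQuery::Parameters is an aggregate holding the query's arguments:

    MyQuery::Parameters parms{ /* query-specific arguments */ };
    auto result_future = extension->query(std::move(parms));

    /* get() blocks until the scheduled query job completes */
    std::vector<MyQuery::ResultType> results = result_future.get();
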
+
+ /**
+ * Determine the number of records (including tagged records and
+ * tombstones) currently within the framework. This number is used for
+ * determining when and how reconstructions occur.
+ *
+ * @return The number of records within the index
+ */
+ size_t get_record_count() {
+ auto epoch = get_active_epoch();
+ auto t = epoch->get_buffer().get_record_count() +
+ epoch->get_structure()->get_record_count();
+ end_job(epoch);
+
+ return t;
+ }
+
+ /**
+ * Returns the number of tombstone records currently within the
+ * index. This function can be called when tagged deletes are used,
+ * but will always return 0 in that case.
+ *
+ * @return The number of tombstone records within the index
+ */
+ size_t get_tombstone_count() {
+ auto epoch = get_active_epoch();
+ auto t = epoch->get_buffer().get_tombstone_count() +
+ epoch->get_structure()->get_tombstone_count();
+ end_job(epoch);
+
+ return t;
+ }
+
+ /**
+ * Get the number of levels within the framework. This count will
+ * include any empty levels, but will not include the buffer. Note that
+ * this is *not* the same as the number of shards when tiering is used,
+ * as each level can contain multiple shards in that case.
+ *
+ * @return The number of levels within the index
+ */
+ size_t get_height() {
+ auto epoch = get_active_epoch();
+ auto t = epoch->get_structure()->get_height();
+ end_job(epoch);
+
+ return t;
+ }
+
+ /**
+ * Get the number of bytes of memory allocated across the framework for
+ * storing records and associated index information (i.e., internal
+ * ISAM tree nodes). This includes memory that is allocated but
+ * currently unused in the buffer, or in shards themselves
+ * (overallocation due to delete cancellation, etc.).
+ *
+   * @return The number of bytes of memory used by the index for the buffer
+   *         and the shards (as reported by ShardType::get_memory_usage).
+ */
+ size_t get_memory_usage() {
+ auto epoch = get_active_epoch();
+ auto t = m_buffer->get_memory_usage() +
+ epoch->get_structure()->get_memory_usage();
+ end_job(epoch);
+
+ return t;
+ }
+
+ /**
+ * Get the number of bytes of memory allocated across the framework for
+ * auxiliary structures. This can include bloom filters, aux
+ * hashtables, etc.
+ *
+   * @return The number of bytes of memory used for auxiliary structures
+ * (as reported by ShardType::get_aux_memory_usage) by the index.
+ */
+ size_t get_aux_memory_usage() {
+ auto epoch = get_active_epoch();
+ auto t = epoch->get_structure()->get_aux_memory_usage();
+ end_job(epoch);
+
+ return t;
+ }
+
+ /**
+ * Create a new single Shard object containing all of the records
+ * within the framework (buffer and shards).
+ *
+ * @param await_reconstruction_completion Specifies whether the currently
+ * active state of the index should be used to create the shard
+ * (false), or if shard construction should wait for any active
+ * reconstructions to finish first (true). Default value of false.
+ *
+ * @return A new shard object, containing a copy of all records within
+   *         the index. Ownership of this object is transferred to the
+ * caller.
+ */
+ ShardType *
+ create_static_structure(bool await_reconstruction_completion = false) {
+ if (await_reconstruction_completion) {
+ await_next_epoch();
}
-
- /*
- * Create a new single Shard object containing all of the records
- * within the framework (buffer and shards). The optional parameter can
- * be used to specify whether the Shard should be constructed with the
- * currently active state of the framework (false), or if shard
- * construction should wait until any ongoing reconstructions have
- * finished and use that new version (true).
- */
- Shard *create_static_structure(bool await_reconstruction_completion=false) {
- if (await_reconstruction_completion) {
- await_next_epoch();
- }
- auto epoch = get_active_epoch();
- auto vers = epoch->get_structure();
- std::vector<Shard *> shards;
+ auto epoch = get_active_epoch();
+ auto vers = epoch->get_structure();
+ std::vector<ShardType *> shards;
-
- if (vers->get_levels().size() > 0) {
- for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
- if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) {
- shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
- }
- }
- }
-
- /*
- * construct a shard from the buffer view. We'll hold the view
- * for as short a time as possible: once the records are exfiltrated
- * from the buffer, there's no reason to retain a hold on the view's
- * head pointer any longer
- */
- {
- auto bv = epoch->get_buffer();
- if (bv.get_record_count() > 0) {
- shards.emplace_back(new S(std::move(bv)));
- }
- }
-
- Shard *flattened = new S(shards);
-
- for (auto shard : shards) {
- delete shard;
+ if (vers->get_levels().size() > 0) {
+ for (int i = vers->get_levels().size() - 1; i >= 0; i--) {
+ if (vers->get_levels()[i] &&
+ vers->get_levels()[i]->get_record_count() > 0) {
+ shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
}
-
- end_job(epoch);
- return flattened;
+ }
}
/*
- * If the current epoch is *not* the newest one, then wait for
- * the newest one to become available. Otherwise, returns immediately.
+ * construct a shard from the buffer view. We'll hold the view
+ * for as short a time as possible: once the records are exfiltrated
+ * from the buffer, there's no reason to retain a hold on the view's
+ * head pointer any longer
*/
- void await_next_epoch() {
- while (m_next_epoch.load().epoch != nullptr) {
- std::unique_lock<std::mutex> lk(m_epoch_cv_lk);
- m_epoch_cv.wait(lk);
- }
+ {
+ auto bv = epoch->get_buffer();
+ if (bv.get_record_count() > 0) {
+ shards.emplace_back(new ShardType(std::move(bv)));
+ }
}
- /*
- * Mostly exposed for unit-testing purposes. Verifies that the current
- * active version of the ExtensionStructure doesn't violate the maximum
- * tombstone proportion invariant.
- */
- bool validate_tombstone_proportion() {
- auto epoch = get_active_epoch();
- auto t = epoch->get_structure()->validate_tombstone_proportion();
- end_job(epoch);
- return t;
- }
+ ShardType *flattened = new ShardType(shards);
+ for (auto shard : shards) {
+ delete shard;
+ }
- void print_scheduler_statistics() {
- m_sched.print_statistics();
+ end_job(epoch);
+ return flattened;
+ }
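
A caller that wants a fully merged snapshot of the data can, for example, do the following (the true argument waits for any in-flight reconstruction to finish before the copy is taken); ownership of the returned shard passes to the caller:

    MyShard *flattened = extension->create_static_structure(true);
    /* ... query the static shard directly ... */
    delete flattened;
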
+
+ /*
+ * If the current epoch is *not* the newest one, then wait for
+ * the newest one to become available. Otherwise, returns immediately.
+ */
+ void await_next_epoch() {
+ while (m_next_epoch.load().epoch != nullptr) {
+ std::unique_lock<std::mutex> lk(m_epoch_cv_lk);
+ m_epoch_cv.wait(lk);
}
+ }
+
+ /**
+ * Verify that the currently active version of the index does not
+ * violate tombstone proportion invariants. Exposed for unit-testing
+ * purposes.
+ *
+ * @return Returns true if the tombstone proportion invariant is
+ * satisfied, and false if it is not.
+ */
+ bool validate_tombstone_proportion() {
+ auto epoch = get_active_epoch();
+ auto t = epoch->get_structure()->validate_tombstone_proportion();
+ end_job(epoch);
+ return t;
+ }
+
+ /**
+ * Calls SchedType::print_statistics, which should write a report of
+ * scheduler performance statistics to stdout.
+ */
+ void print_scheduler_statistics() const { m_sched.print_statistics(); }
private:
- SCHED m_sched;
-
- Buffer *m_buffer;
+ size_t m_scale_factor;
+ double m_max_delete_prop;
- //std::mutex m_struct_lock;
- //std::set<Structure *> m_versions;
+ SchedType m_sched;
+ Buffer *m_buffer;
- alignas(64) std::atomic<bool> m_reconstruction_scheduled;
+ size_t m_core_cnt;
+ std::atomic<int> m_next_core;
+ std::atomic<size_t> m_epoch_cnt;
+
+ alignas(64) std::atomic<bool> m_reconstruction_scheduled;
- std::atomic<epoch_ptr> m_next_epoch;
- std::atomic<epoch_ptr> m_current_epoch;
- std::atomic<epoch_ptr> m_previous_epoch;
+ std::atomic<epoch_ptr> m_next_epoch;
+ std::atomic<epoch_ptr> m_current_epoch;
+ std::atomic<epoch_ptr> m_previous_epoch;
- std::condition_variable m_epoch_cv;
- std::mutex m_epoch_cv_lk;
+ std::condition_variable m_epoch_cv;
+ std::mutex m_epoch_cv_lk;
- std::atomic<size_t> m_epoch_cnt;
- size_t m_scale_factor;
- double m_max_delete_prop;
- std::atomic<int> m_next_core;
- size_t m_core_cnt;
- void enforce_delete_invariant(_Epoch *epoch) {
- auto structure = epoch->get_structure();
- auto compactions = structure->get_compaction_tasks();
+ void enforce_delete_invariant(_Epoch *epoch) {
+ auto structure = epoch->get_structure();
+ auto compactions = structure->get_compaction_tasks();
- while (compactions.size() > 0) {
+ while (compactions.size() > 0) {
- /* schedule a compaction */
- ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>();
- args->epoch = epoch;
- args->merges = compactions;
- args->extension = this;
- args->compaction = true;
- /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */
+ /* schedule a compaction */
+ ReconstructionArgs<ShardType, QueryType, L> *args =
+ new ReconstructionArgs<ShardType, QueryType, L>();
+ args->epoch = epoch;
+ args->merges = compactions;
+ args->extension = this;
+ args->compaction = true;
+ /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed
+ * here */
- auto wait = args->result.get_future();
+ auto wait = args->result.get_future();
- m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
+ m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
- /* wait for compaction completion */
- wait.get();
-
- /* get a new batch of compactions to perform, if needed */
- compactions = structure->get_compaction_tasks();
- }
- }
+ /* wait for compaction completion */
+ wait.get();
- _Epoch *get_active_epoch() {
- epoch_ptr old, new_ptr;
-
- do {
- /*
- * during an epoch transition, a nullptr will installed in the
- * current_epoch. At this moment, the "new" current epoch will
- * soon be installed, but the "current" current epoch has been
- * moved back to m_previous_epoch.
- */
- if (m_current_epoch.load().epoch == nullptr) {
- old = m_previous_epoch;
- new_ptr = {old.epoch, old.refcnt+1};
- if (old.epoch != nullptr && m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
- break;
- }
- } else {
- old = m_current_epoch;
- new_ptr = {old.epoch, old.refcnt+1};
- if (old.epoch != nullptr && m_current_epoch.compare_exchange_strong(old, new_ptr)) {
- break;
- }
- }
- } while (true);
-
- assert(new_ptr.refcnt > 0);
-
- return new_ptr.epoch;
+ /* get a new batch of compactions to perform, if needed */
+ compactions = structure->get_compaction_tasks();
}
+ }
+
+ _Epoch *get_active_epoch() {
+ epoch_ptr old, new_ptr;
+
+ do {
+ /*
+       * during an epoch transition, a nullptr will be installed in the
+ * current_epoch. At this moment, the "new" current epoch will
+ * soon be installed, but the "current" current epoch has been
+ * moved back to m_previous_epoch.
+ */
+ if (m_current_epoch.load().epoch == nullptr) {
+ old = m_previous_epoch;
+ new_ptr = {old.epoch, old.refcnt + 1};
+ if (old.epoch != nullptr &&
+ m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
+ break;
+ }
+ } else {
+ old = m_current_epoch;
+ new_ptr = {old.epoch, old.refcnt + 1};
+ if (old.epoch != nullptr &&
+ m_current_epoch.compare_exchange_strong(old, new_ptr)) {
+ break;
+ }
+ }
+ } while (true);
- void advance_epoch(size_t buffer_head) {
+ assert(new_ptr.refcnt > 0);
- retire_epoch(m_previous_epoch.load().epoch);
+ return new_ptr.epoch;
+ }
- epoch_ptr tmp = {nullptr, 0};
- epoch_ptr cur;
- do {
- cur = m_current_epoch;
- } while(!m_current_epoch.compare_exchange_strong(cur, tmp));
+ void advance_epoch(size_t buffer_head) {
- m_previous_epoch.store(cur);
+ retire_epoch(m_previous_epoch.load().epoch);
- // FIXME: this may currently block because there isn't any
- // query preemption yet. At this point, we'd need to either
- // 1) wait for all queries on the old_head to finish
- // 2) kill all queries on the old_head
- // 3) somehow migrate all queries on the old_head to the new
- // version
- while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) {
- _mm_pause();
- }
+ epoch_ptr tmp = {nullptr, 0};
+ epoch_ptr cur;
+ do {
+ cur = m_current_epoch;
+ } while (!m_current_epoch.compare_exchange_strong(cur, tmp));
+ m_previous_epoch.store(cur);
- m_current_epoch.store(m_next_epoch);
- m_next_epoch.store({nullptr, 0});
+ // FIXME: this may currently block because there isn't any
+ // query preemption yet. At this point, we'd need to either
+ // 1) wait for all queries on the old_head to finish
+ // 2) kill all queries on the old_head
+ // 3) somehow migrate all queries on the old_head to the new
+ // version
+ while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) {
+ _mm_pause();
+ }
+ m_current_epoch.store(m_next_epoch);
+ m_next_epoch.store({nullptr, 0});
- /* notify any blocking threads that the new epoch is available */
- m_epoch_cv_lk.lock();
- m_epoch_cv.notify_all();
- m_epoch_cv_lk.unlock();
- }
+ /* notify any blocking threads that the new epoch is available */
+ m_epoch_cv_lk.lock();
+ m_epoch_cv.notify_all();
+ m_epoch_cv_lk.unlock();
+ }
+ /*
+ * Creates a new epoch by copying the currently active one. The new epoch's
+ * structure will be a shallow copy of the old one's.
+ */
+ _Epoch *create_new_epoch() {
/*
- * Creates a new epoch by copying the currently active one. The new epoch's
- * structure will be a shallow copy of the old one's.
+ * This epoch access is _not_ protected under the assumption that
+ * only one reconstruction will be able to trigger at a time. If that
+ * condition is violated, it is possible that this code will clone a retired
+ * epoch.
*/
- _Epoch *create_new_epoch() {
- /*
- * This epoch access is _not_ protected under the assumption that
- * only one reconstruction will be able to trigger at a time. If that condition
- * is violated, it is possible that this code will clone a retired
- * epoch.
- */
- assert(m_next_epoch.load().epoch == nullptr);
- auto current_epoch = get_active_epoch();
+ assert(m_next_epoch.load().epoch == nullptr);
+ auto current_epoch = get_active_epoch();
- m_epoch_cnt.fetch_add(1);
- m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0});
+ m_epoch_cnt.fetch_add(1);
+ m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0});
- end_job(current_epoch);
+ end_job(current_epoch);
- return m_next_epoch.load().epoch;
- }
+ return m_next_epoch.load().epoch;
+ }
- void retire_epoch(_Epoch *epoch) {
- /*
- * Epochs with currently active jobs cannot
- * be retired. By the time retire_epoch is called,
- * it is assumed that a new epoch is active, meaning
- * that the epoch to be retired should no longer
- * accumulate new active jobs. Eventually, this
- * number will hit zero and the function will
- * proceed.
- */
-
- if (epoch == nullptr) {
- return;
- }
+ void retire_epoch(_Epoch *epoch) {
+ /*
+ * Epochs with currently active jobs cannot
+ * be retired. By the time retire_epoch is called,
+ * it is assumed that a new epoch is active, meaning
+ * that the epoch to be retired should no longer
+ * accumulate new active jobs. Eventually, this
+ * number will hit zero and the function will
+ * proceed.
+ */
- epoch_ptr old, new_ptr;
- new_ptr = {nullptr, 0};
- do {
- old = m_previous_epoch.load();
-
- /*
- * If running in single threaded mode, the failure to retire
- * an Epoch will result in the thread of execution blocking
- * indefinitely.
- */
- if constexpr (std::same_as<SCHED, SerialScheduler>) {
- if (old.epoch == epoch) assert(old.refcnt == 0);
- }
-
- if (old.epoch == epoch && old.refcnt == 0 &&
- m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
- break;
- }
- usleep(1);
-
- } while(true);
-
- delete epoch;
+ if (epoch == nullptr) {
+ return;
}
- static void reconstruction(void *arguments) {
- auto args = (ReconstructionArgs<R, S, Q, L> *) arguments;
+ epoch_ptr old, new_ptr;
+ new_ptr = {nullptr, 0};
+ do {
+ old = m_previous_epoch.load();
+
+ /*
+ * If running in single threaded mode, the failure to retire
+ * an Epoch will result in the thread of execution blocking
+ * indefinitely.
+ */
+ if constexpr (std::same_as<SchedType, SerialScheduler>) {
+ if (old.epoch == epoch)
+ assert(old.refcnt == 0);
+ }
+
+ if (old.epoch == epoch && old.refcnt == 0 &&
+ m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
+ break;
+ }
+ usleep(1);
+
+ } while (true);
+
+ delete epoch;
+ }
+
+ static void reconstruction(void *arguments) {
+ auto args = (ReconstructionArgs<ShardType, QueryType, L> *)arguments;
+
+ ((DynamicExtension *)args->extension)->SetThreadAffinity();
+ Structure *vers = args->epoch->get_structure();
+
+ if constexpr (L == LayoutPolicy::BSM) {
+ if (args->merges.size() > 0) {
+ vers->reconstruction(args->merges[0]);
+ }
+ } else {
+ for (ssize_t i = 0; i < args->merges.size(); i++) {
+ vers->reconstruction(args->merges[i].target,
+ args->merges[i].sources[0]);
+ }
+ }
- ((DynamicExtension *) args->extension)->SetThreadAffinity();
- Structure *vers = args->epoch->get_structure();
+ /*
+ * we'll grab the buffer AFTER doing the internal reconstruction, so we
+ * can flush as many records as possible in one go. The reconstruction
+ * was done so as to make room for the full buffer anyway, so there's
+ * no real benefit to doing this first.
+ */
+ auto buffer_view = args->epoch->get_buffer();
+ size_t new_head = buffer_view.get_tail();
- if constexpr (L == LayoutPolicy::BSM) {
- if (args->merges.size() > 0) {
- vers->reconstruction(args->merges[0]);
- }
- } else {
- for (ssize_t i=0; i<args->merges.size(); i++) {
- vers->reconstruction(args->merges[i].target, args->merges[i].sources[0]);
- }
- }
+ /*
+ * if performing a compaction, don't flush the buffer, as
+ * there is no guarantee that any necessary reconstructions
+ * will free sufficient space in L0 to support a flush
+ */
+ if (!args->compaction) {
+ vers->flush_buffer(std::move(buffer_view));
+ }
+ args->result.set_value(true);
- /*
- * we'll grab the buffer AFTER doing the internal reconstruction, so we
- * can flush as many records as possible in one go. The reconstruction
- * was done so as to make room for the full buffer anyway, so there's
- * no real benefit to doing this first.
- */
- auto buffer_view = args->epoch->get_buffer();
- size_t new_head = buffer_view.get_tail();
+ /*
+ * Compactions occur on an epoch _before_ it becomes active,
+ * and as a result the active epoch should _not_ be advanced as
+ * part of a compaction
+ */
+ if (!args->compaction) {
+ ((DynamicExtension *)args->extension)->advance_epoch(new_head);
+ }
- /*
- * if performing a compaction, don't flush the buffer, as
- * there is no guarantee that any necessary reconstructions
- * will free sufficient space in L0 to support a flush
- */
- if (!args->compaction) {
- vers->flush_buffer(std::move(buffer_view));
+ ((DynamicExtension *)args->extension)
+ ->m_reconstruction_scheduled.store(false);
+
+ delete args;
+ }
+
+ static void async_query(void *arguments) {
+ auto *args =
+ (QueryArgs<ShardType, QueryType, DynamicExtension> *) arguments;
+
+ auto epoch = args->extension->get_active_epoch();
+
+ auto buffer = epoch->get_buffer();
+ auto vers = epoch->get_structure();
+ auto *parms = &(args->query_parms);
+
+ /* create initial buffer query */
+ auto buffer_query = QueryType::local_preproc_buffer(&buffer, parms);
+
+ /* create initial local queries */
+ std::vector<std::pair<ShardID, ShardType *>> shards;
+ std::vector<LocalQuery *> local_queries =
+ vers->get_local_queries(shards, parms);
+
+ /* process local/buffer queries to create the final version */
+ QueryType::distribute_query(parms, local_queries, buffer_query);
+
+ /* execute the local/buffer queries and combine the results into output */
+ std::vector<QueryResult> output;
+ do {
+ std::vector<std::vector<LocalResult>>
+ query_results(shards.size() + 1);
+ for (size_t i = 0; i < query_results.size(); i++) {
+ std::vector<LocalResult> local_results;
+ ShardID shid;
+
+ if (i == 0) { /* execute buffer query */
+ local_results = QueryType::local_query_buffer(buffer_query);
+ shid = INVALID_SHID;
+ } else { /*execute local queries */
+ local_results = QueryType::local_query(shards[i - 1].second,
+ local_queries[i - 1]);
+ shid = shards[i - 1].first;
}
- args->result.set_value(true);
+ /* framework-level, automatic delete filtering */
+ query_results[i] = std::move(local_results);
- /*
- * Compactions occur on an epoch _before_ it becomes active,
- * and as a result the active epoch should _not_ be advanced as
- * part of a compaction
- */
- if (!args->compaction) {
- ((DynamicExtension *) args->extension)->advance_epoch(new_head);
+ /* end query early if EARLY_ABORT is set and a result exists */
+ if constexpr (QueryType::EARLY_ABORT) {
+ if (query_results[i].size() > 0)
+ break;
}
+ }
- ((DynamicExtension *) args->extension)->m_reconstruction_scheduled.store(false);
-
- delete args;
- }
-
- static void async_query(void *arguments) {
- QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments;
-
- auto epoch = ((DynamicExtension *) args->extension)->get_active_epoch();
-
- auto ptr1 = ((DynamicExtension *) args->extension)->m_previous_epoch.load().epoch;
- auto ptr2 = ((DynamicExtension *) args->extension)->m_current_epoch.load().epoch;
- auto ptr3 = ((DynamicExtension *) args->extension)->m_next_epoch.load().epoch;
-
-
- auto buffer = epoch->get_buffer();
- auto vers = epoch->get_structure();
- void *parms = args->query_parms;
-
- /* Get the buffer query states */
- void *buffer_state = Q::get_buffer_query_state(&buffer, parms);
-
- /* Get the shard query states */
- std::vector<std::pair<ShardID, Shard*>> shards;
- std::vector<void *> states = vers->get_query_states(shards, parms);
+ /*
+ * combine the results of the local queries, also translating
+ * from LocalResultType to ResultType
+ */
+ QueryType::combine(query_results, parms, output);
- std::vector<R> results;
- Q::process_query_states(parms, states, buffer_state);
+ /* optionally repeat the local queries if necessary */
+ } while (QueryType::repeat(parms, output, local_queries, buffer_query));
- do {
- std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
- for (size_t i=0; i<query_results.size(); i++) {
- std::vector<Wrapped<R>> local_results;
- ShardID shid;
+ /* return the output vector to caller via the future */
+ args->result_set.set_value(std::move(output));
- if (i == 0) { /* process the buffer first */
- local_results = Q::buffer_query(buffer_state, parms);
- shid = INVALID_SHID;
- } else {
- local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
- shid = shards[i - 1].first;
- }
+ /* officially end the query job, releasing the pin on the epoch */
+ args->extension->end_job(epoch);
- query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer));
-
- if constexpr (Q::EARLY_ABORT) {
- if (query_results[i].size() > 0) break;
- }
- }
- Q::merge(query_results, parms, results);
-
- } while (Q::repeat(parms, results, states, buffer_state));
-
- args->result_set.set_value(std::move(results));
-
- ((DynamicExtension *) args->extension)->end_job(epoch);
-
- Q::delete_buffer_query_state(buffer_state);
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(states[i]);
- }
-
- delete args;
+ /* clean up memory allocated for temporary query objects */
+ delete buffer_query;
+ for (size_t i = 0; i < local_queries.size(); i++) {
+ delete local_queries[i];
}
- void schedule_reconstruction() {
- auto epoch = create_new_epoch();
- /*
- * the reconstruction process calls end_job(),
- * so we must start one before calling it
- */
-
- ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>();
- args->epoch = epoch;
- args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_high_watermark());
- args->extension = this;
- args->compaction = false;
- /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */
-
- m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
+ delete args;
+ }
+
+ void schedule_reconstruction() {
+ auto epoch = create_new_epoch();
+
+ ReconstructionArgs<ShardType, QueryType, L> *args =
+ new ReconstructionArgs<ShardType, QueryType, L>();
+ args->epoch = epoch;
+ args->merges = epoch->get_structure()->get_reconstruction_tasks(
+ m_buffer->get_high_watermark());
+ args->extension = this;
+ args->compaction = false;
+ /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed
+ * here */
+
+ m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
+ }
+
+ std::future<std::vector<QueryResult>>
+ schedule_query(Parameters &&query_parms) {
+ auto args =
+ new QueryArgs<ShardType, QueryType, DynamicExtension>();
+ args->extension = this;
+ args->query_parms = std::move(query_parms);
+ auto result = args->result_set.get_future();
+
+ m_sched.schedule_job(async_query, 0, (void *)args, QUERY);
+
+ return result;
+ }
+
+ int internal_append(const RecordType &rec, bool ts) {
+ if (m_buffer->is_at_low_watermark()) {
+ auto old = false;
+
+ if (m_reconstruction_scheduled.compare_exchange_strong(old, true)) {
+ schedule_reconstruction();
+ }
}
- std::future<std::vector<R>> schedule_query(void *query_parms) {
- QueryArgs<R, S, Q, L> *args = new QueryArgs<R, S, Q, L>();
- args->extension = this;
- args->query_parms = query_parms;
- auto result = args->result_set.get_future();
+ /* this will fail if the HWM is reached and return 0 */
+ return m_buffer->append(rec, ts);
+ }
- m_sched.schedule_job(async_query, 0, args, QUERY);
-
- return result;
+#ifdef _GNU_SOURCE
+ void SetThreadAffinity() {
+ if constexpr (std::same_as<SchedType, SerialScheduler>) {
+ return;
}
- int internal_append(const R &rec, bool ts) {
- if (m_buffer->is_at_low_watermark()) {
- auto old = false;
-
- if (m_reconstruction_scheduled.compare_exchange_strong(old, true)) {
- schedule_reconstruction();
- }
- }
-
- /* this will fail if the HWM is reached and return 0 */
- return m_buffer->append(rec, ts);
+ int core = m_next_core.fetch_add(1) % m_core_cnt;
+ cpu_set_t mask;
+ CPU_ZERO(&mask);
+
+ switch (core % 2) {
+ case 0:
+ // 0 |-> 0
+ // 2 |-> 2
+ // 4 |-> 4
+ core = core + 0;
+ break;
+ case 1:
+ // 1 |-> 28
+ // 3 |-> 30
+ // 5 |-> 32
+ core = (core - 1) + m_core_cnt;
+ break;
}
+ CPU_SET(core, &mask);
+ ::sched_setaffinity(0, sizeof(mask), &mask);
+ }
+#else
+ void SetThreadAffinity() {}
+#endif
- static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) {
- if constexpr (Q::SKIP_DELETE_FILTER) {
- return std::move(records);
- }
-
- std::vector<Wrapped<R>> processed_records;
- processed_records.reserve(records.size());
+ void end_job(_Epoch *epoch) {
+ epoch_ptr old, new_ptr;
- /*
- * For delete tagging, we just need to check the delete bit
- * on each record.
+ do {
+ if (m_previous_epoch.load().epoch == epoch) {
+ old = m_previous_epoch;
+ /*
+ * This could happen if we get into the system during a
+ * transition. In this case, we can just back out and retry
+         */
- if constexpr (D == DeletePolicy::TAGGING) {
- for (auto &rec : records) {
- if (rec.is_deleted()) {
- continue;
- }
+ if (old.epoch == nullptr) {
+ continue;
+ }
- processed_records.emplace_back(rec);
- }
+ assert(old.refcnt > 0);
- return processed_records;
+ new_ptr = {old.epoch, old.refcnt - 1};
+ if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
+ break;
}
-
+ } else {
+ old = m_current_epoch;
/*
- * For tombstone deletes, we need to search for the corresponding
- * tombstone for each record.
+ * This could happen if we get into the system during a
+ * transition. In this case, we can just back out and retry
*/
- for (auto &rec : records) {
- if (rec.is_tombstone()) {
- continue;
- }
-
- // FIXME: need to figure out how best to re-enable the buffer tombstone
- // check in the correct manner.
- //if (buffview.check_tombstone(rec.rec)) {
- //continue;
- //}
-
- for (size_t i=0; i<bview->get_record_count(); i++) {
- if (bview->get(i)->is_tombstone() && bview->get(i)->rec == rec.rec) {
- continue;
- }
- }
-
- if (shid != INVALID_SHID) {
- for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
- if (vers->get_levels()[lvl]->check_tombstone(0, rec.rec)) {
- continue;
- }
- }
-
- if (vers->get_levels()[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) {
- continue;
- }
- }
-
- processed_records.emplace_back(rec);
- }
-
- return processed_records;
- }
-
-#ifdef _GNU_SOURCE
- void SetThreadAffinity() {
- if constexpr (std::same_as<SCHED, SerialScheduler>) {
- return;
+ if (old.epoch == nullptr) {
+ continue;
}
- int core = m_next_core.fetch_add(1) % m_core_cnt;
- cpu_set_t mask;
- CPU_ZERO(&mask);
+ assert(old.refcnt > 0);
- switch (core % 2) {
- case 0:
- // 0 |-> 0
- // 2 |-> 2
- // 4 |-> 4
- core = core;
- break;
- case 1:
- // 1 |-> 28
- // 3 |-> 30
- // 5 |-> 32
- core = (core - 1) + m_core_cnt;
+ new_ptr = {old.epoch, old.refcnt - 1};
+ if (m_current_epoch.compare_exchange_strong(old, new_ptr)) {
break;
}
- CPU_SET(core, &mask);
- ::sched_setaffinity(0, sizeof(mask), &mask);
- }
-#else
- void SetThreadAffinity() {
-
- }
-#endif
-
-
- void end_job(_Epoch *epoch) {
- epoch_ptr old, new_ptr;
-
- do {
- if (m_previous_epoch.load().epoch == epoch) {
- old = m_previous_epoch;
- /*
- * This could happen if we get into the system during a
- * transition. In this case, we can just back out and retry
- */
- if (old.epoch == nullptr) {
- continue;
- }
-
- assert(old.refcnt > 0);
-
- new_ptr = {old.epoch, old.refcnt - 1};
- if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
- break;
- }
- } else {
- old = m_current_epoch;
- /*
- * This could happen if we get into the system during a
- * transition. In this case, we can just back out and retry
- */
- if (old.epoch == nullptr) {
- continue;
- }
-
- assert(old.refcnt > 0);
-
- new_ptr = {old.epoch, old.refcnt - 1};
- if (m_current_epoch.compare_exchange_strong(old, new_ptr)) {
- break;
- }
- }
- } while (true);
- }
-
+ }
+ } while (true);
+ }
};
-}
-
+} // namespace de