summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h121
1 files changed, 50 insertions, 71 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 719232e..1886234 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -14,39 +14,36 @@
#include <vector>
#include "framework/interface/Scheduler.h"
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/FIFOScheduler.h"
#include "framework/scheduling/SerialScheduler.h"
#include "framework/structure/ExtensionStructure.h"
#include "framework/structure/MutableBuffer.h"
#include "framework/scheduling/Epoch.h"
-#include "framework/util/Configuration.h"
+#include "util/types.h"
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
- LayoutPolicy L = LayoutPolicy::TEIRING,
DeletePolicy D = DeletePolicy::TAGGING,
- SchedulerInterface SchedType = SerialScheduler>
+ SchedulerInterface SchedType = de::FIFOScheduler>
class DynamicExtension {
- /* for unit testing purposes */
-public:
- LayoutPolicy Layout = L;
-
private:
/* convenience typedefs for commonly used types within the class */
typedef typename ShardType::RECORD RecordType;
typedef MutableBuffer<RecordType> Buffer;
- typedef ExtensionStructure<ShardType, QueryType, L> Structure;
- typedef Epoch<ShardType, QueryType, L> _Epoch;
+ typedef ExtensionStructure<ShardType, QueryType> Structure;
+ typedef Epoch<ShardType, QueryType> _Epoch;
typedef BufferView<RecordType> BufView;
+ typedef ReconstructionPolicy<ShardType, QueryType> ReconPolicyType;
typedef typename QueryType::Parameters Parameters;
typedef typename QueryType::LocalQuery LocalQuery;
typedef typename QueryType::LocalQueryBuffer BufferQuery;
typedef typename QueryType::LocalResultType LocalResult;
typedef typename QueryType::ResultType QueryResult;
-
static constexpr size_t QUERY = 1;
static constexpr size_t RECONSTRUCTION = 2;
@@ -61,36 +58,32 @@ public:
* Create a new Dynamized version of a data structure, supporting
* inserts and, possibly, deletes. The following parameters are used
* for configuration of the structure,
- * @param buffer_low_watermark The number of records that can be
+ * @param buffer_low_watermark The number of records that can be
* inserted before a buffer flush is initiated
*
- * @param buffer_high_watermark The maximum buffer capacity, inserts
- * will begin to fail once this number is reached, until the
- * buffer flush has completed. Has no effect in single-threaded
+ * @param buffer_high_watermark The maximum buffer capacity, inserts
+ * will begin to fail once this number is reached, until the
+ * buffer flush has completed. Has no effect in single-threaded
* operation
*
- * @param scale_factor The rate at which the capacity of levels
+ * @param scale_factor The rate at which the capacity of levels
* grows; should be at least 2 for reasonable performance
*
* @param memory_budget Unused at this time
*
* @param thread_cnt The maximum number of threads available to the
- * framework's scheduler for use in answering queries and
+ * framework's scheduler for use in answering queries and
* performing compactions and flushes, etc.
*/
- DynamicExtension(size_t buffer_low_watermark, size_t buffer_high_watermark,
- size_t scale_factor, size_t memory_budget = 0,
+ DynamicExtension(ReconPolicyType *recon_policy, size_t buffer_low_watermark,
+ size_t buffer_high_watermark, size_t memory_budget = 0,
size_t thread_cnt = 16)
- : m_scale_factor(scale_factor), m_max_delete_prop(1),
- m_sched(memory_budget, thread_cnt),
+ : m_max_delete_prop(1), m_sched(memory_budget, thread_cnt),
m_buffer(new Buffer(buffer_low_watermark, buffer_high_watermark)),
- m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0) {
- if constexpr (L == LayoutPolicy::BSM) {
- assert(scale_factor == 2);
- }
+ m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0),
+ m_recon_policy(recon_policy) {
- auto vers =
- new Structure(buffer_high_watermark, m_scale_factor, m_max_delete_prop);
+ auto vers = new Structure();
m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
m_previous_epoch.store({nullptr, 0});
m_next_epoch.store({nullptr, 0});
@@ -133,26 +126,26 @@ public:
int insert(const RecordType &rec) { return internal_append(rec, false); }
/**
- * Erases a record from the index, according to the DeletePolicy
+ * Erases a record from the index, according to the DeletePolicy
* template parameter. Returns 1 on success and 0 on failure. The
* equality comparison operator of RecordType is used to identify
- * the record to be deleted.
- *
+ * the record to be deleted.
+ *
* Deletes behave differently, depending on the DeletionPolicy. For
* Tombstone deletes, a tombstone record will be inserted into the
* index. The presence of the deleted record is not checked first, so
* deleting a record that does not exist will result in an unnecessary
- * tombstone record being written.
+ * tombstone record being written.
*
* Deletes using Tagging will perform a point lookup for the record to
- * be removed, and mark it as deleted in its header.
+ * be removed, and mark it as deleted in its header.
*
- * @param rec The record to be deleted. The argument to this function
+ * @param rec The record to be deleted. The argument to this function
* should compare equal to the record to be deleted.
*
- * @return 1 on success, and 0 on failure. For tombstone deletes, a
- * failure will occur if the insert fails due to the buffer
- * being full, and can be retried. For tagging deletes, a
+ * @return 1 on success, and 0 on failure. For tombstone deletes, a
+ * failure will occur if the insert fails due to the buffer
+ * being full, and can be retried. For tagging deletes, a
* failure means that hte record to be deleted could not be
* found in the index, and should *not* be retried.
*/
@@ -202,13 +195,12 @@ public:
* @return A future, from which the query results can be retrieved upon
* query completion
*/
- std::future<QueryResult>
- query(Parameters &&parms) {
+ std::future<QueryResult> query(Parameters &&parms) {
return schedule_query(std::move(parms));
}
/**
- * Determine the number of records (including tagged records and
+ * Determine the number of records (including tagged records and
* tombstones) currently within the framework. This number is used for
* determining when and how reconstructions occur.
*
@@ -229,7 +221,7 @@ public:
* but will always return 0 in that case.
*
* @return The number of tombstone records within the index
- */
+ */
size_t get_tombstone_count() {
auto epoch = get_active_epoch();
auto t = epoch->get_buffer().get_tombstone_count() +
@@ -246,7 +238,7 @@ public:
* as each level can contain multiple shards in that case.
*
* @return The number of levels within the index
- */
+ */
size_t get_height() {
auto epoch = get_active_epoch();
auto t = epoch->get_structure()->get_height();
@@ -292,7 +284,7 @@ public:
/**
* Create a new single Shard object containing all of the records
- * within the framework (buffer and shards).
+ * within the framework (buffer and shards).
*
* @param await_reconstruction_completion Specifies whether the currently
* active state of the index should be used to create the shard
@@ -357,11 +349,11 @@ public:
}
/**
- * Verify that the currently active version of the index does not
+ * Verify that the currently active version of the index does not
* violate tombstone proportion invariants. Exposed for unit-testing
* purposes.
*
- * @return Returns true if the tombstone proportion invariant is
+ * @return Returns true if the tombstone proportion invariant is
* satisfied, and false if it is not.
*/
bool validate_tombstone_proportion() {
@@ -378,7 +370,7 @@ public:
void print_scheduler_statistics() const { m_sched.print_statistics(); }
private:
- size_t m_scale_factor;
+ ReconPolicyType const *m_recon_policy;
double m_max_delete_prop;
SchedType m_sched;
@@ -387,7 +379,7 @@ private:
size_t m_core_cnt;
std::atomic<int> m_next_core;
std::atomic<size_t> m_epoch_cnt;
-
+
alignas(64) std::atomic<bool> m_reconstruction_scheduled;
std::atomic<epoch_ptr> m_next_epoch;
@@ -397,9 +389,6 @@ private:
std::condition_variable m_epoch_cv;
std::mutex m_epoch_cv_lk;
-
-
-
void enforce_delete_invariant(_Epoch *epoch) {
auto structure = epoch->get_structure();
auto compactions = structure->get_compaction_tasks();
@@ -407,8 +396,8 @@ private:
while (compactions.size() > 0) {
/* schedule a compaction */
- ReconstructionArgs<ShardType, QueryType, L> *args =
- new ReconstructionArgs<ShardType, QueryType, L>();
+ ReconstructionArgs<ShardType, QueryType> *args =
+ new ReconstructionArgs<ShardType, QueryType>();
args->epoch = epoch;
args->merges = compactions;
args->extension = this;
@@ -555,20 +544,13 @@ private:
}
static void reconstruction(void *arguments) {
- auto args = (ReconstructionArgs<ShardType, QueryType, L> *)arguments;
+ auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
((DynamicExtension *)args->extension)->SetThreadAffinity();
Structure *vers = args->epoch->get_structure();
- if constexpr (L == LayoutPolicy::BSM) {
- if (args->merges.size() > 0) {
- vers->reconstruction(args->merges[0]);
- }
- } else {
- for (size_t i = 0; i < args->merges.size(); i++) {
- vers->reconstruction(args->merges[i].target,
- args->merges[i].sources[0]);
- }
+ for (size_t i=0; i<args->tasks.size(); i++) {
+ vers->perform_reconstruction(args->tasks[i]);
}
/*
@@ -607,8 +589,7 @@ private:
}
static void async_query(void *arguments) {
- auto *args =
- (QueryArgs<ShardType, QueryType, DynamicExtension> *) arguments;
+ auto *args = (QueryArgs<ShardType, QueryType, DynamicExtension> *)arguments;
auto epoch = args->extension->get_active_epoch();
@@ -636,7 +617,7 @@ private:
query_results[i] = QueryType::local_query_buffer(buffer_query);
} else { /*execute local queries */
query_results[i] = QueryType::local_query(shards[i - 1].second,
- local_queries[i - 1]);
+ local_queries[i - 1]);
}
/* end query early if EARLY_ABORT is set and a result exists */
@@ -673,11 +654,11 @@ private:
void schedule_reconstruction() {
auto epoch = create_new_epoch();
- ReconstructionArgs<ShardType, QueryType, L> *args =
- new ReconstructionArgs<ShardType, QueryType, L>();
+ ReconstructionArgs<ShardType, QueryType> *args =
+ new ReconstructionArgs<ShardType, QueryType>();
args->epoch = epoch;
- args->merges = epoch->get_structure()->get_reconstruction_tasks(
- m_buffer->get_high_watermark());
+ args->tasks = m_recon_policy->get_reconstruction_tasks(
+ epoch, m_buffer->get_high_watermark());
args->extension = this;
args->compaction = false;
/* NOTE: args is deleted by the reconstruction job, so shouldn't be freed
@@ -686,10 +667,8 @@ private:
m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
}
- std::future<QueryResult>
- schedule_query(Parameters &&query_parms) {
- auto args =
- new QueryArgs<ShardType, QueryType, DynamicExtension>();
+ std::future<QueryResult> schedule_query(Parameters &&query_parms) {
+ auto args = new QueryArgs<ShardType, QueryType, DynamicExtension>();
args->extension = this;
args->query_parms = std::move(query_parms);
auto result = args->result_set.get_future();