summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
authorDouglas B. Rumbaugh <doug@douglasrumbaugh.com>2024-12-22 13:00:19 -0500
committerDouglas B. Rumbaugh <doug@douglasrumbaugh.com>2024-12-22 13:00:19 -0500
commit5617bed5257506d3dfda8537b16f44b3e40f1b42 (patch)
treeb1a4bb957929b20c884a4eed070f42065828fdb6 /include/framework
parent9876d74e503df64eb9e82e540ca41fcf593ebf64 (diff)
downloaddynamic-extension-5617bed5257506d3dfda8537b16f44b3e40f1b42.tar.gz
Began overhauling reconstruction mechanism
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h121
-rw-r--r--include/framework/reconstruction/BSMPolicy.h83
-rw-r--r--include/framework/reconstruction/LevelingPolicy.h84
-rw-r--r--include/framework/reconstruction/ReconstructionPolicy.h30
-rw-r--r--include/framework/reconstruction/TieringPolicy.h79
-rw-r--r--include/framework/scheduling/Epoch.h6
-rw-r--r--include/framework/scheduling/Task.h9
-rw-r--r--include/framework/structure/ExtensionStructure.h561
-rw-r--r--include/framework/structure/InternalLevel.h154
-rw-r--r--include/framework/util/Configuration.h14
10 files changed, 434 insertions, 707 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 719232e..1886234 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -14,39 +14,36 @@
#include <vector>
#include "framework/interface/Scheduler.h"
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/FIFOScheduler.h"
#include "framework/scheduling/SerialScheduler.h"
#include "framework/structure/ExtensionStructure.h"
#include "framework/structure/MutableBuffer.h"
#include "framework/scheduling/Epoch.h"
-#include "framework/util/Configuration.h"
+#include "util/types.h"
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
- LayoutPolicy L = LayoutPolicy::TEIRING,
DeletePolicy D = DeletePolicy::TAGGING,
- SchedulerInterface SchedType = SerialScheduler>
+ SchedulerInterface SchedType = de::FIFOScheduler>
class DynamicExtension {
- /* for unit testing purposes */
-public:
- LayoutPolicy Layout = L;
-
private:
/* convenience typedefs for commonly used types within the class */
typedef typename ShardType::RECORD RecordType;
typedef MutableBuffer<RecordType> Buffer;
- typedef ExtensionStructure<ShardType, QueryType, L> Structure;
- typedef Epoch<ShardType, QueryType, L> _Epoch;
+ typedef ExtensionStructure<ShardType, QueryType> Structure;
+ typedef Epoch<ShardType, QueryType> _Epoch;
typedef BufferView<RecordType> BufView;
+ typedef ReconstructionPolicy<ShardType, QueryType> ReconPolicyType;
typedef typename QueryType::Parameters Parameters;
typedef typename QueryType::LocalQuery LocalQuery;
typedef typename QueryType::LocalQueryBuffer BufferQuery;
typedef typename QueryType::LocalResultType LocalResult;
typedef typename QueryType::ResultType QueryResult;
-
static constexpr size_t QUERY = 1;
static constexpr size_t RECONSTRUCTION = 2;
@@ -61,36 +58,32 @@ public:
* Create a new Dynamized version of a data structure, supporting
* inserts and, possibly, deletes. The following parameters are used
* for configuration of the structure,
- * @param buffer_low_watermark The number of records that can be
+ * @param buffer_low_watermark The number of records that can be
* inserted before a buffer flush is initiated
*
- * @param buffer_high_watermark The maximum buffer capacity, inserts
- * will begin to fail once this number is reached, until the
- * buffer flush has completed. Has no effect in single-threaded
+ * @param buffer_high_watermark The maximum buffer capacity, inserts
+ * will begin to fail once this number is reached, until the
+ * buffer flush has completed. Has no effect in single-threaded
* operation
*
- * @param scale_factor The rate at which the capacity of levels
+ * @param scale_factor The rate at which the capacity of levels
* grows; should be at least 2 for reasonable performance
*
* @param memory_budget Unused at this time
*
* @param thread_cnt The maximum number of threads available to the
- * framework's scheduler for use in answering queries and
+ * framework's scheduler for use in answering queries and
* performing compactions and flushes, etc.
*/
- DynamicExtension(size_t buffer_low_watermark, size_t buffer_high_watermark,
- size_t scale_factor, size_t memory_budget = 0,
+ DynamicExtension(ReconPolicyType *recon_policy, size_t buffer_low_watermark,
+ size_t buffer_high_watermark, size_t memory_budget = 0,
size_t thread_cnt = 16)
- : m_scale_factor(scale_factor), m_max_delete_prop(1),
- m_sched(memory_budget, thread_cnt),
+ : m_max_delete_prop(1), m_sched(memory_budget, thread_cnt),
m_buffer(new Buffer(buffer_low_watermark, buffer_high_watermark)),
- m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0) {
- if constexpr (L == LayoutPolicy::BSM) {
- assert(scale_factor == 2);
- }
+ m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0),
+ m_recon_policy(recon_policy) {
- auto vers =
- new Structure(buffer_high_watermark, m_scale_factor, m_max_delete_prop);
+ auto vers = new Structure();
m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
m_previous_epoch.store({nullptr, 0});
m_next_epoch.store({nullptr, 0});
@@ -133,26 +126,26 @@ public:
int insert(const RecordType &rec) { return internal_append(rec, false); }
/**
- * Erases a record from the index, according to the DeletePolicy
+ * Erases a record from the index, according to the DeletePolicy
* template parameter. Returns 1 on success and 0 on failure. The
* equality comparison operator of RecordType is used to identify
- * the record to be deleted.
- *
+ * the record to be deleted.
+ *
* Deletes behave differently, depending on the DeletionPolicy. For
* Tombstone deletes, a tombstone record will be inserted into the
* index. The presence of the deleted record is not checked first, so
* deleting a record that does not exist will result in an unnecessary
- * tombstone record being written.
+ * tombstone record being written.
*
* Deletes using Tagging will perform a point lookup for the record to
- * be removed, and mark it as deleted in its header.
+ * be removed, and mark it as deleted in its header.
*
- * @param rec The record to be deleted. The argument to this function
+ * @param rec The record to be deleted. The argument to this function
* should compare equal to the record to be deleted.
*
- * @return 1 on success, and 0 on failure. For tombstone deletes, a
- * failure will occur if the insert fails due to the buffer
- * being full, and can be retried. For tagging deletes, a
+ * @return 1 on success, and 0 on failure. For tombstone deletes, a
+ * failure will occur if the insert fails due to the buffer
+ * being full, and can be retried. For tagging deletes, a
* failure means that hte record to be deleted could not be
* found in the index, and should *not* be retried.
*/
@@ -202,13 +195,12 @@ public:
* @return A future, from which the query results can be retrieved upon
* query completion
*/
- std::future<QueryResult>
- query(Parameters &&parms) {
+ std::future<QueryResult> query(Parameters &&parms) {
return schedule_query(std::move(parms));
}
/**
- * Determine the number of records (including tagged records and
+ * Determine the number of records (including tagged records and
* tombstones) currently within the framework. This number is used for
* determining when and how reconstructions occur.
*
@@ -229,7 +221,7 @@ public:
* but will always return 0 in that case.
*
* @return The number of tombstone records within the index
- */
+ */
size_t get_tombstone_count() {
auto epoch = get_active_epoch();
auto t = epoch->get_buffer().get_tombstone_count() +
@@ -246,7 +238,7 @@ public:
* as each level can contain multiple shards in that case.
*
* @return The number of levels within the index
- */
+ */
size_t get_height() {
auto epoch = get_active_epoch();
auto t = epoch->get_structure()->get_height();
@@ -292,7 +284,7 @@ public:
/**
* Create a new single Shard object containing all of the records
- * within the framework (buffer and shards).
+ * within the framework (buffer and shards).
*
* @param await_reconstruction_completion Specifies whether the currently
* active state of the index should be used to create the shard
@@ -357,11 +349,11 @@ public:
}
/**
- * Verify that the currently active version of the index does not
+ * Verify that the currently active version of the index does not
* violate tombstone proportion invariants. Exposed for unit-testing
* purposes.
*
- * @return Returns true if the tombstone proportion invariant is
+ * @return Returns true if the tombstone proportion invariant is
* satisfied, and false if it is not.
*/
bool validate_tombstone_proportion() {
@@ -378,7 +370,7 @@ public:
void print_scheduler_statistics() const { m_sched.print_statistics(); }
private:
- size_t m_scale_factor;
+ ReconPolicyType const *m_recon_policy;
double m_max_delete_prop;
SchedType m_sched;
@@ -387,7 +379,7 @@ private:
size_t m_core_cnt;
std::atomic<int> m_next_core;
std::atomic<size_t> m_epoch_cnt;
-
+
alignas(64) std::atomic<bool> m_reconstruction_scheduled;
std::atomic<epoch_ptr> m_next_epoch;
@@ -397,9 +389,6 @@ private:
std::condition_variable m_epoch_cv;
std::mutex m_epoch_cv_lk;
-
-
-
void enforce_delete_invariant(_Epoch *epoch) {
auto structure = epoch->get_structure();
auto compactions = structure->get_compaction_tasks();
@@ -407,8 +396,8 @@ private:
while (compactions.size() > 0) {
/* schedule a compaction */
- ReconstructionArgs<ShardType, QueryType, L> *args =
- new ReconstructionArgs<ShardType, QueryType, L>();
+ ReconstructionArgs<ShardType, QueryType> *args =
+ new ReconstructionArgs<ShardType, QueryType>();
args->epoch = epoch;
args->merges = compactions;
args->extension = this;
@@ -555,20 +544,13 @@ private:
}
static void reconstruction(void *arguments) {
- auto args = (ReconstructionArgs<ShardType, QueryType, L> *)arguments;
+ auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
((DynamicExtension *)args->extension)->SetThreadAffinity();
Structure *vers = args->epoch->get_structure();
- if constexpr (L == LayoutPolicy::BSM) {
- if (args->merges.size() > 0) {
- vers->reconstruction(args->merges[0]);
- }
- } else {
- for (size_t i = 0; i < args->merges.size(); i++) {
- vers->reconstruction(args->merges[i].target,
- args->merges[i].sources[0]);
- }
+ for (size_t i=0; i<args->tasks.size(); i++) {
+ vers->perform_reconstruction(args->tasks[i]);
}
/*
@@ -607,8 +589,7 @@ private:
}
static void async_query(void *arguments) {
- auto *args =
- (QueryArgs<ShardType, QueryType, DynamicExtension> *) arguments;
+ auto *args = (QueryArgs<ShardType, QueryType, DynamicExtension> *)arguments;
auto epoch = args->extension->get_active_epoch();
@@ -636,7 +617,7 @@ private:
query_results[i] = QueryType::local_query_buffer(buffer_query);
} else { /*execute local queries */
query_results[i] = QueryType::local_query(shards[i - 1].second,
- local_queries[i - 1]);
+ local_queries[i - 1]);
}
/* end query early if EARLY_ABORT is set and a result exists */
@@ -673,11 +654,11 @@ private:
void schedule_reconstruction() {
auto epoch = create_new_epoch();
- ReconstructionArgs<ShardType, QueryType, L> *args =
- new ReconstructionArgs<ShardType, QueryType, L>();
+ ReconstructionArgs<ShardType, QueryType> *args =
+ new ReconstructionArgs<ShardType, QueryType>();
args->epoch = epoch;
- args->merges = epoch->get_structure()->get_reconstruction_tasks(
- m_buffer->get_high_watermark());
+ args->tasks = m_recon_policy->get_reconstruction_tasks(
+ epoch, m_buffer->get_high_watermark());
args->extension = this;
args->compaction = false;
/* NOTE: args is deleted by the reconstruction job, so shouldn't be freed
@@ -686,10 +667,8 @@ private:
m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
}
- std::future<QueryResult>
- schedule_query(Parameters &&query_parms) {
- auto args =
- new QueryArgs<ShardType, QueryType, DynamicExtension>();
+ std::future<QueryResult> schedule_query(Parameters &&query_parms) {
+ auto args = new QueryArgs<ShardType, QueryType, DynamicExtension>();
args->extension = this;
args->query_parms = std::move(query_parms);
auto result = args->result_set.get_future();
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
new file mode 100644
index 0000000..9138bd1
--- /dev/null
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -0,0 +1,83 @@
+/*
+ * include/framework/reconstruction/LevelingPolicy.h
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+#include <cmath>
+
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/Epoch.h"
+#include "util/types.h"
+
+namespace de {
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class BSMPolicy : ReconstructionPolicy<ShardType, QueryType> {
+ typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+ LevelVector;
+
+public:
+ BSMPolicy(size_t scale_factor, size_t buffer_size)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
+
+ ReconstructionVector
+ get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch,
+ size_t incoming_reccnt) override {
+ ReconstructionVector reconstructions;
+ auto levels = epoch->get_structure()->get_level_vector();
+
+ level_index target_level = find_reconstruction_target(levels);
+ assert(target_level != -1);
+ level_index source_level = 0;
+
+ if (target_level == invalid_level_idx) {
+ /* grow */
+ target_level = levels.size();
+ }
+
+ ReconstructionTask task;
+ task.target = target_level;
+ task.type = ReconstructionType::Merge;
+
+ for (level_index i = target_level; i > source_level; i--) {
+ if (i < levels.size()) {
+ task.add_shard({i, all_shards_idx}, levels[i]->get_record_count());
+ }
+ }
+
+ reconstructions.add_reconstruction(task);
+ return reconstructions;
+ }
+
+ ReconstructionTask
+ get_flush_task(Epoch<ShardType, QueryType> *epoch) override {
+ return ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush};
+ }
+
+private:
+ level_index find_reconstruction_target(LevelVector &levels) {
+ level_index target_level = 0;
+
+ for (size_t i = 0; i < (level_index)levels.size(); i++) {
+ if (levels[i].get_record_count() + 1 <= capacity(i)) {
+ target_level = i;
+ break;
+ }
+ }
+
+ return target_level;
+ }
+
+ inline size_t capacity(level_index level) {
+ return m_buffer_size * pow(m_scale_factor, level + 1);
+ }
+
+ size_t m_scale_factor;
+ size_t m_buffer_size;
+};
+} // namespace de
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
new file mode 100644
index 0000000..00f2cff
--- /dev/null
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -0,0 +1,84 @@
+/*
+ * include/framework/reconstruction/LevelingPolicy.h
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+#include <cmath>
+
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/Epoch.h"
+#include "util/types.h"
+
+namespace de {
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class LevelingPolicy : ReconstructionPolicy<ShardType, QueryType> {
+ typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+ LevelVector;
+
+public:
+ LevelingPolicy(size_t scale_factor, size_t buffer_size)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
+
+ ReconstructionVector
+ get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch,
+ size_t incoming_reccnt) override {
+ ReconstructionVector reconstructions;
+ auto levels = epoch->get_structure()->get_level_vector();
+
+ level_index target_level = find_reconstruction_target(levels);
+ assert(target_level != -1);
+ level_index source_level = 0;
+
+ if (target_level == invalid_level_idx) {
+ /* grow */
+ target_level = levels.size();
+ }
+
+ for (level_index i = target_level; i > source_level; i--) {
+ size_t target_reccnt =
+ (i < levels.size()) ? levels[i]->get_record_count() : 0;
+ size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt;
+
+ reconstructions.add_reconstruction(i - 1, i, total_reccnt,
+ ReconstructionType::Merge);
+ }
+
+ return reconstructions;
+ }
+
+ ReconstructionTask
+ get_flush_task(Epoch<ShardType, QueryType> *epoch) override {
+ return ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush};
+ }
+
+private:
+ level_index find_reconstruction_target(LevelVector &levels) {
+ level_index target_level = 0;
+ size_t incoming_records = m_buffer_size;
+
+ for (size_t i = 0; i < (level_index)levels.size(); i++) {
+ if (levels[i].get_record_count() + incoming_records < capacity(i)) {
+ target_level = i;
+ break;
+ }
+
+ incoming_records = levels[i].get_record_count();
+ }
+
+ return target_level;
+ }
+
+ inline size_t capacity(level_index level) {
+ return m_buffer_size * pow(m_scale_factor, level + 1);
+ }
+
+ size_t m_scale_factor;
+ size_t m_buffer_size;
+};
+} // namespace de
diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h
new file mode 100644
index 0000000..976091e
--- /dev/null
+++ b/include/framework/reconstruction/ReconstructionPolicy.h
@@ -0,0 +1,30 @@
+/*
+ * include/framework/reconstruction/ReconstructionPolicy.h
+ *
+ * Reconstruction class interface, used to implement custom reconstruction
+ * policies.
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include "util/types.h"
+#include "framework/structure/ExtensionStructure.h"
+#include "framework/scheduling/Epoch.h"
+
+namespace de {
+template<ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class ReconstructionPolicy {
+ typedef ExtensionStructure<ShardType, QueryType> StructureType;
+
+public:
+ ReconstructionPolicy() {}
+ virtual ReconstructionVector get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch,
+ size_t incoming_reccnt) = 0;
+ virtual ReconstructionTask get_flush_task(Epoch<ShardType, QueryType> *epoch) = 0;
+ };
+}
diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h
new file mode 100644
index 0000000..120bcb5
--- /dev/null
+++ b/include/framework/reconstruction/TieringPolicy.h
@@ -0,0 +1,79 @@
+/*
+ * include/framework/reconstruction/LevelingPolicy.h
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+#include <cmath>
+
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/Epoch.h"
+#include "util/types.h"
+
+namespace de {
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class TieringPolicy : ReconstructionPolicy<ShardType, QueryType> {
+ typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> LevelVector;
+public:
+ TieringPolicy(size_t scale_factor, size_t buffer_size)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
+
+ ReconstructionVector
+ get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch,
+ size_t incoming_reccnt) override {
+ ReconstructionVector reconstructions;
+ auto levels = epoch->get_structure()->get_level_vector();
+
+ level_index target_level = find_reconstruction_target(levels);
+ assert(target_level != -1);
+ level_index source_level = 0;
+
+ if (target_level == invalid_level_idx) {
+ /* grow */
+ target_level = levels.size();
+ }
+
+ for (level_index i = target_level; i > source_level; i--) {
+ size_t target_reccnt =
+ (i < levels.size()) ? levels[i]->get_record_count() : 0;
+ size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt;
+
+ reconstructions.add_reconstruction(i - 1, i, total_reccnt,
+ ReconstructionType::Compact);
+ }
+
+ return reconstructions;
+ }
+
+ ReconstructionTask
+ get_flush_task(Epoch<ShardType, QueryType> *epoch) override {
+ return ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush};
+ }
+
+private:
+ level_index find_reconstruction_target(LevelVector &levels) {
+ level_index target_level = 0;
+
+ for (size_t i = 0; i < (level_index) levels.size(); i++) {
+ if (levels[i].get_shard_count() + 1 <= capacity()) {
+ target_level = i;
+ break;
+ }
+ }
+
+ return target_level;
+ }
+
+ inline size_t capacity() {
+ return m_scale_factor;
+ }
+
+ size_t m_scale_factor;
+ size_t m_buffer_size;
+};
+} // namespace de
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 03675b1..303ab2f 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -8,7 +8,6 @@
*/
#pragma once
-#include <condition_variable>
#include <mutex>
#include "framework/structure/BufferView.h"
@@ -17,13 +16,12 @@
namespace de {
-template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
- LayoutPolicy L>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class Epoch {
private:
typedef typename ShardType::RECORD RecordType;
typedef MutableBuffer<RecordType> Buffer;
- typedef ExtensionStructure<ShardType, QueryType, L> Structure;
+ typedef ExtensionStructure<ShardType, QueryType> Structure;
typedef BufferView<RecordType> BufView;
public:
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 34f053a..7242bef 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -19,16 +19,15 @@
#include "framework/scheduling/Epoch.h"
#include "framework/scheduling/statistics.h"
-#include "framework/util/Configuration.h"
+#include "util/types.h"
namespace de {
-template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
- LayoutPolicy L>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
struct ReconstructionArgs {
typedef typename ShardType::RECORD RecordType;
- Epoch<ShardType, QueryType, L> *epoch;
- ReconstructionVector merges;
+ Epoch<ShardType, QueryType> *epoch;
+ ReconstructionVector tasks;
std::promise<bool> result;
bool compaction;
void *extension;
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index c81ad05..9b7ae87 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -11,40 +11,23 @@
#include <atomic>
#include <cstdio>
+#include <memory>
#include <vector>
#include "framework/structure/BufferView.h"
#include "framework/structure/InternalLevel.h"
-
-#include "framework/util/Configuration.h"
-
-#include "psu-util/timer.h"
#include "util/types.h"
namespace de {
-template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
- LayoutPolicy L = LayoutPolicy::TEIRING>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class ExtensionStructure {
typedef typename ShardType::RECORD RecordType;
typedef BufferView<RecordType> BuffView;
-
- typedef struct {
- size_t reccnt;
- size_t reccap;
-
- size_t shardcnt;
- size_t shardcap;
- } level_state;
-
- typedef std::vector<level_state> state_vector;
-
+ typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+ LevelVector;
public:
- ExtensionStructure(size_t buffer_size, size_t scale_factor,
- double max_delete_prop)
- : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop),
- m_buffer_size(buffer_size) {}
-
+ ExtensionStructure() = default;
~ExtensionStructure() = default;
/*
@@ -62,16 +45,13 @@ public:
* problems under tagging with concurrency. Any deletes in this context will
* need to be forwarded to the appropriate structures manually.
*/
- ExtensionStructure<ShardType, QueryType, L> *copy() {
- auto new_struct = new ExtensionStructure<ShardType, QueryType, L>(
- m_buffer_size, m_scale_factor, m_max_delete_prop);
+ ExtensionStructure<ShardType, QueryType> *copy() const {
+ auto new_struct = new ExtensionStructure<ShardType, QueryType>();
for (size_t i = 0; i < m_levels.size(); i++) {
new_struct->m_levels.push_back(m_levels[i]->clone());
}
new_struct->m_refcnt = 0;
- new_struct->m_current_state = m_current_state;
-
return new_struct;
}
@@ -100,30 +80,10 @@ public:
}
/*
- * Flush a buffer into the extension structure, performing any necessary
- * reconstructions to free up room in L0.
- *
- * FIXME: arguably, this should be a method attached to the buffer that
- * takes a structure as input.
- */
- inline bool flush_buffer(BuffView buffer) {
- state_vector tmp = m_current_state;
-
- if (tmp.size() == 0) {
- grow(tmp);
- }
-
- assert(can_reconstruct_with(0, buffer.get_record_count(), tmp));
- flush_buffer_into_l0(std::move(buffer));
-
- return true;
- }
-
- /*
* Return the total number of records (including tombstones) within all
* of the levels of the structure.
*/
- size_t get_record_count() {
+ size_t get_record_count() const {
size_t cnt = 0;
for (size_t i = 0; i < m_levels.size(); i++) {
@@ -138,7 +98,7 @@ public:
* Return the total number of tombstones contained within all of the
* levels of the structure.
*/
- size_t get_tombstone_count() {
+ size_t get_tombstone_count() const {
size_t cnt = 0;
for (size_t i = 0; i < m_levels.size(); i++) {
@@ -153,13 +113,13 @@ public:
* Return the number of levels within the structure. Note that not
* all of these levels are necessarily populated.
*/
- size_t get_height() { return m_levels.size(); }
+ size_t get_height() const { return m_levels.size(); }
/*
* Return the amount of memory (in bytes) used by the shards within the
* structure for storing the primary data structure and raw data.
*/
- size_t get_memory_usage() {
+ size_t get_memory_usage() const {
size_t cnt = 0;
for (size_t i = 0; i < m_levels.size(); i++) {
if (m_levels[i])
@@ -174,7 +134,7 @@ public:
* structure for storing auxiliary data structures. This total does not
* include memory used for the main data structure, or raw data.
*/
- size_t get_aux_memory_usage() {
+ size_t get_aux_memory_usage() const {
size_t cnt = 0;
for (size_t i = 0; i < m_levels.size(); i++) {
if (m_levels[i]) {
@@ -185,326 +145,74 @@ public:
return cnt;
}
- /*
- * Validate that no level in the structure exceeds its maximum tombstone
- * capacity. This is used to trigger preemptive compactions at the end of
- * the reconstruction process.
- */
- bool validate_tombstone_proportion() {
- long double ts_prop;
- for (size_t i = 0; i < m_levels.size(); i++) {
- if (m_levels[i]) {
- ts_prop = (long double)m_levels[i]->get_tombstone_count() /
- (long double)calc_level_record_capacity(i);
- if (ts_prop > (long double)m_max_delete_prop) {
- return false;
- }
- }
- }
-
- return true;
- }
-
- bool validate_tombstone_proportion(level_index level) {
- long double ts_prop = (long double)m_levels[level]->get_tombstone_count() /
- (long double)calc_level_record_capacity(level);
- return ts_prop <= (long double)m_max_delete_prop;
- }
-
- /*
- * Return a reference to the underlying vector of levels within the
- * structure.
- */
- std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> &
- get_levels() {
- return m_levels;
- }
-
- /*
- * NOTE: This cannot be simulated, because tombstone cancellation is not
- * cheaply predictable. It is possible that the worst case number could
- * be used instead, to allow for prediction, but compaction isn't a
- * major concern outside of sampling; at least for now. So I'm not
- * going to focus too much time on it at the moment.
- */
- ReconstructionVector get_compaction_tasks() {
- ReconstructionVector tasks;
- state_vector scratch_state = m_current_state;
-
- /* if the tombstone/delete invariant is satisfied, no need for compactions
- */
- if (validate_tombstone_proportion()) {
- return tasks;
- }
-
- /* locate the first level to violate the invariant */
- level_index violation_idx = -1;
- for (level_index i = 0; i < m_levels.size(); i++) {
- if (!validate_tombstone_proportion(i)) {
- violation_idx = i;
- break;
- }
- }
-
- assert(violation_idx != -1);
-
- level_index base_level =
- find_reconstruction_target(violation_idx, scratch_state);
- if (base_level == -1) {
- base_level = grow(scratch_state);
- }
-
- for (level_index i = base_level; i > 0; i--) {
- /*
- * The amount of storage required for the reconstruction accounts
- * for the cost of storing the new records, along with the
- * cost of retaining the old records during the process
- * (hence the 2x multiplier).
- *
- * FIXME: currently does not account for the *actual* size
- * of the shards, only the storage for the records
- * themselves.
- */
- size_t reccnt = m_levels[i - 1]->get_record_count();
- if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, reccnt, scratch_state)) {
- reccnt += m_levels[i]->get_record_count();
+ inline void perform_reconstruction(ReconstructionTask task) {
+ /* perform the reconstruction itself */
+ std::vector<ShardType *> shards;
+ for (ShardID shid : task.sources) {
+ assert(shid.level_idx < m_levels.size());
+ assert(shid.shard_idx >= -1);
+
+ /* if unspecified, push all shards into the vector */
+ if (shid.shard_idx == all_shards_idx) {
+ for (size_t i = 0; i < m_levels[shid.level_idx].get_shard_count();
+ i++) {
+ if (m_levels[shid.level_idx]->get_shard(i)) {
+ shards.push_back(m_levels[shid.level_idx]->get_shard(i));
+ }
}
+ } else {
+ shards.push_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx));
}
- tasks.add_reconstruction(i - i, i, reccnt);
}
- return tasks;
- }
+ auto new_shard = Shard(shards);
- /*
- *
- */
- ReconstructionVector
- get_reconstruction_tasks(size_t buffer_reccnt,
- state_vector scratch_state = {}) {
/*
- * If no scratch state vector is provided, use a copy of the
- * current one. The only time an empty vector could be used as
- * *real* input to this function is when the current state is also
- * empty, so this should would even in that case.
+ * Remove all of the shards processed by the operation
*/
- if (scratch_state.size() == 0) {
- scratch_state = m_current_state;
- }
-
- ReconstructionVector reconstructions;
- size_t LOOKAHEAD = 1;
- for (size_t i = 0; i < LOOKAHEAD; i++) {
- /*
- * If L0 cannot support a direct buffer flush, figure out what
- * work must be done to free up space first. Otherwise, the
- * reconstruction vector will be initially empty.
- */
- if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) {
- auto local_recon =
- get_reconstruction_tasks_from_level(0, scratch_state);
-
- /*
- * for the first iteration, we need to do all of the
- * reconstructions, so use these to initially the returned
- * reconstruction list
- */
- if (i == 0) {
- reconstructions = local_recon;
- /*
- * Quick sanity test of idea: if the next reconstruction
- * would be larger than this one, steal the largest
- * task from it and run it now instead.
- */
- } else if (local_recon.get_total_reccnt() >
- reconstructions.get_total_reccnt()) {
- auto t = local_recon.remove_reconstruction(0);
- reconstructions.add_reconstruction(t);
- }
- }
-
- /* simulate the buffer flush in the scratch state */
- scratch_state[0].reccnt += buffer_reccnt;
- if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) {
- scratch_state[0].shardcnt += 1;
+ for (ShardID shid : task.sources) {
+ if (shid.shard_idx == all_shards_idx) {
+ m_levels[shid.level_idx]->truncate();
+ } else {
+ m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
}
}
- return reconstructions;
- }
-
- /*
- *
- */
- ReconstructionVector
- get_reconstruction_tasks_from_level(level_index source_level,
- state_vector &scratch_state) {
- ReconstructionVector reconstructions;
-
/*
- * Find the first level capable of sustaining a reconstruction from
- * the level above it. If no such level exists, add a new one at
- * the bottom of the structure.
+ * Append the new shard to the target level
*/
- level_index base_level =
- find_reconstruction_target(source_level, scratch_state);
- if (base_level == -1) {
- base_level = grow(scratch_state);
- }
-
- if constexpr (L == LayoutPolicy::BSM) {
- if (base_level == 0) {
- return reconstructions;
- }
-
- ReconstructionTask task;
- task.target = base_level;
-
- size_t base_reccnt = 0;
- for (level_index i = base_level; i > source_level; i--) {
- auto recon_reccnt = scratch_state[i - 1].reccnt;
- base_reccnt += recon_reccnt;
- scratch_state[i - 1].reccnt = 0;
- scratch_state[i - 1].shardcnt = 0;
- task.add_source(i - 1, recon_reccnt);
- }
-
- reconstructions.add_reconstruction(task);
- scratch_state[base_level].reccnt = base_reccnt;
- scratch_state[base_level].shardcnt = 1;
-
- return reconstructions;
+ if (task.target < m_levels.size()) {
+ m_levels[task.target]->append_shard(new_shard);
+ } else {
+ m_levels.push_back();
}
+ }
+ inline void perform_flush(ReconstructionTask task, BuffView buffer) {
/*
- * Determine the full set of reconstructions necessary to open up
- * space in the source level.
+ * FIXME: this might be faster with a custom interface for merging
+ * the buffer and a vector of shards, but that would also complicate
+ * the shard interface a lot, so we'll leave it like this for now. It
+ * does mean that policies that merge the buffer into L0 double-process
+ * the buffer itself. Given that we're unlikely to actually use policies
+ * like that, we'll leave this as low priority.
*/
- for (level_index i = base_level; i > source_level; i--) {
- size_t recon_reccnt = scratch_state[i - 1].reccnt;
- size_t base_reccnt = recon_reccnt;
-
- /*
- * If using Leveling, the total reconstruction size will be the
- * records in *both* base and target, because they will need to
- * be merged (assuming that target isn't empty).
- */
- if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, base_reccnt, scratch_state)) {
- recon_reccnt += scratch_state[i].reccnt;
- }
- }
- reconstructions.add_reconstruction(i - 1, i, recon_reccnt);
-
- /*
- * The base level will be emptied and its records moved to
- * the target.
- */
- scratch_state[i - 1].reccnt = 0;
- scratch_state[i - 1].shardcnt = 0;
-
- /*
- * The target level will have the records from the base level
- * added to it, and potentially gain a shard if the LayoutPolicy
- * is tiering or the level currently lacks any shards at all.
- */
- scratch_state[i].reccnt += base_reccnt;
- if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) {
- scratch_state[i].shardcnt += 1;
- }
- }
-
- return reconstructions;
- }
-
- inline void reconstruction(ReconstructionTask task) {
- static_assert(L == LayoutPolicy::BSM);
- std::vector<InternalLevel<ShardType, QueryType> *> levels(
- task.sources.size());
- for (size_t i = 0; i < task.sources.size(); i++) {
- levels[i] = m_levels[task.sources[i]].get();
- }
-
- auto new_level = InternalLevel<ShardType, QueryType>::reconstruction(
- levels, task.target);
- if (task.target >= (level_index) m_levels.size()) {
- m_current_state.push_back({new_level->get_record_count(),
- calc_level_record_capacity(task.target), 1,
- 1});
- m_levels.emplace_back(new_level);
+ ShardType *buffer_shard = new ShardType(buffer);
+ if (task.type == ReconstructionType::Append) {
+ m_levels[0]->append(std::shared_ptr(buffer_shard));
} else {
- m_current_state[task.target] = {new_level->get_record_count(),
- calc_level_record_capacity(task.target),
- 1, 1};
- m_levels[task.target] = new_level;
- }
-
- /* remove all of the levels that have been flattened */
- for (size_t i = 0; i < task.sources.size(); i++) {
- m_levels[task.sources[i]] =
- std::shared_ptr<InternalLevel<ShardType, QueryType>>(
- new InternalLevel<ShardType, QueryType>(task.sources[i], 1));
- m_current_state[task.sources[i]] = {
- 0, calc_level_record_capacity(task.target), 0, 1};
- }
-
- return;
- }
-
- /*
- * Combine incoming_level with base_level and reconstruct the shard,
- * placing it in base_level. The two levels should be sequential--i.e. no
- * levels are skipped in the reconstruction process--otherwise the
- * tombstone ordering invariant may be violated.
- */
- inline void reconstruction(level_index base_level,
- level_index incoming_level) {
- size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-
- if (base_level >= (level_index) m_levels.size()) {
- m_levels.emplace_back(
- std::shared_ptr<InternalLevel<ShardType, QueryType>>(
- new InternalLevel<ShardType, QueryType>(base_level,
- shard_capacity)));
- m_current_state.push_back(
- {0, calc_level_record_capacity(base_level), 0, shard_capacity});
- }
+ std::vector<ShardType *> shards;
+ for (size_t i = 0; i < m_levels[0].size(); i++) {
+ if (m_levels[0]->get_shard(i)) {
+ shards.push_back(m_levels[0]->get_shard(i));
+ }
- if constexpr (L == LayoutPolicy::LEVELING) {
- /* if the base level has a shard, merge the base and incoming together to
- * make a new one */
- if (m_levels[base_level]->get_shard_count() > 0) {
- m_levels[base_level] =
- InternalLevel<ShardType, QueryType>::reconstruction(
- m_levels[base_level].get(), m_levels[incoming_level].get());
- /* otherwise, we can just move the incoming to the base */
- } else {
- m_levels[base_level] = m_levels[incoming_level];
+ shards.push_back(buffer_shard);
+ ShardType *new_shard = new ShardType(shards);
+ m_levels[0]->truncate();
+ m_levels[0]->append(std::shared_ptr(new_shard));
}
-
- } else {
- m_levels[base_level]->append_level(m_levels[incoming_level].get());
- m_levels[base_level]->finalize();
}
-
- /* place a new, empty level where the incoming level used to be */
- m_levels[incoming_level] =
- std::shared_ptr<InternalLevel<ShardType, QueryType>>(
- new InternalLevel<ShardType, QueryType>(
- incoming_level,
- (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
-
- /*
- * Update the state vector to match the *real* state following
- * the reconstruction
- */
- m_current_state[base_level] = {m_levels[base_level]->get_record_count(),
- calc_level_record_capacity(base_level),
- m_levels[base_level]->get_shard_count(),
- shard_capacity};
- m_current_state[incoming_level] = {
- 0, calc_level_record_capacity(incoming_level), 0, shard_capacity};
}
bool take_reference() {
@@ -518,11 +226,11 @@ public:
return true;
}
- size_t get_reference_count() { return m_refcnt.load(); }
+ size_t get_reference_count() const { return m_refcnt.load(); }
std::vector<typename QueryType::LocalQuery *>
get_local_queries(std::vector<std::pair<ShardID, ShardType *>> &shards,
- typename QueryType::Parameters *parms) {
+ typename QueryType::Parameters *parms) const {
std::vector<typename QueryType::LocalQuery *> queries;
@@ -533,150 +241,11 @@ public:
return queries;
}
-private:
- size_t m_scale_factor;
- double m_max_delete_prop;
- size_t m_buffer_size;
+ LevelVector const &get_level_vector() const { return m_levels; }
+private:
std::atomic<size_t> m_refcnt;
-
- std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> m_levels;
-
- /*
- * A pair of <record_count, shard_count> for each level in the
- * structure. Record counts may be slightly inaccurate due to
- * deletes.
- */
- state_vector m_current_state;
-
- /*
- * Add a new level to the scratch state and return its index.
- *
- * IMPORTANT: This does _not_ add a level to the extension structure
- * anymore. This is handled by the appropriate reconstruction and flush
- * methods as needed. This function is for use in "simulated"
- * reconstructions.
- */
- inline level_index grow(state_vector &scratch_state) {
- level_index new_idx = m_levels.size();
- size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-
- scratch_state.push_back(
- {0, calc_level_record_capacity(new_idx), 0, new_shard_cap});
- return new_idx;
- }
-
- /*
- * Find the first level below the level indicated by idx that
- * is capable of sustaining a reconstruction and return its
- * level index. If no such level exists, returns -1. Also
- * returns -1 if idx==0, and no such level exists, to simplify
- * the logic of the first buffer flush.
- */
- inline level_index find_reconstruction_target(level_index idx,
- state_vector &state) {
-
- /*
- * this handles the very first buffer flush, when the state vector
- * is empty.
- */
- if (idx == 0 && state.size() == 0)
- return -1;
-
- size_t incoming_rec_cnt = state[idx].reccnt;
- for (level_index i = idx + 1; i < (level_index) state.size(); i++) {
- if (can_reconstruct_with(i, incoming_rec_cnt, state)) {
- return i;
- }
-
- incoming_rec_cnt = state[idx].reccnt;
- }
-
- return -1;
- }
-
- inline void flush_buffer_into_l0(BuffView buffer) {
- size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-
- if (m_levels.size() == 0) {
- m_levels.emplace_back(
- std::shared_ptr<InternalLevel<ShardType, QueryType>>(
- new InternalLevel<ShardType, QueryType>(0, shard_capacity)));
-
- m_current_state.push_back(
- {0, calc_level_record_capacity(0), 0, shard_capacity});
- }
-
- if constexpr (L == LayoutPolicy::LEVELING) {
- // FIXME: Kludgey implementation due to interface constraints.
- auto old_level = m_levels[0].get();
- auto temp_level = new InternalLevel<ShardType, QueryType>(0, 1);
- temp_level->append_buffer(std::move(buffer));
-
- if (old_level->get_shard_count() > 0) {
- m_levels[0] = InternalLevel<ShardType, QueryType>::reconstruction(
- old_level, temp_level);
- delete temp_level;
- } else {
- m_levels[0] =
- std::shared_ptr<InternalLevel<ShardType, QueryType>>(temp_level);
- }
- } else {
- m_levels[0]->append_buffer(std::move(buffer));
- }
-
- /* update the state vector */
- m_current_state[0].reccnt = m_levels[0]->get_record_count();
- m_current_state[0].shardcnt = m_levels[0]->get_shard_count();
- }
-
- /*
- * Mark a given memory level as no-longer in use by the tree. For now this
- * will just free the level. In future, this will be more complex as the
- * level may not be able to immediately be deleted, depending upon who
- * else is using it.
- */
- inline void
- mark_as_unused(std::shared_ptr<InternalLevel<ShardType, QueryType>> level) {
- level.reset();
- }
-
- /*
- * Assume that level "0" should be larger than the buffer. The buffer
- * itself is index -1, which should return simply the buffer capacity.
- */
- inline size_t calc_level_record_capacity(level_index idx) {
- return m_buffer_size * pow(m_scale_factor, idx + 1);
- }
-
- /*
- * Returns the number of records present on a specified level.
- */
- inline size_t get_level_record_count(level_index idx) {
- return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
- }
-
- /*
- * Determines if a level can sustain a reconstruction with incoming_rec_cnt
- * additional records without exceeding its capacity.
- */
- inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt,
- state_vector &state) {
- if (idx >= (level_index) state.size()) {
- return false;
- }
-
- if constexpr (L == LayoutPolicy::LEVELING) {
- return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap;
- } else if constexpr (L == LayoutPolicy::BSM) {
- return state[idx].reccnt == 0;
- } else {
- return state[idx].shardcnt < state[idx].shardcap;
- }
-
- /* unreachable */
- assert(true);
- }
+ LevelVector m_levels;
};
} // namespace de
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index a4cf94d..8cfcd49 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -19,14 +19,10 @@
#include <vector>
#include "framework/interface/Query.h"
-#include "framework/interface/Record.h"
#include "framework/interface/Shard.h"
-#include "framework/structure/BufferView.h"
#include "util/types.h"
namespace de {
-template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
-class InternalLevel;
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class InternalLevel {
@@ -34,110 +30,9 @@ class InternalLevel {
typedef BufferView<RecordType> BuffView;
public:
- InternalLevel(ssize_t level_no, size_t shard_cap)
- : m_level_no(level_no), m_shard_cnt(0), m_shards(shard_cap, nullptr),
- m_pending_shard(nullptr) {}
-
- ~InternalLevel() { delete m_pending_shard; }
-
- /*
- * Create a new shard combining the records from base_level and new_level,
- * and return a shared_ptr to a new level containing this shard. This is used
- * for reconstructions under the leveling layout policy.
- *
- * No changes are made to the levels provided as arguments.
- */
- static std::shared_ptr<InternalLevel>
- reconstruction(InternalLevel *base_level, InternalLevel *new_level) {
- assert(base_level->m_level_no > new_level->m_level_no ||
- (base_level->m_level_no == 0 && new_level->m_level_no == 0));
- auto res = new InternalLevel(base_level->m_level_no, 1);
- res->m_shard_cnt = 1;
- std::vector<ShardType *> shards = {base_level->m_shards[0].get(),
- new_level->m_shards[0].get()};
-
- res->m_shards[0] = std::make_shared<ShardType>(shards);
- return std::shared_ptr<InternalLevel>(res);
- }
-
- static std::shared_ptr<InternalLevel>
- reconstruction(std::vector<InternalLevel *> levels, size_t level_idx) {
- std::vector<ShardType *> shards;
- for (auto level : levels) {
- for (auto shard : level->m_shards) {
- if (shard)
- shards.emplace_back(shard.get());
- }
- }
-
- auto res = new InternalLevel(level_idx, 1);
- res->m_shard_cnt = 1;
- res->m_shards[0] = std::make_shared<ShardType>(shards);
+ InternalLevel(ssize_t level_no) : m_level_no(level_no) {}
- return std::shared_ptr<InternalLevel>(res);
- }
-
- /*
- * Create a new shard combining the records from all of
- * the shards in level, and append this new shard into
- * this level. This is used for reconstructions under
- * the tiering layout policy.
- *
- * No changes are made to the level provided as an argument.
- */
- void append_level(InternalLevel *level) {
- // FIXME: that this is happening probably means that
- // something is going terribly wrong earlier in the
- // reconstruction logic.
- if (level->get_shard_count() == 0) {
- return;
- }
-
- std::vector<ShardType *> shards;
- for (auto shard : level->m_shards) {
- if (shard)
- shards.emplace_back(shard.get());
- }
-
- if (m_shard_cnt == m_shards.size()) {
- m_pending_shard = new ShardType(shards);
- return;
- }
-
- auto tmp = new ShardType(shards);
- m_shards[m_shard_cnt] = std::shared_ptr<ShardType>(tmp);
-
- ++m_shard_cnt;
- }
-
- /*
- * Create a new shard using the records in the
- * provided buffer, and append this new shard
- * into this level. This is used for buffer
- * flushes under the tiering layout policy.
- */
- void append_buffer(BuffView buffer) {
- if (m_shard_cnt == m_shards.size()) {
- assert(m_pending_shard == nullptr);
- m_pending_shard = new ShardType(std::move(buffer));
- return;
- }
-
- m_shards[m_shard_cnt] = std::make_shared<ShardType>(std::move(buffer));
- ++m_shard_cnt;
- }
-
- void finalize() {
- if (m_pending_shard) {
- for (size_t i = 0; i < m_shards.size(); i++) {
- m_shards[i] = nullptr;
- }
-
- m_shards[0] = std::shared_ptr<ShardType>(m_pending_shard);
- m_pending_shard = nullptr;
- m_shard_cnt = 1;
- }
- }
+ ~InternalLevel() = default;
/*
* Create a new shard containing the combined records
@@ -146,7 +41,7 @@ public:
* No changes are made to this level.
*/
ShardType *get_combined_shard() {
- if (m_shard_cnt == 0) {
+ if (m_shards.size() == 0) {
return nullptr;
}
@@ -163,7 +58,7 @@ public:
std::vector<std::pair<ShardID, ShardType *>> &shards,
std::vector<typename QueryType::LocalQuery *> &local_queries,
typename QueryType::Parameters *query_parms) {
- for (size_t i = 0; i < m_shard_cnt; i++) {
+ for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
auto local_query =
QueryType::local_preproc(m_shards[i].get(), query_parms);
@@ -174,10 +69,10 @@ public:
}
bool check_tombstone(size_t shard_stop, const RecordType &rec) {
- if (m_shard_cnt == 0)
+ if (m_shards.size() == 0)
return false;
- for (int i = m_shard_cnt - 1; i >= (ssize_t)shard_stop; i--) {
+ for (int i = m_shards.size() - 1; i >= (ssize_t)shard_stop; i--) {
if (m_shards[i]) {
auto res = m_shards[i]->point_lookup(rec, true);
if (res && res->is_tombstone()) {
@@ -189,7 +84,7 @@ public:
}
bool delete_record(const RecordType &rec) {
- if (m_shard_cnt == 0)
+ if (m_shards.size() == 0)
return false;
for (size_t i = 0; i < m_shards.size(); ++i) {
@@ -206,18 +101,18 @@ public:
}
ShardType *get_shard(size_t idx) {
- if (idx >= m_shard_cnt) {
+ if (idx >= m_shards.size()) {
return nullptr;
}
return m_shards[idx].get();
}
- size_t get_shard_count() { return m_shard_cnt; }
+ size_t get_shard_count() { return m_shards.size(); }
size_t get_record_count() {
size_t cnt = 0;
- for (size_t i = 0; i < m_shard_cnt; i++) {
+ for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
cnt += m_shards[i]->get_record_count();
}
@@ -228,7 +123,7 @@ public:
size_t get_tombstone_count() {
size_t res = 0;
- for (size_t i = 0; i < m_shard_cnt; ++i) {
+ for (size_t i = 0; i < m_shards.size(); ++i) {
if (m_shards[i]) {
res += m_shards[i]->get_tombstone_count();
}
@@ -238,7 +133,7 @@ public:
size_t get_aux_memory_usage() {
size_t cnt = 0;
- for (size_t i = 0; i < m_shard_cnt; i++) {
+ for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
cnt += m_shards[i]->get_aux_memory_usage();
}
@@ -249,7 +144,7 @@ public:
size_t get_memory_usage() {
size_t cnt = 0;
- for (size_t i = 0; i < m_shard_cnt; i++) {
+ for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
cnt += m_shards[i]->get_memory_usage();
}
@@ -261,7 +156,7 @@ public:
double get_tombstone_prop() {
size_t tscnt = 0;
size_t reccnt = 0;
- for (size_t i = 0; i < m_shard_cnt; i++) {
+ for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
tscnt += m_shards[i]->get_tombstone_count();
reccnt += m_shards[i]->get_record_count();
@@ -272,24 +167,27 @@ public:
}
std::shared_ptr<InternalLevel> clone() {
- auto new_level =
- std::make_shared<InternalLevel>(m_level_no, m_shards.size());
- for (size_t i = 0; i < m_shard_cnt; i++) {
+ auto new_level = std::make_shared<InternalLevel>(m_level_no);
+ for (size_t i = 0; i < m_shards.size(); i++) {
new_level->m_shards[i] = m_shards[i];
}
- new_level->m_shard_cnt = m_shard_cnt;
return new_level;
}
-private:
- ssize_t m_level_no;
+ void truncate() { m_shards.erase(m_shards.begin(), m_shards.end()); }
- size_t m_shard_cnt;
- size_t m_shard_size_cap;
+ void delete_shard(shard_index shard) {
+ m_shards.erase(m_shards.begin() + shard);
+ }
+
+ bool append(std::shared_ptr<ShardType> shard) {
+ m_shards.emplace_back(shard);
+ }
+private:
+ ssize_t m_level_no;
std::vector<std::shared_ptr<ShardType>> m_shards;
- ShardType *m_pending_shard;
};
} // namespace de
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index f4b0364..0477095 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -8,13 +8,21 @@
*/
#pragma once
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "util/types.h"
+#include "framework/interface/Scheduler.h"
#include <cstdlib>
-#include <utility>
namespace de {
-enum class LayoutPolicy { LEVELING, TEIRING, BSM };
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
+DeletePolicy D, SchedulerInterface SchedType>
+class DEConfiguration {
+ DEConfiguration(std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy)
+ : m_recon_policy(recon_policy) {}
-enum class DeletePolicy { TOMBSTONE, TAGGING };
+ public:
+ std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> m_recon_policy;
+};
} // namespace de