| author | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-12-22 13:00:19 -0500 |
|---|---|---|
| committer | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-12-22 13:00:19 -0500 |
| commit | 5617bed5257506d3dfda8537b16f44b3e40f1b42 (patch) | |
| tree | b1a4bb957929b20c884a4eed070f42065828fdb6 /include/framework | |
| parent | 9876d74e503df64eb9e82e540ca41fcf593ebf64 (diff) | |
| download | dynamic-extension-5617bed5257506d3dfda8537b16f44b3e40f1b42.tar.gz | |
Began overhauling reconstruction mechanism
Diffstat (limited to 'include/framework')
| -rw-r--r-- | include/framework/DynamicExtension.h | 121 |
| -rw-r--r-- | include/framework/reconstruction/BSMPolicy.h | 83 |
| -rw-r--r-- | include/framework/reconstruction/LevelingPolicy.h | 84 |
| -rw-r--r-- | include/framework/reconstruction/ReconstructionPolicy.h | 30 |
| -rw-r--r-- | include/framework/reconstruction/TieringPolicy.h | 79 |
| -rw-r--r-- | include/framework/scheduling/Epoch.h | 6 |
| -rw-r--r-- | include/framework/scheduling/Task.h | 9 |
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 561 |
| -rw-r--r-- | include/framework/structure/InternalLevel.h | 154 |
| -rw-r--r-- | include/framework/util/Configuration.h | 14 |
10 files changed, 434 insertions, 707 deletions
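The headline API change in this commit: reconstruction behavior moves out of the `LayoutPolicy` template parameter and into a `ReconstructionPolicy` object passed to the `DynamicExtension` constructor at runtime. A minimal usage sketch against the post-commit interface; `MyShard` and `MyQuery` are hypothetical stand-ins for concrete shard and query implementations, and the watermark values are illustrative only:

```cpp
#include "framework/DynamicExtension.h"
#include "framework/reconstruction/TieringPolicy.h"

// MyShard / MyQuery are assumed types satisfying ShardInterface and
// QueryInterface; they are not part of this commit.
using Ext = de::DynamicExtension<MyShard, MyQuery>;

// The policy is now an ordinary object chosen at construction time,
// not a template parameter baked into the extension's type.
auto *policy = new de::TieringPolicy<MyShard, MyQuery>(
    /* scale_factor= */ 8, /* buffer_size= */ 12000);

// Arguments: recon_policy, buffer low watermark, buffer high watermark.
auto *index = new Ext(policy, 12000, 14000);
```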
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 719232e..1886234 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -14,39 +14,36 @@
 #include <vector>
 
 #include "framework/interface/Scheduler.h"
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/FIFOScheduler.h"
 #include "framework/scheduling/SerialScheduler.h"
 #include "framework/structure/ExtensionStructure.h"
 #include "framework/structure/MutableBuffer.h"
 #include "framework/scheduling/Epoch.h"
-#include "framework/util/Configuration.h"
+#include "util/types.h"
 
 namespace de {
 
 template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
-          LayoutPolicy L = LayoutPolicy::TEIRING,
           DeletePolicy D = DeletePolicy::TAGGING,
-          SchedulerInterface SchedType = SerialScheduler>
+          SchedulerInterface SchedType = de::FIFOScheduler>
 class DynamicExtension {
-  /* for unit testing purposes */
-public:
-  LayoutPolicy Layout = L;
-
 private:
   /* convenience typedefs for commonly used types within the class */
   typedef typename ShardType::RECORD RecordType;
   typedef MutableBuffer<RecordType> Buffer;
-  typedef ExtensionStructure<ShardType, QueryType, L> Structure;
-  typedef Epoch<ShardType, QueryType, L> _Epoch;
+  typedef ExtensionStructure<ShardType, QueryType> Structure;
+  typedef Epoch<ShardType, QueryType> _Epoch;
   typedef BufferView<RecordType> BufView;
+  typedef ReconstructionPolicy<ShardType, QueryType> ReconPolicyType;
 
   typedef typename QueryType::Parameters Parameters;
   typedef typename QueryType::LocalQuery LocalQuery;
   typedef typename QueryType::LocalQueryBuffer BufferQuery;
   typedef typename QueryType::LocalResultType LocalResult;
   typedef typename QueryType::ResultType QueryResult;
-
   static constexpr size_t QUERY = 1;
   static constexpr size_t RECONSTRUCTION = 2;
@@ -61,36 +58,32 @@ public:
    * Create a new Dynamized version of a data structure, supporting
    * inserts and, possibly, deletes. The following parameters are used
    * for configuration of the structure,
-   * @param buffer_low_watermark The number of records that can be 
+   * @param buffer_low_watermark The number of records that can be
    *        inserted before a buffer flush is initiated
    *
-   * @param buffer_high_watermark The maximum buffer capacity, inserts 
-   *        will begin to fail once this number is reached, until the 
-   *        buffer flush has completed. Has no effect in single-threaded 
+   * @param buffer_high_watermark The maximum buffer capacity, inserts
+   *        will begin to fail once this number is reached, until the
+   *        buffer flush has completed. Has no effect in single-threaded
    *        operation
    *
-   * @param scale_factor The rate at which the capacity of levels 
+   * @param scale_factor The rate at which the capacity of levels
    *        grows; should be at least 2 for reasonable performance
    *
    * @param memory_budget Unused at this time
   *
   * @param thread_cnt The maximum number of threads available to the
-   *        framework's scheduler for use in answering queries and 
+   *        framework's scheduler for use in answering queries and
    *        performing compactions and flushes, etc.
   */
-  DynamicExtension(size_t buffer_low_watermark, size_t buffer_high_watermark,
-                   size_t scale_factor, size_t memory_budget = 0,
+  DynamicExtension(ReconPolicyType *recon_policy, size_t buffer_low_watermark,
+                   size_t buffer_high_watermark, size_t memory_budget = 0,
                    size_t thread_cnt = 16)
-      : m_scale_factor(scale_factor), m_max_delete_prop(1),
-        m_sched(memory_budget, thread_cnt),
+      : m_max_delete_prop(1), m_sched(memory_budget, thread_cnt),
         m_buffer(new Buffer(buffer_low_watermark, buffer_high_watermark)),
-        m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0) {
-    if constexpr (L == LayoutPolicy::BSM) {
-      assert(scale_factor == 2);
-    }
+        m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0),
+        m_recon_policy(recon_policy) {
-    auto vers =
-        new Structure(buffer_high_watermark, m_scale_factor, m_max_delete_prop);
+    auto vers = new Structure();
     m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
     m_previous_epoch.store({nullptr, 0});
     m_next_epoch.store({nullptr, 0});
@@ -133,26 +126,26 @@ public:
   int insert(const RecordType &rec) { return internal_append(rec, false); }
 
   /**
-   * Erases a record from the index, according to the DeletePolicy 
+   * Erases a record from the index, according to the DeletePolicy
    * template parameter. Returns 1 on success and 0 on failure. The
    * equality comparison operator of RecordType is used to identify
-   * the record to be deleted. 
-   * 
+   * the record to be deleted.
+   *
    * Deletes behave differently, depending on the DeletionPolicy. For
    * Tombstone deletes, a tombstone record will be inserted into the
    * index. The presence of the deleted record is not checked first, so
    * deleting a record that does not exist will result in an unnecessary
-   * tombstone record being written. 
+   * tombstone record being written.
    *
    * Deletes using Tagging will perform a point lookup for the record to
-   * be removed, and mark it as deleted in its header. 
+   * be removed, and mark it as deleted in its header.
   *
-   * @param rec The record to be deleted. The argument to this function 
+   * @param rec The record to be deleted. The argument to this function
    *        should compare equal to the record to be deleted.
   *
-   * @return 1 on success, and 0 on failure. For tombstone deletes, a 
-   *         failure will occur if the insert fails due to the buffer 
-   *         being full, and can be retried. For tagging deletes, a 
+   * @return 1 on success, and 0 on failure. For tombstone deletes, a
+   *         failure will occur if the insert fails due to the buffer
+   *         being full, and can be retried. For tagging deletes, a
   *         failure means that the record to be deleted could not be
   *         found in the index, and should *not* be retried.
   */
@@ -202,13 +195,12 @@ public:
    * @return A future, from which the query results can be retrieved upon
    *         query completion
    */
-  std::future<QueryResult>
-  query(Parameters &&parms) {
+  std::future<QueryResult> query(Parameters &&parms) {
     return schedule_query(std::move(parms));
   }
 
   /**
-   * Determine the number of records (including tagged records and 
+   * Determine the number of records (including tagged records and
    * tombstones) currently within the framework. This number is used for
    * determining when and how reconstructions occur.
   *
@@ -229,7 +221,7 @@ public:
    * but will always return 0 in that case.
   *
   * @return The number of tombstone records within the index
-   */ 
+   */
   size_t get_tombstone_count() {
     auto epoch = get_active_epoch();
     auto t = epoch->get_buffer().get_tombstone_count() +
@@ -246,7 +238,7 @@ public:
    * as each level can contain multiple shards in that case.
   *
   * @return The number of levels within the index
-   */ 
+   */
   size_t get_height() {
     auto epoch = get_active_epoch();
     auto t = epoch->get_structure()->get_height();
@@ -292,7 +284,7 @@ public:
   /**
    * Create a new single Shard object containing all of the records
-   * within the framework (buffer and shards). 
+   * within the framework (buffer and shards).
   *
   * @param await_reconstruction_completion Specifies whether the currently
   *        active state of the index should be used to create the shard
@@ -357,11 +349,11 @@ public:
   }
 
   /**
-   * Verify that the currently active version of the index does not 
+   * Verify that the currently active version of the index does not
    * violate tombstone proportion invariants. Exposed for unit-testing
    * purposes.
   *
-   * @return Returns true if the tombstone proportion invariant is 
+   * @return Returns true if the tombstone proportion invariant is
    *         satisfied, and false if it is not.
    */
   bool validate_tombstone_proportion() {
@@ -378,7 +370,7 @@ public:
   void print_scheduler_statistics() const { m_sched.print_statistics(); }
 
 private:
-  size_t m_scale_factor;
+  ReconPolicyType const *m_recon_policy;
   double m_max_delete_prop;
   SchedType m_sched;
@@ -387,7 +379,7 @@ private:
   size_t m_core_cnt;
   std::atomic<int> m_next_core;
   std::atomic<size_t> m_epoch_cnt;
-  
+
   alignas(64) std::atomic<bool> m_reconstruction_scheduled;
 
   std::atomic<epoch_ptr> m_next_epoch;
@@ -397,9 +389,6 @@ private:
   std::condition_variable m_epoch_cv;
   std::mutex m_epoch_cv_lk;
 
-
-
-
   void enforce_delete_invariant(_Epoch *epoch) {
     auto structure = epoch->get_structure();
     auto compactions = structure->get_compaction_tasks();
@@ -407,8 +396,8 @@ private:
 
     while (compactions.size() > 0) {
       /* schedule a compaction */
-      ReconstructionArgs<ShardType, QueryType, L> *args =
-          new ReconstructionArgs<ShardType, QueryType, L>();
+      ReconstructionArgs<ShardType, QueryType> *args =
+          new ReconstructionArgs<ShardType, QueryType>();
       args->epoch = epoch;
       args->merges = compactions;
       args->extension = this;
@@ -555,20 +544,13 @@ private:
   }
 
   static void reconstruction(void *arguments) {
-    auto args = (ReconstructionArgs<ShardType, QueryType, L> *)arguments;
+    auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
     ((DynamicExtension *)args->extension)->SetThreadAffinity();
 
     Structure *vers = args->epoch->get_structure();
 
-    if constexpr (L == LayoutPolicy::BSM) {
-      if (args->merges.size() > 0) {
-        vers->reconstruction(args->merges[0]);
-      }
-    } else {
-      for (size_t i = 0; i < args->merges.size(); i++) {
-        vers->reconstruction(args->merges[i].target,
-                             args->merges[i].sources[0]);
-      }
+    for (size_t i=0; i<args->tasks.size(); i++) {
+      vers->perform_reconstruction(args->tasks[i]);
     }
 
     /*
@@ -607,8 +589,7 @@ private:
   }
 
   static void async_query(void *arguments) {
-    auto *args =
-        (QueryArgs<ShardType, QueryType, DynamicExtension> *) arguments;
+    auto *args = (QueryArgs<ShardType, QueryType, DynamicExtension> *)arguments;
 
     auto epoch = args->extension->get_active_epoch();
 
@@ -636,7 +617,7 @@ private:
         query_results[i] = QueryType::local_query_buffer(buffer_query);
       } else { /*execute local queries */
         query_results[i] = QueryType::local_query(shards[i - 1].second,
-                                                   local_queries[i - 1]);
+                                                  local_queries[i - 1]);
       }
 
       /* end query early if EARLY_ABORT is set and a result exists */
@@ -673,11 +654,11 @@ private:
   void schedule_reconstruction() {
     auto epoch = create_new_epoch();
 
-    ReconstructionArgs<ShardType, QueryType, L> *args =
-        new ReconstructionArgs<ShardType, QueryType, L>();
+    ReconstructionArgs<ShardType, QueryType> *args =
+        new ReconstructionArgs<ShardType, QueryType>();
     args->epoch = epoch;
-    args->merges = epoch->get_structure()->get_reconstruction_tasks(
-        m_buffer->get_high_watermark());
+    args->tasks = m_recon_policy->get_reconstruction_tasks(
+        epoch, m_buffer->get_high_watermark());
     args->extension = this;
     args->compaction = false;
     /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed
@@ -686,10 +667,8 @@ private:
     m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
   }
 
-  std::future<QueryResult>
-  schedule_query(Parameters &&query_parms) {
-    auto args =
-        new QueryArgs<ShardType, QueryType, DynamicExtension>();
+  std::future<QueryResult> schedule_query(Parameters &&query_parms) {
+    auto args = new QueryArgs<ShardType, QueryType, DynamicExtension>();
     args->extension = this;
     args->query_parms = std::move(query_parms);
     auto result = args->result_set.get_future();
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
new file mode 100644
index 0000000..9138bd1
--- /dev/null
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -0,0 +1,83 @@
+/*
+ * include/framework/reconstruction/BSMPolicy.h
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *                         Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+#include <cmath>
+
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/Epoch.h"
+#include "util/types.h"
+
+namespace de {
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class BSMPolicy : ReconstructionPolicy<ShardType, QueryType> {
+  typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+      LevelVector;
+
+public:
+  BSMPolicy(size_t scale_factor, size_t buffer_size)
+      : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
+
+  ReconstructionVector
+  get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch,
+                           size_t incoming_reccnt) override {
+    ReconstructionVector reconstructions;
+    auto levels = epoch->get_structure()->get_level_vector();
+
+    level_index target_level = find_reconstruction_target(levels);
+    assert(target_level != -1);
+    level_index source_level = 0;
+
+    if (target_level == invalid_level_idx) {
+      /* grow */
+      target_level = levels.size();
+    }
+
+    ReconstructionTask task;
+    task.target = target_level;
+    task.type = ReconstructionType::Merge;
+
+    for (level_index i = target_level; i > source_level; i--) {
+      if (i < levels.size()) {
+        task.add_shard({i, all_shards_idx}, levels[i]->get_record_count());
+      }
+    }
+
+    reconstructions.add_reconstruction(task);
+    return reconstructions;
+  }
+
+  ReconstructionTask
+  get_flush_task(Epoch<ShardType, QueryType> *epoch) override {
+    return ReconstructionTask{
+        {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush};
+  }
+
+private:
+  level_index find_reconstruction_target(LevelVector &levels) {
+    level_index target_level = 0;
+
+    for (size_t i = 0; i < (level_index)levels.size(); i++) {
+      if (levels[i].get_record_count() + 1 <= capacity(i)) {
+        target_level = i;
+        break;
+      }
+    }
+
+    return target_level;
+  }
+
+  inline size_t capacity(level_index level) {
+    return m_buffer_size * pow(m_scale_factor, level + 1);
+  }
+
+  size_t m_scale_factor;
+  size_t m_buffer_size;
+};
+} // namespace de
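BSMPolicy (and LevelingPolicy below) size levels geometrically through the `capacity()` helper: capacity(level) = buffer_size × scale_factor^(level + 1). A quick worked check of that arithmetic, with illustrative parameter values:

```cpp
#include <cmath>
#include <cstdio>

// With buffer_size = 1000 and scale_factor = 2, levels hold
// 2000, 4000, 8000, 16000 records -- the same formula as the
// policies' capacity() helper.
int main() {
  size_t buffer_size = 1000, scale_factor = 2;
  for (size_t level = 0; level < 4; level++) {
    printf("level %zu: %.0f records\n", level,
           buffer_size * pow(scale_factor, level + 1));
  }
  return 0;
}
```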
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
new file mode 100644
index 0000000..00f2cff
--- /dev/null
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -0,0 +1,84 @@
+/*
+ * include/framework/reconstruction/LevelingPolicy.h
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *                         Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+#include <cmath>
+
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/Epoch.h"
+#include "util/types.h"
+
+namespace de {
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class LevelingPolicy : ReconstructionPolicy<ShardType, QueryType> {
+  typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+      LevelVector;
+
+public:
+  LevelingPolicy(size_t scale_factor, size_t buffer_size)
+      : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
+
+  ReconstructionVector
+  get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch,
+                           size_t incoming_reccnt) override {
+    ReconstructionVector reconstructions;
+    auto levels = epoch->get_structure()->get_level_vector();
+
+    level_index target_level = find_reconstruction_target(levels);
+    assert(target_level != -1);
+    level_index source_level = 0;
+
+    if (target_level == invalid_level_idx) {
+      /* grow */
+      target_level = levels.size();
+    }
+
+    for (level_index i = target_level; i > source_level; i--) {
+      size_t target_reccnt =
+          (i < levels.size()) ? levels[i]->get_record_count() : 0;
+      size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt;
+
+      reconstructions.add_reconstruction(i - 1, i, total_reccnt,
+                                         ReconstructionType::Merge);
+    }
+
+    return reconstructions;
+  }
+
+  ReconstructionTask
+  get_flush_task(Epoch<ShardType, QueryType> *epoch) override {
+    return ReconstructionTask{
+        {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush};
+  }
+
+private:
+  level_index find_reconstruction_target(LevelVector &levels) {
+    level_index target_level = 0;
+    size_t incoming_records = m_buffer_size;
+
+    for (size_t i = 0; i < (level_index)levels.size(); i++) {
+      if (levels[i].get_record_count() + incoming_records < capacity(i)) {
+        target_level = i;
+        break;
+      }
+
+      incoming_records = levels[i].get_record_count();
+    }
+
+    return target_level;
+  }
+
+  inline size_t capacity(level_index level) {
+    return m_buffer_size * pow(m_scale_factor, level + 1);
+  }
+
+  size_t m_scale_factor;
+  size_t m_buffer_size;
+};
+} // namespace de
diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h
new file mode 100644
index 0000000..976091e
--- /dev/null
+++ b/include/framework/reconstruction/ReconstructionPolicy.h
@@ -0,0 +1,30 @@
+/*
+ * include/framework/reconstruction/ReconstructionPolicy.h
+ *
+ * Reconstruction class interface, used to implement custom reconstruction
+ * policies.
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *                         Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include "util/types.h"
+#include "framework/structure/ExtensionStructure.h"
+#include "framework/scheduling/Epoch.h"
+
+namespace de {
+template<ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class ReconstructionPolicy {
+  typedef ExtensionStructure<ShardType, QueryType> StructureType;
+
+public:
+  ReconstructionPolicy() {}
+
+  virtual ReconstructionVector get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch,
+                                                        size_t incoming_reccnt) = 0;
+  virtual ReconstructionTask get_flush_task(Epoch<ShardType, QueryType> *epoch) = 0;
+};
+}
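This interface is what BSMPolicy, LevelingPolicy, and TieringPolicy implement, and it is also the extension point for user-defined policies. As a sketch only, assuming `ReconstructionVector` is default-constructible and reusing the task shape from the shipped policies' `get_flush_task()` bodies: an append-only policy that never merges might look like this.

```cpp
// Hypothetical user-defined policy; never schedules reconstructions and
// appends the flushed buffer shard straight onto L0. The field layout of
// ReconstructionTask mirrors the get_flush_task() bodies in this commit.
template <de::ShardInterface ShardType,
          de::QueryInterface<ShardType> QueryType>
class AppendOnlyPolicy
    : public de::ReconstructionPolicy<ShardType, QueryType> {
public:
  AppendOnlyPolicy(size_t buffer_size) : m_buffer_size(buffer_size) {}

  de::ReconstructionVector
  get_reconstruction_tasks(de::Epoch<ShardType, QueryType> *epoch,
                           size_t incoming_reccnt) override {
    return {}; /* never merge; L0 simply accumulates shards */
  }

  de::ReconstructionTask
  get_flush_task(de::Epoch<ShardType, QueryType> *epoch) override {
    return de::ReconstructionTask{{{de::buffer_shid}}, 0, m_buffer_size,
                                  de::ReconstructionType::Append};
  }

private:
  size_t m_buffer_size;
};
```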
diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h
new file mode 100644
index 0000000..120bcb5
--- /dev/null
+++ b/include/framework/reconstruction/TieringPolicy.h
@@ -0,0 +1,79 @@
+/*
+ * include/framework/reconstruction/TieringPolicy.h
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *                         Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+#include <cmath>
+
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/Epoch.h"
+#include "util/types.h"
+
+namespace de {
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class TieringPolicy : ReconstructionPolicy<ShardType, QueryType> {
+  typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+      LevelVector;
+
+public:
+  TieringPolicy(size_t scale_factor, size_t buffer_size)
+      : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
+
+  ReconstructionVector
+  get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch,
+                           size_t incoming_reccnt) override {
+    ReconstructionVector reconstructions;
+    auto levels = epoch->get_structure()->get_level_vector();
+
+    level_index target_level = find_reconstruction_target(levels);
+    assert(target_level != -1);
+    level_index source_level = 0;
+
+    if (target_level == invalid_level_idx) {
+      /* grow */
+      target_level = levels.size();
+    }
+
+    for (level_index i = target_level; i > source_level; i--) {
+      size_t target_reccnt =
+          (i < levels.size()) ? levels[i]->get_record_count() : 0;
+      size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt;
+
+      reconstructions.add_reconstruction(i - 1, i, total_reccnt,
+                                         ReconstructionType::Compact);
+    }
+
+    return reconstructions;
+  }
+
+  ReconstructionTask
+  get_flush_task(Epoch<ShardType, QueryType> *epoch) override {
+    return ReconstructionTask{
+        {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush};
+  }
+
+private:
+  level_index find_reconstruction_target(LevelVector &levels) {
+    level_index target_level = 0;
+
+    for (size_t i = 0; i < (level_index) levels.size(); i++) {
+      if (levels[i].get_shard_count() + 1 <= capacity()) {
+        target_level = i;
+        break;
+      }
+    }
+
+    return target_level;
+  }
+
+  inline size_t capacity() {
+    return m_scale_factor;
+  }
+
+  size_t m_scale_factor;
+  size_t m_buffer_size;
+};
+} // namespace de
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 03675b1..303ab2f 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -8,7 +8,6 @@
  */
 #pragma once
 
-#include <condition_variable>
 #include <mutex>
 
 #include "framework/structure/BufferView.h"
@@ -17,13 +16,12 @@
 
 namespace de {
 
-template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
-          LayoutPolicy L>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
 class Epoch {
 private:
   typedef typename ShardType::RECORD RecordType;
   typedef MutableBuffer<RecordType> Buffer;
-  typedef ExtensionStructure<ShardType, QueryType, L> Structure;
+  typedef ExtensionStructure<ShardType, QueryType> Structure;
   typedef BufferView<RecordType> BufView;
 
 public:
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 34f053a..7242bef 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -19,16 +19,15 @@
 
 #include "framework/scheduling/Epoch.h"
 #include "framework/scheduling/statistics.h"
-#include "framework/util/Configuration.h"
+#include "util/types.h"
 
 namespace de {
 
-template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
-          LayoutPolicy L>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
 struct ReconstructionArgs {
   typedef typename ShardType::RECORD RecordType;
-  Epoch<ShardType, QueryType, L> *epoch;
-  ReconstructionVector merges;
+  Epoch<ShardType, QueryType> *epoch;
+  ReconstructionVector tasks;
   std::promise<bool> result;
   bool compaction;
   void *extension;
 };
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index c81ad05..9b7ae87 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -11,40 +11,23 @@
 
 #include <atomic>
 #include <cstdio>
+#include <memory>
 #include <vector>
 
 #include "framework/structure/BufferView.h"
 #include "framework/structure/InternalLevel.h"
-
-#include "framework/util/Configuration.h"
-
-#include "psu-util/timer.h"
 #include "util/types.h"
 
 namespace de {
 
-template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
-          LayoutPolicy L = LayoutPolicy::TEIRING>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
 class ExtensionStructure {
   typedef typename ShardType::RECORD RecordType;
   typedef BufferView<RecordType> BuffView;
-
-  typedef struct {
-    size_t reccnt;
-    size_t reccap;
-
-    size_t shardcnt;
-    size_t shardcap;
-  } level_state;
-
-  typedef std::vector<level_state> state_vector;
-
+  typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+      LevelVector;
 
 public:
-  ExtensionStructure(size_t buffer_size, size_t scale_factor,
-                     double max_delete_prop)
-      : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop),
-        m_buffer_size(buffer_size) {}
-
+  ExtensionStructure() = default;
   ~ExtensionStructure() = default;
 
  /*
@@ -62,16 +45,13 @@ public:
   * problems under tagging with concurrency. Any deletes in this context will
   * need to be forwarded to the appropriate structures manually.
   */
-  ExtensionStructure<ShardType, QueryType, L> *copy() {
-    auto new_struct = new ExtensionStructure<ShardType, QueryType, L>(
-        m_buffer_size, m_scale_factor, m_max_delete_prop);
+  ExtensionStructure<ShardType, QueryType> *copy() const {
+    auto new_struct = new ExtensionStructure<ShardType, QueryType>();
     for (size_t i = 0; i < m_levels.size(); i++) {
       new_struct->m_levels.push_back(m_levels[i]->clone());
     }
 
     new_struct->m_refcnt = 0;
-    new_struct->m_current_state = m_current_state;
-
     return new_struct;
   }
@@ -100,30 +80,10 @@ public:
   }
 
   /*
-   * Flush a buffer into the extension structure, performing any necessary
-   * reconstructions to free up room in L0.
-   *
-   * FIXME: arguably, this should be a method attached to the buffer that
-   * takes a structure as input.
-   */
-  inline bool flush_buffer(BuffView buffer) {
-    state_vector tmp = m_current_state;
-
-    if (tmp.size() == 0) {
-      grow(tmp);
-    }
-
-    assert(can_reconstruct_with(0, buffer.get_record_count(), tmp));
-    flush_buffer_into_l0(std::move(buffer));
-
-    return true;
-  }
-
-  /*
    * Return the total number of records (including tombstones) within all
   * of the levels of the structure.
   */
-  size_t get_record_count() {
+  size_t get_record_count() const {
     size_t cnt = 0;
 
     for (size_t i = 0; i < m_levels.size(); i++) {
@@ -138,7 +98,7 @@ public:
   * Return the total number of tombstones contained within all of the
   * levels of the structure.
   */
-  size_t get_tombstone_count() {
+  size_t get_tombstone_count() const {
     size_t cnt = 0;
 
     for (size_t i = 0; i < m_levels.size(); i++) {
@@ -153,13 +113,13 @@ public:
   * Return the number of levels within the structure. Note that not
   * all of these levels are necessarily populated.
   */
-  size_t get_height() { return m_levels.size(); }
+  size_t get_height() const { return m_levels.size(); }
 
  /*
   * Return the amount of memory (in bytes) used by the shards within the
   * structure for storing the primary data structure and raw data.
   */
-  size_t get_memory_usage() {
+  size_t get_memory_usage() const {
     size_t cnt = 0;
     for (size_t i = 0; i < m_levels.size(); i++) {
       if (m_levels[i])
@@ -174,7 +134,7 @@ public:
   * structure for storing auxiliary data structures. This total does not
   * include memory used for the main data structure, or raw data.
   */
-  size_t get_aux_memory_usage() {
+  size_t get_aux_memory_usage() const {
     size_t cnt = 0;
     for (size_t i = 0; i < m_levels.size(); i++) {
       if (m_levels[i]) {
@@ -185,326 +145,74 @@ public:
     return cnt;
   }
 
-  /*
-   * Validate that no level in the structure exceeds its maximum tombstone
-   * capacity. This is used to trigger preemptive compactions at the end of
-   * the reconstruction process.
-   */
-  bool validate_tombstone_proportion() {
-    long double ts_prop;
-    for (size_t i = 0; i < m_levels.size(); i++) {
-      if (m_levels[i]) {
-        ts_prop = (long double)m_levels[i]->get_tombstone_count() /
-                  (long double)calc_level_record_capacity(i);
-        if (ts_prop > (long double)m_max_delete_prop) {
-          return false;
-        }
-      }
-    }
-
-    return true;
-  }
-
-  bool validate_tombstone_proportion(level_index level) {
-    long double ts_prop = (long double)m_levels[level]->get_tombstone_count() /
-                          (long double)calc_level_record_capacity(level);
-    return ts_prop <= (long double)m_max_delete_prop;
-  }
-
-  /*
-   * Return a reference to the underlying vector of levels within the
-   * structure.
-   */
-  std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> &
-  get_levels() {
-    return m_levels;
-  }
-
-  /*
-   * NOTE: This cannot be simulated, because tombstone cancellation is not
-   * cheaply predictable. It is possible that the worst case number could
-   * be used instead, to allow for prediction, but compaction isn't a
-   * major concern outside of sampling; at least for now. So I'm not
-   * going to focus too much time on it at the moment.
-   */
-  ReconstructionVector get_compaction_tasks() {
-    ReconstructionVector tasks;
-    state_vector scratch_state = m_current_state;
-
-    /* if the tombstone/delete invariant is satisfied, no need for compactions
-     */
-    if (validate_tombstone_proportion()) {
-      return tasks;
-    }
-
-    /* locate the first level to violate the invariant */
-    level_index violation_idx = -1;
-    for (level_index i = 0; i < m_levels.size(); i++) {
-      if (!validate_tombstone_proportion(i)) {
-        violation_idx = i;
-        break;
-      }
-    }
-
-    assert(violation_idx != -1);
-
-    level_index base_level =
-        find_reconstruction_target(violation_idx, scratch_state);
-    if (base_level == -1) {
-      base_level = grow(scratch_state);
-    }
-
-    for (level_index i = base_level; i > 0; i--) {
-      /*
-       * The amount of storage required for the reconstruction accounts
-       * for the cost of storing the new records, along with the
-       * cost of retaining the old records during the process
-       * (hence the 2x multiplier).
-       *
-       * FIXME: currently does not account for the *actual* size
-       *        of the shards, only the storage for the records
-       *        themselves.
-       */
-      size_t reccnt = m_levels[i - 1]->get_record_count();
-      if constexpr (L == LayoutPolicy::LEVELING) {
-        if (can_reconstruct_with(i, reccnt, scratch_state)) {
-          reccnt += m_levels[i]->get_record_count();
+  inline void perform_reconstruction(ReconstructionTask task) {
+    /* perform the reconstruction itself */
+    std::vector<ShardType *> shards;
+    for (ShardID shid : task.sources) {
+      assert(shid.level_idx < m_levels.size());
+      assert(shid.shard_idx >= -1);
+
+      /* if unspecified, push all shards into the vector */
+      if (shid.shard_idx == all_shards_idx) {
+        for (size_t i = 0; i < m_levels[shid.level_idx].get_shard_count();
+             i++) {
+          if (m_levels[shid.level_idx]->get_shard(i)) {
+            shards.push_back(m_levels[shid.level_idx]->get_shard(i));
+          }
         }
+      } else {
+        shards.push_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx));
       }
-      tasks.add_reconstruction(i - i, i, reccnt);
     }
 
-    return tasks;
-  }
+    auto new_shard = Shard(shards);
 
-  /*
-   *
-   */
-  ReconstructionVector
-  get_reconstruction_tasks(size_t buffer_reccnt,
-                           state_vector scratch_state = {}) {
     /*
-     * If no scratch state vector is provided, use a copy of the
-     * current one. The only time an empty vector could be used as
-     * *real* input to this function is when the current state is also
-     * empty, so this should work even in that case.
+     * Remove all of the shards processed by the operation
      */
-    if (scratch_state.size() == 0) {
-      scratch_state = m_current_state;
-    }
-
-    ReconstructionVector reconstructions;
-    size_t LOOKAHEAD = 1;
-    for (size_t i = 0; i < LOOKAHEAD; i++) {
-      /*
-       * If L0 cannot support a direct buffer flush, figure out what
-       * work must be done to free up space first. Otherwise, the
-       * reconstruction vector will be initially empty.
-       */
-      if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) {
-        auto local_recon =
-            get_reconstruction_tasks_from_level(0, scratch_state);
-
-        /*
-         * for the first iteration, we need to do all of the
-         * reconstructions, so use these to initialize the returned
-         * reconstruction list
-         */
-        if (i == 0) {
-          reconstructions = local_recon;
-          /*
-           * Quick sanity test of idea: if the next reconstruction
-           * would be larger than this one, steal the largest
-           * task from it and run it now instead.
-           */
-        } else if (local_recon.get_total_reccnt() >
-                   reconstructions.get_total_reccnt()) {
-          auto t = local_recon.remove_reconstruction(0);
-          reconstructions.add_reconstruction(t);
-        }
-      }
-
-      /* simulate the buffer flush in the scratch state */
-      scratch_state[0].reccnt += buffer_reccnt;
-      if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) {
-        scratch_state[0].shardcnt += 1;
+    for (ShardID shid : task.sources) {
+      if (shid.shard_idx == all_shards_idx) {
+        m_levels[shid.level_idx]->truncate();
+      } else {
+        m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
      }
    }
 
-    return reconstructions;
-  }
-
-  /*
-   *
-   */
-  ReconstructionVector
-  get_reconstruction_tasks_from_level(level_index source_level,
-                                      state_vector &scratch_state) {
-    ReconstructionVector reconstructions;
-
     /*
-     * Find the first level capable of sustaining a reconstruction from
-     * the level above it. If no such level exists, add a new one at
-     * the bottom of the structure.
+     * Append the new shard to the target level
      */
-    level_index base_level =
-        find_reconstruction_target(source_level, scratch_state);
-    if (base_level == -1) {
-      base_level = grow(scratch_state);
-    }
-
-    if constexpr (L == LayoutPolicy::BSM) {
-      if (base_level == 0) {
-        return reconstructions;
-      }
-
-      ReconstructionTask task;
-      task.target = base_level;
-
-      size_t base_reccnt = 0;
-      for (level_index i = base_level; i > source_level; i--) {
-        auto recon_reccnt = scratch_state[i - 1].reccnt;
-        base_reccnt += recon_reccnt;
-        scratch_state[i - 1].reccnt = 0;
-        scratch_state[i - 1].shardcnt = 0;
-        task.add_source(i - 1, recon_reccnt);
-      }
-
-      reconstructions.add_reconstruction(task);
-      scratch_state[base_level].reccnt = base_reccnt;
-      scratch_state[base_level].shardcnt = 1;
-
-      return reconstructions;
+    if (task.target < m_levels.size()) {
+      m_levels[task.target]->append_shard(new_shard);
+    } else {
+      m_levels.push_back();
     }
+  }
 
+  inline void perform_flush(ReconstructionTask task, BuffView buffer) {
     /*
-     * Determine the full set of reconstructions necessary to open up
-     * space in the source level.
+     * FIXME: this might be faster with a custom interface for merging
+     * the buffer and a vector of shards, but that would also complicate
+     * the shard interface a lot, so we'll leave it like this for now. It
+     * does mean that policies that merge the buffer into L0 double-process
+     * the buffer itself. Given that we're unlikely to actually use policies
+     * like that, we'll leave this as low priority.
      */
-    for (level_index i = base_level; i > source_level; i--) {
-      size_t recon_reccnt = scratch_state[i - 1].reccnt;
-      size_t base_reccnt = recon_reccnt;
-
-      /*
-       * If using Leveling, the total reconstruction size will be the
-       * records in *both* base and target, because they will need to
-       * be merged (assuming that target isn't empty).
-       */
-      if constexpr (L == LayoutPolicy::LEVELING) {
-        if (can_reconstruct_with(i, base_reccnt, scratch_state)) {
-          recon_reccnt += scratch_state[i].reccnt;
-        }
-      }
-      reconstructions.add_reconstruction(i - 1, i, recon_reccnt);
-
-      /*
-       * The base level will be emptied and its records moved to
-       * the target.
-       */
-      scratch_state[i - 1].reccnt = 0;
-      scratch_state[i - 1].shardcnt = 0;
-
-      /*
-       * The target level will have the records from the base level
-       * added to it, and potentially gain a shard if the LayoutPolicy
-       * is tiering or the level currently lacks any shards at all.
-       */
-      scratch_state[i].reccnt += base_reccnt;
-      if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) {
-        scratch_state[i].shardcnt += 1;
-      }
-    }
-
-    return reconstructions;
-  }
-
-  inline void reconstruction(ReconstructionTask task) {
-    static_assert(L == LayoutPolicy::BSM);
-    std::vector<InternalLevel<ShardType, QueryType> *> levels(
-        task.sources.size());
-    for (size_t i = 0; i < task.sources.size(); i++) {
-      levels[i] = m_levels[task.sources[i]].get();
-    }
-
-    auto new_level = InternalLevel<ShardType, QueryType>::reconstruction(
-        levels, task.target);
-    if (task.target >= (level_index) m_levels.size()) {
-      m_current_state.push_back({new_level->get_record_count(),
-                                 calc_level_record_capacity(task.target), 1,
-                                 1});
-      m_levels.emplace_back(new_level);
+    ShardType *buffer_shard = new ShardType(buffer);
+    if (task.type == ReconstructionType::Append) {
+      m_levels[0]->append(std::shared_ptr(buffer_shard));
     } else {
-      m_current_state[task.target] = {new_level->get_record_count(),
-                                      calc_level_record_capacity(task.target),
-                                      1, 1};
-      m_levels[task.target] = new_level;
-    }
-
-    /* remove all of the levels that have been flattened */
-    for (size_t i = 0; i < task.sources.size(); i++) {
-      m_levels[task.sources[i]] =
-          std::shared_ptr<InternalLevel<ShardType, QueryType>>(
-              new InternalLevel<ShardType, QueryType>(task.sources[i], 1));
-      m_current_state[task.sources[i]] = {
-          0, calc_level_record_capacity(task.target), 0, 1};
-    }
-
-    return;
-  }
-
-  /*
-   * Combine incoming_level with base_level and reconstruct the shard,
-   * placing it in base_level. The two levels should be sequential--i.e. no
-   * levels are skipped in the reconstruction process--otherwise the
-   * tombstone ordering invariant may be violated.
-   */
-  inline void reconstruction(level_index base_level,
-                             level_index incoming_level) {
-    size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-
-    if (base_level >= (level_index) m_levels.size()) {
-      m_levels.emplace_back(
-          std::shared_ptr<InternalLevel<ShardType, QueryType>>(
-              new InternalLevel<ShardType, QueryType>(base_level,
-                                                      shard_capacity)));
-      m_current_state.push_back(
-          {0, calc_level_record_capacity(base_level), 0, shard_capacity});
-    }
+      std::vector<ShardType *> shards;
+      for (size_t i = 0; i < m_levels[0].size(); i++) {
+        if (m_levels[0]->get_shard(i)) {
+          shards.push_back(m_levels[0]->get_shard(i));
+        }
 
-    if constexpr (L == LayoutPolicy::LEVELING) {
-      /* if the base level has a shard, merge the base and incoming together to
-       * make a new one */
-      if (m_levels[base_level]->get_shard_count() > 0) {
-        m_levels[base_level] =
-            InternalLevel<ShardType, QueryType>::reconstruction(
-                m_levels[base_level].get(), m_levels[incoming_level].get());
-        /* otherwise, we can just move the incoming to the base */
-      } else {
-        m_levels[base_level] = m_levels[incoming_level];
+        shards.push_back(buffer_shard);
+        ShardType *new_shard = new ShardType(shards);
+        m_levels[0]->truncate();
+        m_levels[0]->append(std::shared_ptr(new_shard));
       }
-
-    } else {
-      m_levels[base_level]->append_level(m_levels[incoming_level].get());
-      m_levels[base_level]->finalize();
     }
-
-    /* place a new, empty level where the incoming level used to be */
-    m_levels[incoming_level] =
-        std::shared_ptr<InternalLevel<ShardType, QueryType>>(
-            new InternalLevel<ShardType, QueryType>(
-                incoming_level,
-                (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
-
-    /*
-     * Update the state vector to match the *real* state following
-     * the reconstruction
-     */
-    m_current_state[base_level] = {m_levels[base_level]->get_record_count(),
-                                   calc_level_record_capacity(base_level),
-                                   m_levels[base_level]->get_shard_count(),
-                                   shard_capacity};
-    m_current_state[incoming_level] = {
-        0, calc_level_record_capacity(incoming_level), 0, shard_capacity};
   }
 
   bool take_reference() {
@@ -518,11 +226,11 @@ public:
     return true;
   }
 
-  size_t get_reference_count() { return m_refcnt.load(); }
+  size_t get_reference_count() const { return m_refcnt.load(); }
 
   std::vector<typename QueryType::LocalQuery *>
   get_local_queries(std::vector<std::pair<ShardID, ShardType *>> &shards,
-                    typename QueryType::Parameters *parms) {
+                    typename QueryType::Parameters *parms) const {
 
     std::vector<typename QueryType::LocalQuery *> queries;
 
@@ -533,150 +241,11 @@ public:
     return queries;
   }
 
-private:
-  size_t m_scale_factor;
-  double m_max_delete_prop;
-  size_t m_buffer_size;
+  LevelVector const &get_level_vector() const { return m_levels; }
 
+private:
   std::atomic<size_t> m_refcnt;
-
-  std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> m_levels;
-
-  /*
-   * A pair of <record_count, shard_count> for each level in the
-   * structure. Record counts may be slightly inaccurate due to
-   * deletes.
-   */
-  state_vector m_current_state;
-
-  /*
-   * Add a new level to the scratch state and return its index.
-   *
-   * IMPORTANT: This does _not_ add a level to the extension structure
-   * anymore. This is handled by the appropriate reconstruction and flush
-   * methods as needed. This function is for use in "simulated"
-   * reconstructions.
-   */
-  inline level_index grow(state_vector &scratch_state) {
-    level_index new_idx = m_levels.size();
-    size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-
-    scratch_state.push_back(
-        {0, calc_level_record_capacity(new_idx), 0, new_shard_cap});
-    return new_idx;
-  }
-
-  /*
-   * Find the first level below the level indicated by idx that
-   * is capable of sustaining a reconstruction and return its
-   * level index. If no such level exists, returns -1. Also
-   * returns -1 if idx==0, and no such level exists, to simplify
-   * the logic of the first buffer flush.
-   */
-  inline level_index find_reconstruction_target(level_index idx,
-                                                state_vector &state) {
-
-    /*
-     * this handles the very first buffer flush, when the state vector
-     * is empty.
-     */
-    if (idx == 0 && state.size() == 0)
-      return -1;
-
-    size_t incoming_rec_cnt = state[idx].reccnt;
-    for (level_index i = idx + 1; i < (level_index) state.size(); i++) {
-      if (can_reconstruct_with(i, incoming_rec_cnt, state)) {
-        return i;
-      }
-
-      incoming_rec_cnt = state[idx].reccnt;
-    }
-
-    return -1;
-  }
-
-  inline void flush_buffer_into_l0(BuffView buffer) {
-    size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-
-    if (m_levels.size() == 0) {
-      m_levels.emplace_back(
-          std::shared_ptr<InternalLevel<ShardType, QueryType>>(
-              new InternalLevel<ShardType, QueryType>(0, shard_capacity)));
-
-      m_current_state.push_back(
-          {0, calc_level_record_capacity(0), 0, shard_capacity});
-    }
-
-    if constexpr (L == LayoutPolicy::LEVELING) {
-      // FIXME: Kludgey implementation due to interface constraints.
-      auto old_level = m_levels[0].get();
-      auto temp_level = new InternalLevel<ShardType, QueryType>(0, 1);
-      temp_level->append_buffer(std::move(buffer));
-
-      if (old_level->get_shard_count() > 0) {
-        m_levels[0] = InternalLevel<ShardType, QueryType>::reconstruction(
-            old_level, temp_level);
-        delete temp_level;
-      } else {
-        m_levels[0] =
-            std::shared_ptr<InternalLevel<ShardType, QueryType>>(temp_level);
-      }
-    } else {
-      m_levels[0]->append_buffer(std::move(buffer));
-    }
-
-    /* update the state vector */
-    m_current_state[0].reccnt = m_levels[0]->get_record_count();
-    m_current_state[0].shardcnt = m_levels[0]->get_shard_count();
-  }
-
-  /*
-   * Mark a given memory level as no-longer in use by the tree. For now this
-   * will just free the level. In future, this will be more complex as the
-   * level may not be able to immediately be deleted, depending upon who
-   * else is using it.
-   */
-  inline void
-  mark_as_unused(std::shared_ptr<InternalLevel<ShardType, QueryType>> level) {
-    level.reset();
-  }
-
-  /*
-   * Assume that level "0" should be larger than the buffer. The buffer
-   * itself is index -1, which should return simply the buffer capacity.
-   */
-  inline size_t calc_level_record_capacity(level_index idx) {
-    return m_buffer_size * pow(m_scale_factor, idx + 1);
-  }
-
-  /*
-   * Returns the number of records present on a specified level.
-   */
-  inline size_t get_level_record_count(level_index idx) {
-    return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
-  }
-
-  /*
-   * Determines if a level can sustain a reconstruction with incoming_rec_cnt
-   * additional records without exceeding its capacity.
-   */
-  inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt,
-                                   state_vector &state) {
-    if (idx >= (level_index) state.size()) {
-      return false;
-    }
-
-    if constexpr (L == LayoutPolicy::LEVELING) {
-      return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap;
-    } else if constexpr (L == LayoutPolicy::BSM) {
-      return state[idx].reccnt == 0;
-    } else {
-      return state[idx].shardcnt < state[idx].shardcap;
-    }
-
-    /* unreachable */
-    assert(true);
-  }
+  LevelVector m_levels;
 };
 
 } // namespace de
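With the state-vector simulation gone, `perform_reconstruction()` is driven entirely by the task a policy emits. A sketch of that data flow, using hypothetical level indexes; `reccnt_l1`, `reccnt_l2`, and `structure` are placeholders for values in scope, and the field and helper names follow their usage in this diff:

```cpp
// Merge all shards on levels 1 and 2 into a single new shard on level 2 --
// the same task shape that BSMPolicy builds in get_reconstruction_tasks().
de::ReconstructionTask task;
task.target = 2;
task.type = de::ReconstructionType::Merge;
task.add_shard({1, de::all_shards_idx}, reccnt_l1); // every shard on level 1
task.add_shard({2, de::all_shards_idx}, reccnt_l2); // every shard on level 2

// structure is an ExtensionStructure<ShardType, QueryType> *
structure->perform_reconstruction(task);
```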
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index a4cf94d..8cfcd49 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -19,14 +19,10 @@
 #include <vector>
 
 #include "framework/interface/Query.h"
-#include "framework/interface/Record.h"
 #include "framework/interface/Shard.h"
-#include "framework/structure/BufferView.h"
 #include "util/types.h"
 
 namespace de {
-template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
-class InternalLevel;
 
 template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
 class InternalLevel {
@@ -34,110 +30,9 @@ class InternalLevel {
   typedef BufferView<RecordType> BuffView;
 
 public:
-  InternalLevel(ssize_t level_no, size_t shard_cap)
-      : m_level_no(level_no), m_shard_cnt(0), m_shards(shard_cap, nullptr),
-        m_pending_shard(nullptr) {}
-
-  ~InternalLevel() { delete m_pending_shard; }
-
-  /*
-   * Create a new shard combining the records from base_level and new_level,
-   * and return a shared_ptr to a new level containing this shard. This is used
-   * for reconstructions under the leveling layout policy.
-   *
-   * No changes are made to the levels provided as arguments.
-   */
-  static std::shared_ptr<InternalLevel>
-  reconstruction(InternalLevel *base_level, InternalLevel *new_level) {
-    assert(base_level->m_level_no > new_level->m_level_no ||
-           (base_level->m_level_no == 0 && new_level->m_level_no == 0));
-    auto res = new InternalLevel(base_level->m_level_no, 1);
-    res->m_shard_cnt = 1;
-    std::vector<ShardType *> shards = {base_level->m_shards[0].get(),
-                                       new_level->m_shards[0].get()};
-
-    res->m_shards[0] = std::make_shared<ShardType>(shards);
-    return std::shared_ptr<InternalLevel>(res);
-  }
-
-  static std::shared_ptr<InternalLevel>
-  reconstruction(std::vector<InternalLevel *> levels, size_t level_idx) {
-    std::vector<ShardType *> shards;
-    for (auto level : levels) {
-      for (auto shard : level->m_shards) {
-        if (shard)
-          shards.emplace_back(shard.get());
-      }
-    }
-
-    auto res = new InternalLevel(level_idx, 1);
-    res->m_shard_cnt = 1;
-    res->m_shards[0] = std::make_shared<ShardType>(shards);
+  InternalLevel(ssize_t level_no) : m_level_no(level_no) {}
 
-    return std::shared_ptr<InternalLevel>(res);
-  }
-
-  /*
-   * Create a new shard combining the records from all of
-   * the shards in level, and append this new shard into
-   * this level. This is used for reconstructions under
-   * the tiering layout policy.
-   *
-   * No changes are made to the level provided as an argument.
-   */
-  void append_level(InternalLevel *level) {
-    // FIXME: that this is happening probably means that
-    // something is going terribly wrong earlier in the
-    // reconstruction logic.
-    if (level->get_shard_count() == 0) {
-      return;
-    }
-
-    std::vector<ShardType *> shards;
-    for (auto shard : level->m_shards) {
-      if (shard)
-        shards.emplace_back(shard.get());
-    }
-
-    if (m_shard_cnt == m_shards.size()) {
-      m_pending_shard = new ShardType(shards);
-      return;
-    }
-
-    auto tmp = new ShardType(shards);
-    m_shards[m_shard_cnt] = std::shared_ptr<ShardType>(tmp);
-
-    ++m_shard_cnt;
-  }
-
-  /*
-   * Create a new shard using the records in the
-   * provided buffer, and append this new shard
-   * into this level. This is used for buffer
-   * flushes under the tiering layout policy.
-   */
-  void append_buffer(BuffView buffer) {
-    if (m_shard_cnt == m_shards.size()) {
-      assert(m_pending_shard == nullptr);
-      m_pending_shard = new ShardType(std::move(buffer));
-      return;
-    }
-
-    m_shards[m_shard_cnt] = std::make_shared<ShardType>(std::move(buffer));
-    ++m_shard_cnt;
-  }
-
-  void finalize() {
-    if (m_pending_shard) {
-      for (size_t i = 0; i < m_shards.size(); i++) {
-        m_shards[i] = nullptr;
-      }
-
-      m_shards[0] = std::shared_ptr<ShardType>(m_pending_shard);
-      m_pending_shard = nullptr;
-      m_shard_cnt = 1;
-    }
-  }
+  ~InternalLevel() = default;
 
   /*
    * Create a new shard containing the combined records
@@ -146,7 +41,7 @@ public:
   * No changes are made to this level.
   */
   ShardType *get_combined_shard() {
-    if (m_shard_cnt == 0) {
+    if (m_shards.size() == 0) {
       return nullptr;
     }
 
@@ -163,7 +58,7 @@ public:
       std::vector<std::pair<ShardID, ShardType *>> &shards,
       std::vector<typename QueryType::LocalQuery *> &local_queries,
       typename QueryType::Parameters *query_parms) {
-    for (size_t i = 0; i < m_shard_cnt; i++) {
+    for (size_t i = 0; i < m_shards.size(); i++) {
       if (m_shards[i]) {
         auto local_query =
             QueryType::local_preproc(m_shards[i].get(), query_parms);
@@ -174,10 +69,10 @@ public:
   }
 
   bool check_tombstone(size_t shard_stop, const RecordType &rec) {
-    if (m_shard_cnt == 0)
+    if (m_shards.size() == 0)
       return false;
 
-    for (int i = m_shard_cnt - 1; i >= (ssize_t)shard_stop; i--) {
+    for (int i = m_shards.size() - 1; i >= (ssize_t)shard_stop; i--) {
       if (m_shards[i]) {
         auto res = m_shards[i]->point_lookup(rec, true);
         if (res && res->is_tombstone()) {
@@ -189,7 +84,7 @@ public:
   }
 
   bool delete_record(const RecordType &rec) {
-    if (m_shard_cnt == 0)
+    if (m_shards.size() == 0)
       return false;
 
     for (size_t i = 0; i < m_shards.size(); ++i) {
@@ -206,18 +101,18 @@ public:
   }
 
   ShardType *get_shard(size_t idx) {
-    if (idx >= m_shard_cnt) {
+    if (idx >= m_shards.size()) {
       return nullptr;
     }
 
     return m_shards[idx].get();
   }
 
-  size_t get_shard_count() { return m_shard_cnt; }
+  size_t get_shard_count() { return m_shards.size(); }
 
   size_t get_record_count() {
     size_t cnt = 0;
-    for (size_t i = 0; i < m_shard_cnt; i++) {
+    for (size_t i = 0; i < m_shards.size(); i++) {
       if (m_shards[i]) {
         cnt += m_shards[i]->get_record_count();
       }
@@ -228,7 +123,7 @@ public:
 
   size_t get_tombstone_count() {
     size_t res = 0;
-    for (size_t i = 0; i < m_shard_cnt; ++i) {
+    for (size_t i = 0; i < m_shards.size(); ++i) {
       if (m_shards[i]) {
         res += m_shards[i]->get_tombstone_count();
       }
@@ -238,7 +133,7 @@ public:
 
   size_t get_aux_memory_usage() {
     size_t cnt = 0;
-    for (size_t i = 0; i < m_shard_cnt; i++) {
+    for (size_t i = 0; i < m_shards.size(); i++) {
       if (m_shards[i]) {
         cnt += m_shards[i]->get_aux_memory_usage();
       }
@@ -249,7 +144,7 @@ public:
 
   size_t get_memory_usage() {
     size_t cnt = 0;
-    for (size_t i = 0; i < m_shard_cnt; i++) {
+    for (size_t i = 0; i < m_shards.size(); i++) {
       if (m_shards[i]) {
         cnt += m_shards[i]->get_memory_usage();
       }
@@ -261,7 +156,7 @@ public:
   double get_tombstone_prop() {
     size_t tscnt = 0;
     size_t reccnt = 0;
-    for (size_t i = 0; i < m_shard_cnt; i++) {
+    for (size_t i = 0; i < m_shards.size(); i++) {
       if (m_shards[i]) {
         tscnt += m_shards[i]->get_tombstone_count();
         reccnt += m_shards[i]->get_record_count();
@@ -272,24 +167,27 @@ public:
   }
 
   std::shared_ptr<InternalLevel> clone() {
-    auto new_level =
-        std::make_shared<InternalLevel>(m_level_no, m_shards.size());
-    for (size_t i = 0; i < m_shard_cnt; i++) {
+    auto new_level = std::make_shared<InternalLevel>(m_level_no);
+    for (size_t i = 0; i < m_shards.size(); i++) {
       new_level->m_shards[i] = m_shards[i];
     }
-    new_level->m_shard_cnt = m_shard_cnt;
 
     return new_level;
   }
 
-private:
-  ssize_t m_level_no;
+  void truncate() { m_shards.erase(m_shards.begin(), m_shards.end()); }
 
-  size_t m_shard_cnt;
-  size_t m_shard_size_cap;
+  void delete_shard(shard_index shard) {
+    m_shards.erase(m_shards.begin() + shard);
+  }
+
+  bool append(std::shared_ptr<ShardType> shard) {
+    m_shards.emplace_back(shard);
+  }
 
+private:
+  ssize_t m_level_no;
   std::vector<std::shared_ptr<ShardType>> m_shards;
-  ShardType *m_pending_shard;
 };
 } // namespace de
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index f4b0364..0477095 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -8,13 +8,21 @@
  */
 #pragma once
 
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "util/types.h"
+#include "framework/interface/Scheduler.h"
 #include <cstdlib>
-#include <utility>
 
 namespace de {
 
-enum class LayoutPolicy { LEVELING, TEIRING, BSM };
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
+          DeletePolicy D, SchedulerInterface SchedType>
+class DEConfiguration {
+  DEConfiguration(std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy)
+      : m_recon_policy(recon_policy) {}
 
-enum class DeletePolicy { TOMBSTONE, TAGGING };
+public:
+  std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> m_recon_policy;
+};
 
 } // namespace de
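With the `LayoutPolicy` enum gone from Configuration.h, each removed enumerator has a direct runtime replacement among the new policy classes. A migration note, as a sketch: `S`/`Q` stand for the shard and query types, and the constructor arguments carry over from the old `DynamicExtension` parameters.

```cpp
// Old template parameter  ->  new policy object
// LayoutPolicy::LEVELING  ->  de::LevelingPolicy<S, Q>(scale_factor, buffer_size)
// LayoutPolicy::TEIRING   ->  de::TieringPolicy<S, Q>(scale_factor, buffer_size)
// LayoutPolicy::BSM       ->  de::BSMPolicy<S, Q>(2, buffer_size)
//                             (the old BSM path asserted scale_factor == 2)
```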