diff options
| author | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-12-22 13:00:19 -0500 |
|---|---|---|
| committer | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-12-22 13:00:19 -0500 |
| commit | 5617bed5257506d3dfda8537b16f44b3e40f1b42 (patch) | |
| tree | b1a4bb957929b20c884a4eed070f42065828fdb6 /include/framework/DynamicExtension.h | |
| parent | 9876d74e503df64eb9e82e540ca41fcf593ebf64 (diff) | |
| download | dynamic-extension-5617bed5257506d3dfda8537b16f44b3e40f1b42.tar.gz | |
Began overhauling reconstruction mechanism
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 121 |
1 files changed, 50 insertions, 71 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 719232e..1886234 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -14,39 +14,36 @@ #include <vector> #include "framework/interface/Scheduler.h" +#include "framework/reconstruction/ReconstructionPolicy.h" +#include "framework/scheduling/FIFOScheduler.h" #include "framework/scheduling/SerialScheduler.h" #include "framework/structure/ExtensionStructure.h" #include "framework/structure/MutableBuffer.h" #include "framework/scheduling/Epoch.h" -#include "framework/util/Configuration.h" +#include "util/types.h" namespace de { template <ShardInterface ShardType, QueryInterface<ShardType> QueryType, - LayoutPolicy L = LayoutPolicy::TEIRING, DeletePolicy D = DeletePolicy::TAGGING, - SchedulerInterface SchedType = SerialScheduler> + SchedulerInterface SchedType = de::FIFOScheduler> class DynamicExtension { - /* for unit testing purposes */ -public: - LayoutPolicy Layout = L; - private: /* convenience typedefs for commonly used types within the class */ typedef typename ShardType::RECORD RecordType; typedef MutableBuffer<RecordType> Buffer; - typedef ExtensionStructure<ShardType, QueryType, L> Structure; - typedef Epoch<ShardType, QueryType, L> _Epoch; + typedef ExtensionStructure<ShardType, QueryType> Structure; + typedef Epoch<ShardType, QueryType> _Epoch; typedef BufferView<RecordType> BufView; + typedef ReconstructionPolicy<ShardType, QueryType> ReconPolicyType; typedef typename QueryType::Parameters Parameters; typedef typename QueryType::LocalQuery LocalQuery; typedef typename QueryType::LocalQueryBuffer BufferQuery; typedef typename QueryType::LocalResultType LocalResult; typedef typename QueryType::ResultType QueryResult; - static constexpr size_t QUERY = 1; static constexpr size_t RECONSTRUCTION = 2; @@ -61,36 +58,32 @@ public: * Create a new Dynamized version of a data structure, supporting * inserts and, possibly, deletes. The following parameters are used * for configuration of the structure, - * @param buffer_low_watermark The number of records that can be + * @param buffer_low_watermark The number of records that can be * inserted before a buffer flush is initiated * - * @param buffer_high_watermark The maximum buffer capacity, inserts - * will begin to fail once this number is reached, until the - * buffer flush has completed. Has no effect in single-threaded + * @param buffer_high_watermark The maximum buffer capacity, inserts + * will begin to fail once this number is reached, until the + * buffer flush has completed. Has no effect in single-threaded * operation * - * @param scale_factor The rate at which the capacity of levels + * @param scale_factor The rate at which the capacity of levels * grows; should be at least 2 for reasonable performance * * @param memory_budget Unused at this time * * @param thread_cnt The maximum number of threads available to the - * framework's scheduler for use in answering queries and + * framework's scheduler for use in answering queries and * performing compactions and flushes, etc. */ - DynamicExtension(size_t buffer_low_watermark, size_t buffer_high_watermark, - size_t scale_factor, size_t memory_budget = 0, + DynamicExtension(ReconPolicyType *recon_policy, size_t buffer_low_watermark, + size_t buffer_high_watermark, size_t memory_budget = 0, size_t thread_cnt = 16) - : m_scale_factor(scale_factor), m_max_delete_prop(1), - m_sched(memory_budget, thread_cnt), + : m_max_delete_prop(1), m_sched(memory_budget, thread_cnt), m_buffer(new Buffer(buffer_low_watermark, buffer_high_watermark)), - m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0) { - if constexpr (L == LayoutPolicy::BSM) { - assert(scale_factor == 2); - } + m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0), + m_recon_policy(recon_policy) { - auto vers = - new Structure(buffer_high_watermark, m_scale_factor, m_max_delete_prop); + auto vers = new Structure(); m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0}); m_previous_epoch.store({nullptr, 0}); m_next_epoch.store({nullptr, 0}); @@ -133,26 +126,26 @@ public: int insert(const RecordType &rec) { return internal_append(rec, false); } /** - * Erases a record from the index, according to the DeletePolicy + * Erases a record from the index, according to the DeletePolicy * template parameter. Returns 1 on success and 0 on failure. The * equality comparison operator of RecordType is used to identify - * the record to be deleted. - * + * the record to be deleted. + * * Deletes behave differently, depending on the DeletionPolicy. For * Tombstone deletes, a tombstone record will be inserted into the * index. The presence of the deleted record is not checked first, so * deleting a record that does not exist will result in an unnecessary - * tombstone record being written. + * tombstone record being written. * * Deletes using Tagging will perform a point lookup for the record to - * be removed, and mark it as deleted in its header. + * be removed, and mark it as deleted in its header. * - * @param rec The record to be deleted. The argument to this function + * @param rec The record to be deleted. The argument to this function * should compare equal to the record to be deleted. * - * @return 1 on success, and 0 on failure. For tombstone deletes, a - * failure will occur if the insert fails due to the buffer - * being full, and can be retried. For tagging deletes, a + * @return 1 on success, and 0 on failure. For tombstone deletes, a + * failure will occur if the insert fails due to the buffer + * being full, and can be retried. For tagging deletes, a * failure means that hte record to be deleted could not be * found in the index, and should *not* be retried. */ @@ -202,13 +195,12 @@ public: * @return A future, from which the query results can be retrieved upon * query completion */ - std::future<QueryResult> - query(Parameters &&parms) { + std::future<QueryResult> query(Parameters &&parms) { return schedule_query(std::move(parms)); } /** - * Determine the number of records (including tagged records and + * Determine the number of records (including tagged records and * tombstones) currently within the framework. This number is used for * determining when and how reconstructions occur. * @@ -229,7 +221,7 @@ public: * but will always return 0 in that case. * * @return The number of tombstone records within the index - */ + */ size_t get_tombstone_count() { auto epoch = get_active_epoch(); auto t = epoch->get_buffer().get_tombstone_count() + @@ -246,7 +238,7 @@ public: * as each level can contain multiple shards in that case. * * @return The number of levels within the index - */ + */ size_t get_height() { auto epoch = get_active_epoch(); auto t = epoch->get_structure()->get_height(); @@ -292,7 +284,7 @@ public: /** * Create a new single Shard object containing all of the records - * within the framework (buffer and shards). + * within the framework (buffer and shards). * * @param await_reconstruction_completion Specifies whether the currently * active state of the index should be used to create the shard @@ -357,11 +349,11 @@ public: } /** - * Verify that the currently active version of the index does not + * Verify that the currently active version of the index does not * violate tombstone proportion invariants. Exposed for unit-testing * purposes. * - * @return Returns true if the tombstone proportion invariant is + * @return Returns true if the tombstone proportion invariant is * satisfied, and false if it is not. */ bool validate_tombstone_proportion() { @@ -378,7 +370,7 @@ public: void print_scheduler_statistics() const { m_sched.print_statistics(); } private: - size_t m_scale_factor; + ReconPolicyType const *m_recon_policy; double m_max_delete_prop; SchedType m_sched; @@ -387,7 +379,7 @@ private: size_t m_core_cnt; std::atomic<int> m_next_core; std::atomic<size_t> m_epoch_cnt; - + alignas(64) std::atomic<bool> m_reconstruction_scheduled; std::atomic<epoch_ptr> m_next_epoch; @@ -397,9 +389,6 @@ private: std::condition_variable m_epoch_cv; std::mutex m_epoch_cv_lk; - - - void enforce_delete_invariant(_Epoch *epoch) { auto structure = epoch->get_structure(); auto compactions = structure->get_compaction_tasks(); @@ -407,8 +396,8 @@ private: while (compactions.size() > 0) { /* schedule a compaction */ - ReconstructionArgs<ShardType, QueryType, L> *args = - new ReconstructionArgs<ShardType, QueryType, L>(); + ReconstructionArgs<ShardType, QueryType> *args = + new ReconstructionArgs<ShardType, QueryType>(); args->epoch = epoch; args->merges = compactions; args->extension = this; @@ -555,20 +544,13 @@ private: } static void reconstruction(void *arguments) { - auto args = (ReconstructionArgs<ShardType, QueryType, L> *)arguments; + auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments; ((DynamicExtension *)args->extension)->SetThreadAffinity(); Structure *vers = args->epoch->get_structure(); - if constexpr (L == LayoutPolicy::BSM) { - if (args->merges.size() > 0) { - vers->reconstruction(args->merges[0]); - } - } else { - for (size_t i = 0; i < args->merges.size(); i++) { - vers->reconstruction(args->merges[i].target, - args->merges[i].sources[0]); - } + for (size_t i=0; i<args->tasks.size(); i++) { + vers->perform_reconstruction(args->tasks[i]); } /* @@ -607,8 +589,7 @@ private: } static void async_query(void *arguments) { - auto *args = - (QueryArgs<ShardType, QueryType, DynamicExtension> *) arguments; + auto *args = (QueryArgs<ShardType, QueryType, DynamicExtension> *)arguments; auto epoch = args->extension->get_active_epoch(); @@ -636,7 +617,7 @@ private: query_results[i] = QueryType::local_query_buffer(buffer_query); } else { /*execute local queries */ query_results[i] = QueryType::local_query(shards[i - 1].second, - local_queries[i - 1]); + local_queries[i - 1]); } /* end query early if EARLY_ABORT is set and a result exists */ @@ -673,11 +654,11 @@ private: void schedule_reconstruction() { auto epoch = create_new_epoch(); - ReconstructionArgs<ShardType, QueryType, L> *args = - new ReconstructionArgs<ShardType, QueryType, L>(); + ReconstructionArgs<ShardType, QueryType> *args = + new ReconstructionArgs<ShardType, QueryType>(); args->epoch = epoch; - args->merges = epoch->get_structure()->get_reconstruction_tasks( - m_buffer->get_high_watermark()); + args->tasks = m_recon_policy->get_reconstruction_tasks( + epoch, m_buffer->get_high_watermark()); args->extension = this; args->compaction = false; /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed @@ -686,10 +667,8 @@ private: m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); } - std::future<QueryResult> - schedule_query(Parameters &&query_parms) { - auto args = - new QueryArgs<ShardType, QueryType, DynamicExtension>(); + std::future<QueryResult> schedule_query(Parameters &&query_parms) { + auto args = new QueryArgs<ShardType, QueryType, DynamicExtension>(); args->extension = this; args->query_parms = std::move(query_parms); auto result = args->result_set.get_future(); |